博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java 并发
阅读量:5734 次
发布时间:2019-06-18

本文共 15029 字,大约阅读时间需要 50 分钟。

hot3.png

Java并发

     并发编程可以是程序执行速度得到极大提高,或者为设计某些类型的程序提供更易用的模型,或者两者皆有。

1 并发的多面性

     用并发解决问题大体上可以分为“速度”和“设计可管理性”两种。

    1.1 更快的执行

        可能在多处理器上使用多线程技术我们更容易理解,但是,并发通常是提高运行在单处理器上的程序的性能,这听起来可能有违直觉。在单处理器上顺序执行在开销上确实更小,但是如果碰到一些操作(如I/O等)会造成阻塞,那么后续的任务就不能继续执行了,整个程序都将停下来,而使用并发编程就很好的解决了这个问题。(当然,如果没有阻塞任务,在单处理器上使用并发就毫无意义了)

    1.2 改进代码设计

         并发可以极大的简化程序设计。

2 基本的线程机制

    并发编程使我们可以将程序划分为多个分离的,独立运行的任务。通过使用多线程机制,这些独立应用(子线程)中的每一个都将由执行线程来驱动。一个线程就是在进程中的一个单一的顺序控制流,所以单个进程可以拥有多个并发执行的任务,程序使得每个任务都好像拥有自己的CPU一样,底层机制就是切分CPU时间,我们通常都不需要考虑这个。

    多任务和多线程往往是使用多处理器系统的最合理方式。

    2.1 定义任务(Runnable接口)

        (此处先说明一下:可能不了解java并发的,一开始看不明白,其实主要是任务和线程的一些概念不清楚,请继续向下看,会一步步解释的)

        要想定义任务,只需要实现Runnable接口并编写run()方法即可。

 
public class SimpleI implements Runnable {
protected int countMax = 10;//倒计时 private static int taskCount = 0; private final int id = taskCount++;//区分任务的多个实例 public String status(){
return "#" + id + "(" + (countMax>0?countMax:"over") + "),"; } @Override public void run() {
while(countMax-- > 0) System.out.print(status()); Thread.yield();//对线程调度器的一种建议:“我已经完成对生命周期最重要的部分,是时候切换了” } public static void main(String args[]){
new SimpleI().run(); new SimpleI().run(); } }
 
输出:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),

         这个任务并不是由单独的线程驱动的(其实也是由线程驱动,不过这个线程是main所在的那个线程),如果要实现真正的线程能力,需要显示的将一个任务附加到线程上。

    2.2 Thread类

         将Runnable对象提交给Thread构造器即可转变为一个工作任务:

 
public static void main(String args[]){
Thread t = new Thread(new SimpleI()); t.start(); System.out.println("flag---:"); }//输出:flag---:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),

         此时,main()线程中其他操作与new SimpleI().run()是“同时”执行的。

 
public static void main(String args[]){
int i = 5; while(i-->0) new Thread(new SimpleI()).start(); System.out.print("flag---:"); }/*输出: flag---:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(9),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(9),#3(8),#3(7),#3(6),#3(5),#3(4),#3(3),#3(2),#3(1),#3(over),#4(9),#4(8),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),*/

        当然,我们也可以直接继承Thread类,然后调用.start()方法,但是Runnable是一个接口,如果我们直接继承Therad的话,就没办法继承其他类了,还有就是Runnable更适合资源的共享。以下为其他写法:

 RunnableSelf.java:

 
public class RunnableSelf implements Runnable {
public RunnableSelf() {
new Thread(this).start(); } @Override public void run() {} }

ThreadSelf.java:

 
public class ThreadSelf extends Thread{
public ThreadSelf() {
start(); } @Override public void run() {} }

        如果相对简单,这可能是安全的,但是,如果另一个任务在构造器完成之前开始执行,这将引发问题,所以使用Excutor才是更优的选择(使用内部类也是一个很有用的方法,可以将代码隐藏在类内部)。

        在Java中,Thread类自身并不执行任何操作,只是驱动赋予它的任务。Runnable意为可猎取的,在此处这个借口名字选择并不是很好,或许使用Task就好多了。

      (注意:new的Thread对象是没法被回收的,可线程池对此作优化)

    2.3 Executor(执行器)

        执行器Executor可以管理Thread对象,Java提供4中不同的线程池。

        CachedThreadPool:将为每个任务都创建一个线程。一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
        FixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
        SingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
        ScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。

 
public static ExecutorService newFixedThreadPool(int nThreads) //创建固定数目线程的线程池。 public static ExecutorService newCachedThreadPool() //创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。 public static ExecutorService newSingleThreadExecutor() //创建一个单线程化的Executor。 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) //创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
 
public static void cachedThread(){
ExecutorService exec = Executors.newCachedThreadPool(); int i = 5; while(i-- > 0) exec.execute(new SimpleI()); exec.shutdown(); System.out.println("cachedThread:"); } public static void fixedThread(){
ExecutorService exec = Executors.newFixedThreadPool(5); int i = 5; while(i-- > 0) exec.execute(new SimpleI()); exec.shutdown(); System.out.println("fixedThread:"); } public static void singleThread(){
ExecutorService exec = Executors.newSingleThreadExecutor(); int i = 5; while(i-- > 0) exec.execute(new SimpleI()); exec.shutdown(); System.out.println("singleThread:"); }

        调用后分别有如下输出:

 
cachedThread: #1(9),#3(9),#2(9),#0(9),#4(9),#4(8),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),#2(8),#2(7),#3(8),#1(8),#3(7),#2(6),#3(6),#1(7),#3(5),#2(5),#3(4),#1(6),#3(3),#2(4),#3(2),#1(5),#3(1),#2(3),#3(over),#1(4),#1(3),#1(2),#2(2),#1(1),#2(1),#1(over),#2(over), fixedThread: #4(9),#3(9),#1(9),#2(9),#1(8),#0(9),#1(7),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(8),#3(7),#4(8),#3(6),#4(7),#3(5),#4(6),#3(4),#4(5),#3(3),#4(4),#3(2),#4(3),#3(1),#4(2),#3(over),#4(1),#4(over), singleThread: #0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(9),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(9),#3(8),#3(7),#3(6),#3(5),#3(4),#3(3),#3(2),#3(1),#3(over),#4(9),#4(8),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),

        在任何线程池中,任何现有的线程,在可能的情况下,都会被自动复用。

        通常,CachedThread是合理的Executor的首选,它会创建与所需数量相当的线程,然后再回首线程时停止创建新线程。如果这种方式引发了问题,就得考虑切换到FixedChread了。

    2.4 线程池生命周期

         ExecutorService扩展了Executor并添加了一些生命周期管理的方法。

        一个Executor的生命周期有三种状态,运行 ,关闭 ,终止 。
        Executor创建时处于运行状态。
        当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,
        所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。 如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。

    2.5 从任务中返回结果

         Runnable是执行工作的独立任务,但是不返回任何值。如果需要返回值,那么就实现Callable接口而不是Runnable。Callable(Java SE5中引入的)是一种具有类型参数的泛型,类型参数表示的是从call()中返回的值,并且必须使用ExecutorService.submit()调用:

 
public class CallableDemo implements Callable<String>{
static int id = 0; @Override public String call() throws Exception {
return System.currentTimeMillis() + ":" + id++ +","; } public static void main(String args[]) throws InterruptedException, ExecutionException{
ExecutorService service = Executors.newCachedThreadPool(); int i = 5; List<Future<String>> result = new ArrayList<>(); while(i-- > 0) result.add(service.submit(new CallableDemo())); System.out.println("CallableDemo:" + System.currentTimeMillis()); for(Future<String> f : result) System.out.print(f.get()); } }/*输出:CallableDemo:1448935006857 1448935006857:1,1448935006857:0,1448935006859:2,1448935006859:3,1448935006859:4,*/

         submit()方法会产生Future对象。可以调用isDone()方法查询Future是否已经完成。使用get()方法获取结果,使用get()会阻塞直到结果准备就绪。

    2.6 休眠

         可以直接调用sleep()将任务终止给定的时间,可以抛出InterruptedException异常,但是异常不能跨线程传播,所以必须在任务内处理,但这已经Old了,Java SE5引入了显示的sleep()版本,作为TimeUnit类的一部分:

 
// Thread.sleep(100); //old style // TimeUnit.MICROSECONDS.sleep(100); //Java SE5/6

 

   2.7 优先级

        线程的优先级会将它的重要性传递给调度器,使得优先级越高的,将得到更高的执行频率。

        使用getPriority()和(任何时候)setPriority()读取和设置现有线程的优先级:

 
public class SimplePriorities implements Runnable {
int priority; double d = 0.0; public SimplePriorities(int priority) {
this.priority = priority; } @Override public void run() {
Thread.currentThread().setPriority(priority); for(int i = 1;i<100000000;i++) d += (Math.PI + Math.E)/(double)i*Math.PI; System.out.print("priority:" + priority + ","); Thread.yield();//让步:工作得差不多了,可以让别的线程使用了O(∩_∩)O~ } public static void main(String args[]){
ExecutorService executorService = Executors.newCachedThreadPool(); int i = 5; while(i-->0) executorService.execute(new SimplePriorities(Thread.MIN_PRIORITY)); i = 5; while(i-->0) executorService.execute(new SimplePriorities(Thread.NORM_PRIORITY)); executorService.execute(new SimplePriorities(Thread.MAX_PRIORITY)); System.out.println("SimplePriorities:"); executorService.shutdown(); } }/*SimplePriorities: priority:10,priority:5,priority:5,priority:5,priority:5,priority:5,priority:1,priority:1,priority:1,priority:1,priority:1,*/

         这里,在线程中做了大量的浮点数运算,以保证线程能够消耗一定的资源,当然不同机器效果不一定相同,如果cpu能力一般,可将循环次数减少。

        虽然JDK提供了10个优先级,但是考虑到各大操作系统的差异性,映射关系并不稳定,所以通常只使用给定的3中级别。

     2.8 join()

        一个线程可以在其他线程之上调用join()方法:在被调用线程结束后继续执行,还可以带超时参数,中断join()方法的调用是在调用线程上调用interrupt()方法(结合try)。

 JoinDemo.java:

 
public class JoinDemo {
public static void main(String args[]){
Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500); new Joiner("Dopey", sleepy); new Joiner("Doc", grumpy); grumpy.interrupt(); } } class Sleeper extends Thread{
int duration; public Sleeper(String name,int time) {
super(name); duration = time; start(); } @Override public void run() {
try {
sleep(duration); } catch (InterruptedException e) {
System.out.println(getName() + " was interrupted. " + "isInterrupted():" + isInterrupted()); //被中断 return; } System.out.println(getName() + " has awakened");//被唤醒 } } class Joiner extends Thread{
Sleeper sleeper; public Joiner(String name,Sleeper sleeper) {
super(name); this.sleeper = sleeper; start(); } @Override public void run() {
try {
sleeper.join(); } catch (InterruptedException e) {
System.out.println("Interrupted"); } System.out.println(getName() + " join completed"); } }/*输出:Grumpy was interrupted. isInterrupted():false Doc join completed Sleepy has awakened Dopey join completed*/

        Sleeper是一个Thread类型,在run中,sleep()可能会在指定的时间满后返回,也可能被中断。在catch里面,将根据isInterrupted()的返回值报告这个中断,当另一个线程在该线程上调用interrupt()时,将给该线程设定一个标志,表明该线程已经被中断了。异常被捕获时将清理这个标志,所以异常被捕获时,标志总是为假。

        Joiner线程将通过在Sleeper对象上调用join()来等待Sleeper醒来,从输出可以发现,Sleeper被中断或者正常结束,Joiner和Sleeper将一起结束(需看详细过程,可在Joiner中sleep一定时间并打印时附加System.currentTimeMillis()即可)。

     2.9 捕获异常

        由于线程的本质特征,我们是不能从线程中捕获逃逸的异常的。异常逃出任务的run方法后,就会向外传播到控制台:

 
public class ExceptionThread implements Runnable {
@Override public void run() {
throw new RuntimeException("anxpp.com"); } public static void main(String args[]){
try{
Executors.newCachedThreadPool().execute(new ExceptionThread()); }catch(RuntimeException e){} } }/*输出: Exception in thread "pool-1-thread-1" java.lang.RuntimeException: anxpp.com at com.anxpp.concurrent.ExceptionThread.run(ExceptionThread.java:8) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)*/

        可以看到,即使在main中使用了try语句也不能捕获到异常。

        (Java SE5之前,可以通过线程组来捕获)现在,我们可以使用Excutor来捕获这些异常。为了解决问题,我们需要修改Executor产生线程的方式。Thread.UncaughtExceptionHandler(Java SE5中的新接口)允许在每个线程上都附着一个异常处理器,Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用:

 
public class CaptureThreadException {
public static void main(String args[]){
ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory()); service.execute(new ExceptionThreadN()); service.shutdown(); System.out.println("CaptureThreadException:"); } } class ExceptionThreadN implements Runnable{
@Override public void run() {
Thread thread = Thread.currentThread(); System.out.println("run() by " + thread.getName()); System.out.println("eh=" + thread.getUncaughtExceptionHandler()); throw new RuntimeException("anxpp.com_exception"); } } class MyUncaughtExceptionHandler implements UncaughtExceptionHandler{
@Override public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught:" + e); } } class HandlerThreadFactory implements ThreadFactory{
@Override public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread"); Thread t = new Thread(r); t.setName("T"); System.out.println("created:" + t.getName()); t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); System.out.println("eh=" + t.getUncaughtExceptionHandler()); return t; } }/*输出: com.anxpp.concurrent.HandlerThreadFactory@5abe753a creating new Thread created:T eh=com.anxpp.concurrent.MyUncaughtExceptionHandler@5e9a94 run() by T eh=com.anxpp.concurrent.MyUncaughtExceptionHandler@5e9a94 CaptureThreadException: caught:java.lang.RuntimeException: anxpp.com_exception*/

         可以看到,未捕获的异常是通过uncaughtException来捕获的。如果需要在各处使用相同的异常处理器,可设置一个静态域,并将处理器设置为默认的。

 3 (受限)资源共享

     有了并发就可以同时做多件事情,但是,两个和多个线程彼此互相干涉的问题就出现了。

    3.1 引出问题

        首先,编写一个可以产生偶数的抽象类:

 
public abstract class IntGenerator {
protected int value = 0; private volatile boolean canceled = false; public abstract int next(); public void cancel(){
canceled = true; } public boolean isCanceled(){
return canceled; } }

         然后编写一个测试类(可测试所有继承自上面这个类的子类),并实现抽象类后测试:

 
public class Check implements Runnable {
IntGenerator g; final int id; public Check(IntGenerator g,int id) {
this.id = id; this.g = g; } @Override public void run() {
while(!g.isCanceled()){
int val = g.next(); if(val % 2 != 0){
System.out.println("not pass:" + val); g.cancel(); } } } public static void test(IntGenerator g,int count){
ExecutorService service = Executors.newCachedThreadPool(); for(int i = 0;i < count; i++) service.execute(new Check(g, i)); service.shutdown(); } public static void test(IntGenerator g){
test(g,20); }//default count public static void main(String args[]){
System.out.println("begin:"); test(new IntGenerator() {
@Override public int next() {
value++;危险,此处放置yield会更快出现问题! value++; return value; } },10); } }/*输出 begin: not pass:575 not pass:579 not pass:577 not pass:581*/ 

         一个任务可能会在另一个任务执行2次自增之间调用next()方法,造成程序最终运行失败。再者,Java中的自增并不是原子操作,在自增过程中任务也可能会被线程机制挂起!所以,即使是单一递增,不保护任务也不是安全的。

 

    3.2 解决共享资源竞争问题

        我们在使用多线程的时候,总是要多加谨慎,就像我们打LOL的时候,引导技能正准备最后一击带他升天的时候,敌人躺下了,因为你的线程挂起了,另一个玩家击杀了他。那么解决之道就是当资源被一个任务使用的时候,在其上加锁。基本所有的并发模式在解决线程冲突的时候,都是采用序列化访问共享资源的方案。在代码前加的锁语句产生了一种相互排斥的效果,就是常常被称为互斥量的机制。

        比如:大家都想单独使用卫生间(排除一些爱好比较广泛的人),为了使用,就先敲门,如果没人就进入并锁上门后使用,其他的人就不能进来只能等待直到可以使用。而且,人们也并没有排队,谁都可能是下一个敲门的人。可以通过yield()和setPriority()给线程调度器提供建议(虽然不一定有效,具体效果取决于具体平台和JVM的实现)。
        Java通过提供关键字 synchronized 的形式,提供内置支持。

        共享资源一般是以对象形式存在,但也可能是文件,其他I/O或打印机。要控制对共享资源的访问,得先将其包装进一个对象。然后把访问这个对象的方法标记为 synchronized :

 
synchronized void f(){
/* ... */}

        (同步控制)改造以上方法:

 
public static void main(String args[]){
System.out.println("begin:"); test(new IntGenerator() {
@Override public synchronized int next() {
value++; Thread.yield();//问了增加value为奇数时上下文切换的可能性 value++; if(value%20000000 == 0) System.out.println(value); return value; } },100); }/*输出: begin: 20000000 40000000 60000000 ...一直都不会出问题*/

        任何时候,都只有一个任务可以通过由互斥量看护的代码。

        使用显示的Lock对象

 
public static void main(String args[]){
System.out.println("begin:"); test(new IntGenerator() {
private Lock lock = new ReentrantLock(); @Override public int next() {
lock.lock(); try{
value++; Thread.yield(); value++; if(value%20000000 == 0) System.out.println(value); return value;//必须在try语句里,不然unlock()会过早发生! }finally{
lock.unlock(); } } },100); }

        显示的锁代码会不那么优雅,但是更加灵活。通常我们可以使用更简洁的方式,但在解决特殊问题时(如处理异常、清理工作等),就要使用显示的Lock了。

    3.3 原子性与易变量

        由于Java的跨平台特性,诸如自增等不像C++(通常情况下)一样是原子操作(即这个操作过程是不能中断的),一旦操作开始,它一定可以在可能发生的“上下文切换”之前执行完毕。所以在Java上我们应该避免使用原子性来代替同步。如果一定要玩火,请接受下面的测试:Goetz测试(以为并发专家命名的测试):如果你可以编写用于现代微处理器的高性能JVM,那么久有资格去考虑是否可以避免同步!

        易变性(当时在单片机上做c开发的时候就吃过亏),volatile意为易失的。本人也没做深入了解,就不说多了,毕竟也很少用,但有以下原子类,在设计性能调优时挺有用的,他们可以在机器级别上获得原子性:

        AtomicInteger,AtomicLong,AtomicReference等,具体如何操作请度娘,这里将改写上面的方法一边丢掉synchronized:

 
public static void main(String args[]){
test(new IntGenerator() {
@Override public int next() {
return i.addAndGet(2); } },100); }/*这里消除了synchronized关键字,但程序不会失败!*/

     3.4 临界区(critical section)

        只希望防止多个线程访问方法内部分代码(同步控制块),可以用如下方式:

 
synchronized(syncObject/*指定对象*/){
//code here 对花括号内代码进行同步控制 }

         使用同步控制块而不是对整个方法进行同步控制可以显著提高性能!

    3.5 在其他对象上同步

    3.6 线程本地存储

4 终结任务

---未完待续

转载于:https://my.oschina.net/lvzunwei/blog/687832

你可能感兴趣的文章