Java并发
并发编程可以是程序执行速度得到极大提高,或者为设计某些类型的程序提供更易用的模型,或者两者皆有。
1 并发的多面性
用并发解决问题大体上可以分为“速度”和“设计可管理性”两种。
1.1 更快的执行
可能在多处理器上使用多线程技术我们更容易理解,但是,并发通常是提高运行在单处理器上的程序的性能,这听起来可能有违直觉。在单处理器上顺序执行在开销上确实更小,但是如果碰到一些操作(如I/O等)会造成阻塞,那么后续的任务就不能继续执行了,整个程序都将停下来,而使用并发编程就很好的解决了这个问题。(当然,如果没有阻塞任务,在单处理器上使用并发就毫无意义了)
1.2 改进代码设计
并发可以极大的简化程序设计。
2 基本的线程机制
并发编程使我们可以将程序划分为多个分离的,独立运行的任务。通过使用多线程机制,这些独立应用(子线程)中的每一个都将由执行线程来驱动。一个线程就是在进程中的一个单一的顺序控制流,所以单个进程可以拥有多个并发执行的任务,程序使得每个任务都好像拥有自己的CPU一样,底层机制就是切分CPU时间,我们通常都不需要考虑这个。 多任务和多线程往往是使用多处理器系统的最合理方式。
2.1 定义任务(Runnable接口)
(此处先说明一下:可能不了解java并发的,一开始看不明白,其实主要是任务和线程的一些概念不清楚,请继续向下看,会一步步解释的)
要想定义任务,只需要实现Runnable接口并编写run()方法即可。
public class SimpleI implements Runnable { protected int countMax = 10;//倒计时 private static int taskCount = 0; private final int id = taskCount++;//区分任务的多个实例 public String status(){ return "#" + id + "(" + (countMax>0?countMax:"over") + "),"; } @Override public void run() { while(countMax-- > 0) System.out.print(status()); Thread.yield();//对线程调度器的一种建议:“我已经完成对生命周期最重要的部分,是时候切换了” } public static void main(String args[]){ new SimpleI().run(); new SimpleI().run(); } }
输出:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),
这个任务并不是由单独的线程驱动的(其实也是由线程驱动,不过这个线程是main所在的那个线程),如果要实现真正的线程能力,需要显示的将一个任务附加到线程上。
2.2 Thread类
将Runnable对象提交给Thread构造器即可转变为一个工作任务:
public static void main(String args[]){ Thread t = new Thread(new SimpleI()); t.start(); System.out.println("flag---:"); }//输出:flag---:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),
此时,main()线程中其他操作与new SimpleI().run()是“同时”执行的。
public static void main(String args[]){ int i = 5; while(i-->0) new Thread(new SimpleI()).start(); System.out.print("flag---:"); }/*输出: flag---:#0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(9),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(9),#3(8),#3(7),#3(6),#3(5),#3(4),#3(3),#3(2),#3(1),#3(over),#4(9),#4(8),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),*/
当然,我们也可以直接继承Thread类,然后调用.start()方法,但是Runnable是一个接口,如果我们直接继承Therad的话,就没办法继承其他类了,还有就是Runnable更适合资源的共享。以下为其他写法:
RunnableSelf.java:
public class RunnableSelf implements Runnable { public RunnableSelf() { new Thread(this).start(); } @Override public void run() {} }
ThreadSelf.java:
public class ThreadSelf extends Thread{ public ThreadSelf() { start(); } @Override public void run() {} }
如果相对简单,这可能是安全的,但是,如果另一个任务在构造器完成之前开始执行,这将引发问题,所以使用Excutor才是更优的选择(使用内部类也是一个很有用的方法,可以将代码隐藏在类内部)。
在Java中,Thread类自身并不执行任何操作,只是驱动赋予它的任务。Runnable意为可猎取的,在此处这个借口名字选择并不是很好,或许使用Task就好多了。
(注意:new的Thread对象是没法被回收的,可线程池对此作优化)
2.3 Executor(执行器)
执行器Executor可以管理Thread对象,Java提供4中不同的线程池。 CachedThreadPool:将为每个任务都创建一个线程。一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 FixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 SingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 ScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。
public static ExecutorService newFixedThreadPool(int nThreads) //创建固定数目线程的线程池。 public static ExecutorService newCachedThreadPool() //创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。 public static ExecutorService newSingleThreadExecutor() //创建一个单线程化的Executor。 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) //创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
public static void cachedThread(){ ExecutorService exec = Executors.newCachedThreadPool(); int i = 5; while(i-- > 0) exec.execute(new SimpleI()); exec.shutdown(); System.out.println("cachedThread:"); } public static void fixedThread(){ ExecutorService exec = Executors.newFixedThreadPool(5); int i = 5; while(i-- > 0) exec.execute(new SimpleI()); exec.shutdown(); System.out.println("fixedThread:"); } public static void singleThread(){ ExecutorService exec = Executors.newSingleThreadExecutor(); int i = 5; while(i-- > 0) exec.execute(new SimpleI()); exec.shutdown(); System.out.println("singleThread:"); }
调用后分别有如下输出:
cachedThread: #1(9),#3(9),#2(9),#0(9),#4(9),#4(8),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),#2(8),#2(7),#3(8),#1(8),#3(7),#2(6),#3(6),#1(7),#3(5),#2(5),#3(4),#1(6),#3(3),#2(4),#3(2),#1(5),#3(1),#2(3),#3(over),#1(4),#1(3),#1(2),#2(2),#1(1),#2(1),#1(over),#2(over), fixedThread: #4(9),#3(9),#1(9),#2(9),#1(8),#0(9),#1(7),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(8),#3(7),#4(8),#3(6),#4(7),#3(5),#4(6),#3(4),#4(5),#3(3),#4(4),#3(2),#4(3),#3(1),#4(2),#3(over),#4(1),#4(over), singleThread: #0(9),#0(8),#0(7),#0(6),#0(5),#0(4),#0(3),#0(2),#0(1),#0(over),#1(9),#1(8),#1(7),#1(6),#1(5),#1(4),#1(3),#1(2),#1(1),#1(over),#2(9),#2(8),#2(7),#2(6),#2(5),#2(4),#2(3),#2(2),#2(1),#2(over),#3(9),#3(8),#3(7),#3(6),#3(5),#3(4),#3(3),#3(2),#3(1),#3(over),#4(9),#4(8),#4(7),#4(6),#4(5),#4(4),#4(3),#4(2),#4(1),#4(over),
在任何线程池中,任何现有的线程,在可能的情况下,都会被自动复用。 通常,CachedThread是合理的Executor的首选,它会创建与所需数量相当的线程,然后再回首线程时停止创建新线程。如果这种方式引发了问题,就得考虑切换到FixedChread了。
2.4 线程池生命周期
ExecutorService扩展了Executor并添加了一些生命周期管理的方法。 一个Executor的生命周期有三种状态,运行 ,关闭 ,终止 。 Executor创建时处于运行状态。 当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务, 所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。 如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。
2.5 从任务中返回结果
Runnable是执行工作的独立任务,但是不返回任何值。如果需要返回值,那么就实现Callable接口而不是Runnable。Callable(Java SE5中引入的)是一种具有类型参数的泛型,类型参数表示的是从call()中返回的值,并且必须使用ExecutorService.submit()调用:
public class CallableDemo implements Callable<String>{ static int id = 0; @Override public String call() throws Exception { return System.currentTimeMillis() + ":" + id++ +","; } public static void main(String args[]) throws InterruptedException, ExecutionException{ ExecutorService service = Executors.newCachedThreadPool(); int i = 5; List<Future<String>> result = new ArrayList<>(); while(i-- > 0) result.add(service.submit(new CallableDemo())); System.out.println("CallableDemo:" + System.currentTimeMillis()); for(Future<String> f : result) System.out.print(f.get()); } }/*输出:CallableDemo:1448935006857 1448935006857:1,1448935006857:0,1448935006859:2,1448935006859:3,1448935006859:4,*/
submit()方法会产生Future对象。可以调用isDone()方法查询Future是否已经完成。使用get()方法获取结果,使用get()会阻塞直到结果准备就绪。
2.6 休眠
可以直接调用sleep()将任务终止给定的时间,可以抛出InterruptedException异常,但是异常不能跨线程传播,所以必须在任务内处理,但这已经Old了,Java SE5引入了显示的sleep()版本,作为TimeUnit类的一部分:
// Thread.sleep(100); //old style // TimeUnit.MICROSECONDS.sleep(100); //Java SE5/6
2.7 优先级
线程的优先级会将它的重要性传递给调度器,使得优先级越高的,将得到更高的执行频率。
使用getPriority()和(任何时候)setPriority()读取和设置现有线程的优先级:
public class SimplePriorities implements Runnable { int priority; double d = 0.0; public SimplePriorities(int priority) { this.priority = priority; } @Override public void run() { Thread.currentThread().setPriority(priority); for(int i = 1;i<100000000;i++) d += (Math.PI + Math.E)/(double)i*Math.PI; System.out.print("priority:" + priority + ","); Thread.yield();//让步:工作得差不多了,可以让别的线程使用了O(∩_∩)O~ } public static void main(String args[]){ ExecutorService executorService = Executors.newCachedThreadPool(); int i = 5; while(i-->0) executorService.execute(new SimplePriorities(Thread.MIN_PRIORITY)); i = 5; while(i-->0) executorService.execute(new SimplePriorities(Thread.NORM_PRIORITY)); executorService.execute(new SimplePriorities(Thread.MAX_PRIORITY)); System.out.println("SimplePriorities:"); executorService.shutdown(); } }/*SimplePriorities: priority:10,priority:5,priority:5,priority:5,priority:5,priority:5,priority:1,priority:1,priority:1,priority:1,priority:1,*/
这里,在线程中做了大量的浮点数运算,以保证线程能够消耗一定的资源,当然不同机器效果不一定相同,如果cpu能力一般,可将循环次数减少。
虽然JDK提供了10个优先级,但是考虑到各大操作系统的差异性,映射关系并不稳定,所以通常只使用给定的3中级别。
2.8 join()
一个线程可以在其他线程之上调用join()方法:在被调用线程结束后继续执行,还可以带超时参数,中断join()方法的调用是在调用线程上调用interrupt()方法(结合try)。
JoinDemo.java:
public class JoinDemo { public static void main(String args[]){ Sleeper sleepy = new Sleeper("Sleepy", 1500), grumpy = new Sleeper("Grumpy", 1500); new Joiner("Dopey", sleepy); new Joiner("Doc", grumpy); grumpy.interrupt(); } } class Sleeper extends Thread{ int duration; public Sleeper(String name,int time) { super(name); duration = time; start(); } @Override public void run() { try { sleep(duration); } catch (InterruptedException e) { System.out.println(getName() + " was interrupted. " + "isInterrupted():" + isInterrupted()); //被中断 return; } System.out.println(getName() + " has awakened");//被唤醒 } } class Joiner extends Thread{ Sleeper sleeper; public Joiner(String name,Sleeper sleeper) { super(name); this.sleeper = sleeper; start(); } @Override public void run() { try { sleeper.join(); } catch (InterruptedException e) { System.out.println("Interrupted"); } System.out.println(getName() + " join completed"); } }/*输出:Grumpy was interrupted. isInterrupted():false Doc join completed Sleepy has awakened Dopey join completed*/
Sleeper是一个Thread类型,在run中,sleep()可能会在指定的时间满后返回,也可能被中断。在catch里面,将根据isInterrupted()的返回值报告这个中断,当另一个线程在该线程上调用interrupt()时,将给该线程设定一个标志,表明该线程已经被中断了。异常被捕获时将清理这个标志,所以异常被捕获时,标志总是为假。 Joiner线程将通过在Sleeper对象上调用join()来等待Sleeper醒来,从输出可以发现,Sleeper被中断或者正常结束,Joiner和Sleeper将一起结束(需看详细过程,可在Joiner中sleep一定时间并打印时附加System.currentTimeMillis()即可)。
2.9 捕获异常
由于线程的本质特征,我们是不能从线程中捕获逃逸的异常的。异常逃出任务的run方法后,就会向外传播到控制台:
public class ExceptionThread implements Runnable { @Override public void run() { throw new RuntimeException("anxpp.com"); } public static void main(String args[]){ try{ Executors.newCachedThreadPool().execute(new ExceptionThread()); }catch(RuntimeException e){} } }/*输出: Exception in thread "pool-1-thread-1" java.lang.RuntimeException: anxpp.com at com.anxpp.concurrent.ExceptionThread.run(ExceptionThread.java:8) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)*/
可以看到,即使在main中使用了try语句也不能捕获到异常。
(Java SE5之前,可以通过线程组来捕获)现在,我们可以使用Excutor来捕获这些异常。为了解决问题,我们需要修改Executor产生线程的方式。Thread.UncaughtExceptionHandler(Java SE5中的新接口)允许在每个线程上都附着一个异常处理器,Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用:
public class CaptureThreadException { public static void main(String args[]){ ExecutorService service = Executors.newCachedThreadPool(new HandlerThreadFactory()); service.execute(new ExceptionThreadN()); service.shutdown(); System.out.println("CaptureThreadException:"); } } class ExceptionThreadN implements Runnable{ @Override public void run() { Thread thread = Thread.currentThread(); System.out.println("run() by " + thread.getName()); System.out.println("eh=" + thread.getUncaughtExceptionHandler()); throw new RuntimeException("anxpp.com_exception"); } } class MyUncaughtExceptionHandler implements UncaughtExceptionHandler{ @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("caught:" + e); } } class HandlerThreadFactory implements ThreadFactory{ @Override public Thread newThread(Runnable r) { System.out.println(this + " creating new Thread"); Thread t = new Thread(r); t.setName("T"); System.out.println("created:" + t.getName()); t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); System.out.println("eh=" + t.getUncaughtExceptionHandler()); return t; } }/*输出: com.anxpp.concurrent.HandlerThreadFactory@5abe753a creating new Thread created:T eh=com.anxpp.concurrent.MyUncaughtExceptionHandler@5e9a94 run() by T eh=com.anxpp.concurrent.MyUncaughtExceptionHandler@5e9a94 CaptureThreadException: caught:java.lang.RuntimeException: anxpp.com_exception*/
可以看到,未捕获的异常是通过uncaughtException来捕获的。如果需要在各处使用相同的异常处理器,可设置一个静态域,并将处理器设置为默认的。
3 (受限)资源共享
有了并发就可以同时做多件事情,但是,两个和多个线程彼此互相干涉的问题就出现了。
3.1 引出问题
首先,编写一个可以产生偶数的抽象类:
public abstract class IntGenerator { protected int value = 0; private volatile boolean canceled = false; public abstract int next(); public void cancel(){ canceled = true; } public boolean isCanceled(){ return canceled; } }
然后编写一个测试类(可测试所有继承自上面这个类的子类),并实现抽象类后测试:
public class Check implements Runnable { IntGenerator g; final int id; public Check(IntGenerator g,int id) { this.id = id; this.g = g; } @Override public void run() { while(!g.isCanceled()){ int val = g.next(); if(val % 2 != 0){ System.out.println("not pass:" + val); g.cancel(); } } } public static void test(IntGenerator g,int count){ ExecutorService service = Executors.newCachedThreadPool(); for(int i = 0;i < count; i++) service.execute(new Check(g, i)); service.shutdown(); } public static void test(IntGenerator g){ test(g,20); }//default count public static void main(String args[]){ System.out.println("begin:"); test(new IntGenerator() { @Override public int next() { value++;危险,此处放置yield会更快出现问题! value++; return value; } },10); } }/*输出 begin: not pass:575 not pass:579 not pass:577 not pass:581*/
一个任务可能会在另一个任务执行2次自增之间调用next()方法,造成程序最终运行失败。再者,Java中的自增并不是原子操作,在自增过程中任务也可能会被线程机制挂起!所以,即使是单一递增,不保护任务也不是安全的。
3.2 解决共享资源竞争问题
我们在使用多线程的时候,总是要多加谨慎,就像我们打LOL的时候,引导技能正准备最后一击带他升天的时候,敌人躺下了,因为你的线程挂起了,另一个玩家击杀了他。那么解决之道就是当资源被一个任务使用的时候,在其上加锁。基本所有的并发模式在解决线程冲突的时候,都是采用序列化访问共享资源的方案。在代码前加的锁语句产生了一种相互排斥的效果,就是常常被称为互斥量的机制。 比如:大家都想单独使用卫生间(排除一些爱好比较广泛的人),为了使用,就先敲门,如果没人就进入并锁上门后使用,其他的人就不能进来只能等待直到可以使用。而且,人们也并没有排队,谁都可能是下一个敲门的人。可以通过yield()和setPriority()给线程调度器提供建议(虽然不一定有效,具体效果取决于具体平台和JVM的实现)。 Java通过提供关键字 synchronized 的形式,提供内置支持。
共享资源一般是以对象形式存在,但也可能是文件,其他I/O或打印机。要控制对共享资源的访问,得先将其包装进一个对象。然后把访问这个对象的方法标记为 synchronized :
synchronized void f(){ /* ... */}
(同步控制)改造以上方法:
public static void main(String args[]){ System.out.println("begin:"); test(new IntGenerator() { @Override public synchronized int next() { value++; Thread.yield();//问了增加value为奇数时上下文切换的可能性 value++; if(value%20000000 == 0) System.out.println(value); return value; } },100); }/*输出: begin: 20000000 40000000 60000000 ...一直都不会出问题*/
任何时候,都只有一个任务可以通过由互斥量看护的代码。
使用显示的Lock对象:
public static void main(String args[]){ System.out.println("begin:"); test(new IntGenerator() { private Lock lock = new ReentrantLock(); @Override public int next() { lock.lock(); try{ value++; Thread.yield(); value++; if(value%20000000 == 0) System.out.println(value); return value;//必须在try语句里,不然unlock()会过早发生! }finally{ lock.unlock(); } } },100); }
显示的锁代码会不那么优雅,但是更加灵活。通常我们可以使用更简洁的方式,但在解决特殊问题时(如处理异常、清理工作等),就要使用显示的Lock了。
3.3 原子性与易变量
由于Java的跨平台特性,诸如自增等不像C++(通常情况下)一样是原子操作(即这个操作过程是不能中断的),一旦操作开始,它一定可以在可能发生的“上下文切换”之前执行完毕。所以在Java上我们应该避免使用原子性来代替同步。如果一定要玩火,请接受下面的测试:Goetz测试(以为并发专家命名的测试):如果你可以编写用于现代微处理器的高性能JVM,那么久有资格去考虑是否可以避免同步!
易变性(当时在单片机上做c开发的时候就吃过亏),volatile意为易失的。本人也没做深入了解,就不说多了,毕竟也很少用,但有以下原子类,在设计性能调优时挺有用的,他们可以在机器级别上获得原子性: AtomicInteger,AtomicLong,AtomicReference等,具体如何操作请度娘,这里将改写上面的方法一边丢掉synchronized:
public static void main(String args[]){ test(new IntGenerator() { @Override public int next() { return i.addAndGet(2); } },100); }/*这里消除了synchronized关键字,但程序不会失败!*/
3.4 临界区(critical section)
只希望防止多个线程访问方法内部分代码(同步控制块),可以用如下方式:
synchronized(syncObject/*指定对象*/){ //code here 对花括号内代码进行同步控制 }
使用同步控制块而不是对整个方法进行同步控制可以显著提高性能!
3.5 在其他对象上同步
3.6 线程本地存储
4 终结任务
---未完待续