Callable与Runnable的区别
Callable接口类有返回值,具体返回值在Callable接口类的call()方法返回,该返回值可以通过实现Future接口的对象的get()方法获取,该方法会阻塞主线程(如果call方法还没执行完);而Runnable是没有返回值的,因为Runnable接口类的run()方法是没有返回值的,Runnable接口方法是不会造成主线程阻塞的(仅在使用new Thread创建线程或利用线程池创建线程但不执行FutureTask的get()方法情况下)
Callable接口类中的call()方法可以抛出异常;而Runable接口类中的run()方法是不可以抛出异常的,异常只能在run方法内必须得到处理,不能向外界抛出
在使用Thread类实现多线程的情况下, Callable接口实现类首先需要将自己的对象实例作为参数来创建一个FutureTask对象,然后才可以通过new Thread(Runnable target)来创建线程;而Runnable接口实现类可以直接通过new Thread(Runnable target)创建线程
FutureTask类实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable, Future接口,即FutureTask类也实现了Runnable, Future接口。
Callable与Runnable的相同点
两者都可以将自己接口实现类的实例作为参数传递给ExecutorService接口类中的submit()方法直接分配线程运行,多个submit()可以异步执行,如: Future submit(Callable task)、 Future<?> submit(Runnable task)
只要调用了Future的get()方法了,无论是submit()方法中的参数是Callable还是Runnable,都会阻塞主线程,直到Callable中的call()方法或Runnable中的run()方法执行完毕,主线程才能继续执行
ExecutorService的execute()和submit()区别:
参数及返回值不一样。execute()只接受Runnable对象作为参数,且没有返回值,无法判断任务是否成功完成;submit()可以接受Runnable对象或Callable对象作为参数,且该方法返回一个Future对象,可以用这个Future对象来判断任务是否成功完成及获取Callable线程方法返回结果
submit方便Exception处理
使用方式
Callable
Thread方式
1、 CallableTest.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class CallableTest { public static void main (String[] args) { MyCallable callable = new MyCallable(); FutureTask<String> task = new FutureTask<String>(callable); Thread thread = new Thread(task); long startTime = System.currentTimeMillis(); thread.start(); try { String result = task.get(); long endTime = System.currentTimeMillis(); System.out.println("得到返回值: " +result); System.out.println("结束执行get的时间: " + (endTime-startTime)/(1000 ) + "秒" ); } catch (Exception e) { e.printStackTrace(); } } }
2、MyCallable.java
1 2 3 4 5 6 7 8 9 public class MyCallable implements Callable <String > { @Override public String call () throws Exception { Thread.sleep(3000 ); return "call method result" ; } }
执行结果:
当主线程执行到String result = task.get();
时就阻塞了。等待callable对象中的call()方法执行完,main线程才能继续往下执行。
线程池方式
将CallableTest改一改换成用线程池的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class CallableTest { public static void main (String[] args) { MyCallable callable = new MyCallable(); ExecutorService executorService = Executors.newFixedThreadPool(2 ); Future<String> task = (FutureTask<String>) executorService.submit(callable); long startTime = System.currentTimeMillis(); try { String result = task.get(); long endTime = System.currentTimeMillis(); System.out.println("得到返回值: " +result); System.out.println("结束执行get的时间: " + (endTime-startTime)/(1000 ) + "秒" ); } catch (Exception e) { e.printStackTrace(); }finally { executorService.shutdown(); } } }
执行结果:
Runnable
Thread方式
1、RunnableTest.java
1 2 3 4 5 6 7 8 9 10 public class RunnableTest { public static void main (String[] args) { MyRunnable runnable = new MyRunnable(); Thread thread = new Thread(runnable); long startTime = System.currentTimeMillis(); thread.start(); long endTime = System.currentTimeMillis(); System.out.println("启动任务之后时间: " + (endTime-startTime)/(1000 ) + "秒" ); } }
2、MyRunnable.java
1 2 3 4 5 6 7 8 9 10 11 12 13 public class MyRunnable implements Runnable { @Override public void run () { try { Thread.sleep(3000 ); System.out.println("MyRunnable线程:" +Thread.currentThread().getName()+"执行完毕" ); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果:
runnable对象中run()方法不会阻塞main线程的执行。
线程池方式
将RunnableTest改一改换成用线程池的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class RunnableTest { public static void main (String[] args) { MyRunnable runnable = new MyRunnable(); ExecutorService executorService = Executors.newFixedThreadPool(2 ); long startTime = System.currentTimeMillis(); Future<String> task = (FutureTask<String>) executorService.submit(runnable); try { System.out.println(task.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }finally { executorService.shutdown(); } long endTime = System.currentTimeMillis(); System.out.println("启动任务之后时间: " + (endTime-startTime)/(1000 ) + "秒" ); } }
执行结果:
可以发现,只要调用了Future的get()方法,主线程就阻塞了。去掉该方法,重新执行下,结果如下:
结果跟用Thread方式执行的结果一致。
ExecutorService-submit()封装过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
在AbstractExecutorService类的源码中可以看到,submit中的newTaskFor()方法将Callable对象或Runnable对象封装成了FutureTask对象返回。
Future-get()获取返回值及阻塞原因
FutureTask类实现了Future接口,所以我们以FutureTask类源码为例进行分析。 FutureTask源码中的几个静态类变量及构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private volatile int state; private static final int NEW = 0 ; private static final int COMPLETING = 1 ; private static final int NORMAL = 2 ; private static final int EXCEPTIONAL = 3 ; private static final int CANCELLED = 4 ; private static final int INTERRUPTING = 5 ; private static final int INTERRUPTED = 6 ; public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException(); this .callable = callable; this .state = NEW; }
以Thread方式的Callable为例,在调用了线程的start方法之后会使得该线程处于就绪状态,有了竞争CPU时间片的权限,当分配到时间片之后,就会执行FutureTask的run()方法,其源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void run () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return ; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true ; } catch (Throwable ex) { result = null ; ran = false ; setException(ex); } if (ran) set(result); } } finally { runner = null ; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
在调用完call()方法后,将执行结果赋值给result。如果程序正常执行的话,就将result在set()方法进行具体的赋值操作,set()方法的源码如下:
1 2 3 4 5 6 7 8 9 protected void set (V v) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this , stateOffset, NORMAL); finishCompletion(); } }
set方法将result赋给了outcome,outcome是Object类型的全局变量,同时将state状态设置为NORMAL,接着会执行finishCompletion()方法,finishCompletion()其实就是用来唤醒get()方法的。get()方法的源码如下:
1 2 3 4 5 6 public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); }
如果Callable中call()方法的业务逻辑还没处理完毕,此时state为NEW,对应的值小于COMPLETING,执行awaitDone() 方法(阻塞线程的地方),其源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private int awaitDone (boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L ; WaitNode q = null ; boolean queued = false ; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null ) q.thread = null ; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null ) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this , waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L ) { removeWaiter(q); return state; } LockSupport.parkNanos(this , nanos); } else LockSupport.park(this ); } }
执行过程:
计算deadline,也就是到某个时间点后如果还没有返回结果,那么就超时了。
进入自旋,也就是死循环。
首先判断是否响应线程中断。对于线程中断的响应往往会放在线程进入阻塞之前。
判断state值,如果>COMPLETING表明任务已经取消或者已经执行完毕,就可以直接返回了。
如果任务还在执行,则为当前线程初始化一个等待节点WaitNode,入等待队列。
计算nanos,判断是否已经超时。如果已经超时,则移除所有等待节点,直接返回state。超时的话,state的值仍然还是COMPLETING。
如果还未超时,就通过LockSupprot类提供的方法在指定时间内挂起当前线程,等待任务线程唤醒或者超时
在awaitDone方法中,会一直自旋(正常情况下),直到set()方法将state更新为NORMAL并执行完finishCompletion()方法,来唤醒get()方法中的awaitDone()方法挂起的线程。其源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void finishCompletion () { for (WaitNode q; (q = waiters) != null ;) { if (UNSAFE.compareAndSwapObject(this , waitersOffset, q, null )) { for (;;) { Thread t = q.thread; if (t != null ) { q.thread = null ; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null ) break ; q.next = null ; q = next; } break ; } } done(); callable = null ; }
当任务正常结束或者异常时,都会调用finishCompletion()方法去唤醒等待线程。当get()方法中的awaitDone()方法挂起的线程被唤醒后,就可以继续执行get()方法中的report()方法返回执行结果。
总结
Callable由FutureTask的run()方法启动线程:1、在Callable对象中的call()方法的业务逻辑执行完后;2、调用FutureTask的set()方法对任务的state进行更新为COMPLETING;3、对全局变量结果outcome进行赋值;4、再次更新state为NORMAL;5、调用finishCompletion()唤醒FutureTask的get()中挂起的线程,继续执行get()方法中awaitDone下面的代码。 FutureTask的get()方法中,先判断state<=COMPLETING,若是则调用awaitDone()挂起线程,自旋等待上述1-5步骤执行完毕,才能返回全局变量结果outcome;若不是则直接返回全局变量结果outcome。FutureTask的get()阻塞是通过自旋+挂起线程实现。