Java并发编程-Callable与Runnable

Callable与Runnable的区别

  • Callable接口类有返回值,具体返回值在Callable接口类的call()方法返回,该返回值可以通过实现Future接口的对象的get()方法获取,该方法会阻塞主线程(如果call方法还没执行完);而Runnable是没有返回值的,因为Runnable接口类的run()方法是没有返回值的,Runnable接口方法是不会造成主线程阻塞的(仅在使用new Thread创建线程或利用线程池创建线程但不执行FutureTask的get()方法情况下)

  • Callable接口类中的call()方法可以抛出异常;而Runable接口类中的run()方法是不可以抛出异常的,异常只能在run方法内必须得到处理,不能向外界抛出

  • 在使用Thread类实现多线程的情况下, Callable接口实现类首先需要将自己的对象实例作为参数来创建一个FutureTask对象,然后才可以通过new Thread(Runnable target)来创建线程;而Runnable接口实现类可以直接通过new Thread(Runnable target)创建线程

FutureTask类实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable, Future接口,即FutureTask类也实现了Runnable, Future接口。

Callable与Runnable的相同点

  • 两者都可以将自己接口实现类的实例作为参数传递给ExecutorService接口类中的submit()方法直接分配线程运行,多个submit()可以异步执行,如: Future submit(Callable task)、 Future<?> submit(Runnable task)
  • 只要调用了Future的get()方法了,无论是submit()方法中的参数是Callable还是Runnable,都会阻塞主线程,直到Callable中的call()方法或Runnable中的run()方法执行完毕,主线程才能继续执行

ExecutorService的execute()和submit()区别:

  • 参数及返回值不一样。execute()只接受Runnable对象作为参数,且没有返回值,无法判断任务是否成功完成;submit()可以接受Runnable对象或Callable对象作为参数,且该方法返回一个Future对象,可以用这个Future对象来判断任务是否成功完成及获取Callable线程方法返回结果
  • submit方便Exception处理

使用方式

Callable

Thread方式

1、 CallableTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class CallableTest{

public static void main(String[] args) {
//创建实现了Callable接口的对象
MyCallable callable = new MyCallable();
//将实现Callable接口的对象作为参数创建一个FutureTask对象
FutureTask<String> task = new FutureTask<String>(callable);
//创建线程处理当前callable任务
Thread thread = new Thread(task);
//开启线程
long startTime = System.currentTimeMillis();
thread.start();
//获取到call方法的返回值
try {
String result = task.get();
long endTime = System.currentTimeMillis();
System.out.println("得到返回值: "+result);
System.out.println("结束执行get的时间: "+ (endTime-startTime)/(1000) + "秒");
} catch (Exception e) {
e.printStackTrace();
}
}
}

2、MyCallable.java

1
2
3
4
5
6
7
8
9
public class MyCallable implements Callable<String> {

@Override
public String call() throws Exception {
Thread.sleep(3000);
return "call method result";
}

}

执行结果:
CallableThread
当主线程执行到String result = task.get();时就阻塞了。等待callable对象中的call()方法执行完,main线程才能继续往下执行。

线程池方式

将CallableTest改一改换成用线程池的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class CallableTest{

public static void main(String[] args) {
//创建实现了Callable接口的对象
MyCallable callable = new MyCallable();
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<String> task = (FutureTask<String>) executorService.submit(callable);

//开启线程
long startTime = System.currentTimeMillis();
//获取到call方法的返回值
try {
String result = task.get();
long endTime = System.currentTimeMillis();
System.out.println("得到返回值: "+result);
System.out.println("结束执行get的时间: "+ (endTime-startTime)/(1000) + "秒");
} catch (Exception e) {
e.printStackTrace();
}finally{
executorService.shutdown();
}
}
}

执行结果:
CallableThread

Runnable

Thread方式

1、RunnableTest.java

1
2
3
4
5
6
7
8
9
10
public class RunnableTest {
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable);
long startTime = System.currentTimeMillis();
thread.start();
long endTime = System.currentTimeMillis();
System.out.println("启动任务之后时间: "+ (endTime-startTime)/(1000) + "秒");
}
}

2、MyRunnable.java

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyRunnable implements Runnable {

@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("MyRunnable线程:"+Thread.currentThread().getName()+"执行完毕");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

执行结果:
RunnableThread
runnable对象中run()方法不会阻塞main线程的执行。

线程池方式

将RunnableTest改一改换成用线程池的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RunnableTest {
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
long startTime = System.currentTimeMillis();
Future<String> task = (FutureTask<String>) executorService.submit(runnable);
try {
System.out.println(task.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
executorService.shutdown();
}
long endTime = System.currentTimeMillis();
System.out.println("启动任务之后时间: "+ (endTime-startTime)/(1000) + "秒");
}
}

执行结果:
Runnable-get
可以发现,只要调用了Future的get()方法,主线程就阻塞了。去掉该方法,重新执行下,结果如下:
Runnable-non-get
结果跟用Thread方式执行的结果一致。

ExecutorService-submit()封装过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}


public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

在AbstractExecutorService类的源码中可以看到,submit中的newTaskFor()方法将Callable对象或Runnable对象封装成了FutureTask对象返回。

Future-get()获取返回值及阻塞原因

FutureTask类实现了Future接口,所以我们以FutureTask类源码为例进行分析。
FutureTask源码中的几个静态类变量及构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions: 任务状态之间的转换过程
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state; //任务状态变量,由CAS操作保证原子性
private static final int NEW = 0; //任务新建和执行中
private static final int COMPLETING = 1; //任务将要执行完毕
private static final int NORMAL = 2; //任务正常执行结束
private static final int EXCEPTIONAL = 3; //任务异常
private static final int CANCELLED = 4; //任务取消
private static final int INTERRUPTING = 5; //任务线程即将被中断
private static final int INTERRUPTED = 6; //任务线程已中断

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

以Thread方式的Callable为例,在调用了线程的start方法之后会使得该线程处于就绪状态,有了竞争CPU时间片的权限,当分配到时间片之后,就会执行FutureTask的run()方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //任务执行
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); //异常对象赋给outcome
}
if (ran)
set(result); //执行结果赋给outcome
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

在调用完call()方法后,将执行结果赋值给result。如果程序正常执行的话,就将result在set()方法进行具体的赋值操作,set()方法的源码如下:

1
2
3
4
5
6
7
8
9
protected void set(V v) {
//将state由NEW更新为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//赋值完毕,将state由COMPLETING更新为NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

set方法将result赋给了outcome,outcome是Object类型的全局变量,同时将state状态设置为NORMAL,接着会执行finishCompletion()方法,finishCompletion()其实就是用来唤醒get()方法的。get()方法的源码如下:

1
2
3
4
5
6
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

如果Callable中call()方法的业务逻辑还没处理完毕,此时state为NEW,对应的值小于COMPLETING,执行awaitDone() 方法(阻塞线程的地方),其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 任务截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 自旋
for (;;) {
if (Thread.interrupted()) {
//线程中断则移除等待线程,并抛出异常
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
// 任务可能已经完成或者被取消了
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// 可能任务线程被阻塞了,主线程让出CPU
Thread.yield();
else if (q == null)
// 等待线程节点为空,则初始化新节点并关联当前线程
q = new WaitNode();
else if (!queued)
// 等待线程入队列,成功则queued=true
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//已经超时的话,移除等待节点
removeWaiter(q);
return state;
}
// 未超时,将当前线程挂起指定时间
LockSupport.parkNanos(this, nanos);
}
else
// timed=false时会走到这里,挂起当前线程
LockSupport.park(this);
}
}

执行过程:

  • 计算deadline,也就是到某个时间点后如果还没有返回结果,那么就超时了。
  • 进入自旋,也就是死循环。
  • 首先判断是否响应线程中断。对于线程中断的响应往往会放在线程进入阻塞之前。
  • 判断state值,如果>COMPLETING表明任务已经取消或者已经执行完毕,就可以直接返回了。
  • 如果任务还在执行,则为当前线程初始化一个等待节点WaitNode,入等待队列。
  • 计算nanos,判断是否已经超时。如果已经超时,则移除所有等待节点,直接返回state。超时的话,state的值仍然还是COMPLETING。
  • 如果还未超时,就通过LockSupprot类提供的方法在指定时间内挂起当前线程,等待任务线程唤醒或者超时

在awaitDone方法中,会一直自旋(正常情况下),直到set()方法将state更新为NORMAL并执行完finishCompletion()方法,来唤醒get()方法中的awaitDone()方法挂起的线程。其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void finishCompletion() {
//遍历等待节点
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒等待线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
// unlink to help gc
q.next = null;
q = next;
}
break;
}
}
//模板方法,可以被覆盖
done();
//清空callable
callable = null;
}

当任务正常结束或者异常时,都会调用finishCompletion()方法去唤醒等待线程。当get()方法中的awaitDone()方法挂起的线程被唤醒后,就可以继续执行get()方法中的report()方法返回执行结果。

总结

Callable由FutureTask的run()方法启动线程:1、在Callable对象中的call()方法的业务逻辑执行完后;2、调用FutureTask的set()方法对任务的state进行更新为COMPLETING;3、对全局变量结果outcome进行赋值;4、再次更新state为NORMAL;5、调用finishCompletion()唤醒FutureTask的get()中挂起的线程,继续执行get()方法中awaitDone下面的代码。
FutureTask的get()方法中,先判断state<=COMPLETING,若是则调用awaitDone()挂起线程,自旋等待上述1-5步骤执行完毕,才能返回全局变量结果outcome;若不是则直接返回全局变量结果outcome。FutureTask的get()阻塞是通过自旋+挂起线程实现。