用过Java并发包的朋友或许对Future (interface) 已经比较熟悉了,其实Future 本身是一种被广泛运用的并发设计模式,可在很大程度上简化需要数据流同步的并发应用开发。在一些领域语言(如Alice ML )中甚至直接于语法层面支持Future。
这里就以java.util.concurrent.Future 为例简单说一下Future的具体工作方式。Future对象本身可以看作是一个显式的引用,一个对异步处理结果的引用。由于其异步性质,在创建之初,它所引用的对象可能还并不可用(比如尚在运算中,网络传输中或等待中)。这时,得到Future的程序流程如果并不急于使用Future所引用的对象,那么它可以做其它任何想做的事儿,当流程进行到需要Future背后引用的对象时,可能有两种情况:
希望能看到这个对象可用,并完成一些相关的后续流程。如果实在不可用,也可以进入其它分支流程。
“没有你我的人生就会失去意义,所以就算海枯石烂,我也要等到你。”(当然,如果实在没有毅力枯等下去,设一个超时也是可以理解的)
对于前一种情况,可以通过调用Future.isDone()判断引用的对象是否就绪,并采取不同的处理;而后一种情况则只需调用get()或
get(long timeout, TimeUnit unit)通过同步阻塞方式等待对象就绪。实际运行期是阻塞还是立即返回就取决于get()的调用时机和对象就绪的先后了。
简单而言,Future模式可以在连续流程中满足数据驱动的并发需求,既获得了并发执行的性能提升,又不失连续流程的简洁优雅。
与其它并发设计模式的对比
除了Future外,其它比较常见的并发设计模式还包括“回调驱动(多线程环境下)”、“消息/事件驱动(Actor模型中)”等。
回调是最常见的异步并发模式,它有即时性高、接口设计简单等有点。但相对于Future,其缺点也非常明显。首先,多线程环境下的回调一般是在触发回调的模块线程中执行的,这就意味着编写回调方法时通常必须考虑线程互斥问题;其次,回调方式接口的提供者在本模块的线程中执行用户应用的回调也是相对不安全的,因为你无法确定它会花费多长时间或出现什么异常,从而可能间接导致本模块的即时性和可靠性受影响;再者,使用回调接口不利于顺序流程的开发,因为回调方法的执行是孤立的,要与正常流程汇合是比较困难的。因此回调接口适合于在回调中只需要完成简单任务,并且不必与其它流程汇合的场景。
上述这些回调模式的缺点恰恰正是Future的长项。由于Future的使用是将异步的数据驱动天然的融入顺序流程中,因此你完全不必考虑线程互斥问题,Future甚至可以在单线程的程序模型(例如协程)中实现(参见下文将要提到的“Lazy Future”)。另一方面,提供Future接口的模块完全不必担心像回调接口那样的可靠性问题和可能对本模块的即时性影响。
另一类常见的并发设计模式是“消息(事件)驱动”,它一般运用在Actor模型中:服务请求者向服务提供者发送消息,然后继续进行后续不依赖服务处理结果的任务,在需要依赖结果前终止当前流程并记录状态;在等到回应消息后根据记录的状态触发后续流程。这种基于状态机的并发控制虽然比回调更适合于有延续性的顺序流程,但开发者却不得不将连续流程按照异步服务的调用切断为多个按状态区分的子流程,这样又人为的增加了开发的复杂性。运用Future模式可以避免这个问题,不必为了异步调用而打碎连续的流程。但是有一点应当特别注意:Future.get()方法可能会阻塞线程的执行,所以它通常无法直接融入常规的Actor模型中。(基于协程的Actor模型可以较好的解决这个冲突)
Future的灵活性还体现在其同步和异步的自由取舍,开发者可以根据流程的需要自由决定是否需要等待[Future.isDone()],何时等待[Future.get()],等待多久[Future.get(timeout)]。比如可以根据数据是否就绪而决定要不要借这个空档完成点其它任务,这对于实现“异步分支预测”机制是相当方便的。
Future的衍生
除了上面提到的基础形态之外,Future还有丰富的衍生变化,这里就列举几个常见的。
Lazy Future
与一般的Future不同,Lazy Future在创建之初不会主动开始准备引用的对象,而是等到请求对象时才开始相应的工作。因此,Lazy Future本身并不是为了实现并发,而是以节约不必要的运算资源为出发点,效果上与Lambda/Closure类似。例如设计某些API时,你可能需要返回一组信息,而其中某些信息的计算可能会耗费可观的资源。但调用者不一定都关心所有的这些信息,因此将那些需要耗费较多资源的对象以Lazy Future的形式提供,可以在调用者不需要用到特定的信息时节省资源。
另外Lazy Future也可以用于避免过早的获取或锁定资源而产生的不必要的互斥。
Promise
Promise可以看作是Future的一个特殊分支,常见的Future一般是由服务调用者直接触发异步处理流程,比如调用服务时立即触发处理或 Lazy Future的取值时触发处理。但Promise则用于显式表示那些异步流程并不直接由服务调用者触发的情景。例如Future接口的定时控制,其异步流程不是由调用者,而是由系统时钟触发,再比如淘宝的分布式订阅框架提供的Future式订阅接口,其等待数据的可用性不是由订阅者决定,而在于发布者何时发布或更新数据。因此,相对于标准的Future,Promise接口一般会多出一个set()或fulfill()接口。
可复用的Future
常规的Future是一次性的,也就是说当你获得了异步的处理结果后,Future对象本身就失去意义了。但经过特殊设计的Future也可以实现复用,这对于可多次变更的数据显得非常有用。例如前面提到的淘宝分布式订阅框架所提供的Future式接口,它允许多次调用waitNext()方法(相当于Future.get()),每次调用时是否阻塞取决于在上次调用后是否又有数据发布,如果尚无更新,则阻塞直到下一次的数据发布。这样设计的好处是,接口的使用者可以在其任何合适的时机,或者直接简单的在独立的线程中通过一个无限循环响应订阅数据的变化,同时还可兼顾其它定时任务,甚至同时等待多个Future。简化的例子如下:
1
2
3
4
5
6
7
8
9
10
|
for (;;) { schedule = getNextScheduledTaskTime(); while (schedule > now()) { try { data = subscription.waitNext(schedule - now()); processData(data); } catch (Exception e) {...} } doScheduledTask(); } |
Future的使用
首先列举一段Thinking in Java中的代码,这是出在并发中的Callable使用的一个例子,代码,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
//: concurrency/CallableDemo.java import java.util.concurrent.*; import java.util.*; class TaskWithResult implements Callable<String> { private int id; public TaskWithResult( int id) { this .id = id; } public String call() { return "result of TaskWithResult " + id; } } public class CallableDemo { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Future<String>> results = new ArrayList<Future<String>>(); for ( int i = 0 ; i < 10 ; i++) results.add(exec.submit( new TaskWithResult(i))); for (Future<String> fs : results) try { // get() blocks until completion: System.out.println(fs.get()); } catch (InterruptedException e) { System.out.println(e); return ; } catch (ExecutionException e) { System.out.println(e); } finally { exec.shutdown(); } } } /* Output: result of TaskWithResult 0 result of TaskWithResult 1 result of TaskWithResult 2 result of TaskWithResult 3 result of TaskWithResult 4 result of TaskWithResult 5 result of TaskWithResult 6 result of TaskWithResult 7 result of TaskWithResult 8 result of TaskWithResult 9 */ //:~ |
解释一下Future的使用过程,首先ExecutorService对象exec调用submit()方法会产生Future对象,他用Callable返回结果的特定类型进行了参数化。你可以使用isDone()方法来查询Future是否已经完成。当任务完成时,他具有一个结果,你可以调用get()方法获取结果。你也可以不用isDone()进行检查直接调用get(),在这种情况下,get()将会阻塞知道结果准备就绪。你可以在调用具有超时的get()函数,或者先调用isDone()来查看任务是否完成,然后再调用get()。