动物做logo的网站,装修公司网站 源码,做长图网站,上海缘魁网站建设转载自 通过实例理解 JDK8 的 CompletableFuture 前言
Java 5 并发库主要关注于异步任务的处理#xff0c;它采用了这样一种模式#xff0c;producer 线程创建任务并且利用阻塞队列将其传递给任务的 consumer。这种模型在 Java 7 和 8 中进一步发展#xff0c;并且开始支持…转载自 通过实例理解 JDK8 的 CompletableFuture 前言
Java 5 并发库主要关注于异步任务的处理它采用了这样一种模式producer 线程创建任务并且利用阻塞队列将其传递给任务的 consumer。这种模型在 Java 7 和 8 中进一步发展并且开始支持另外一种风格的任务执行那就是将任务的数据集分解为子集每个子集都可以由独立且同质的子任务来负责处理。
这种风格的基础库也就是 fork/join 框架它允许程序员规定数据集该如何进行分割并且支持将子任务提交到默认的标准线程池中也就是通用的ForkJoinPool。Java 8 中fork/join 并行功能借助并行流的机制变得更加具有可用性。但是不是所有的问题都适合这种风格的并行处理所处理的元素必须是独立的数据集要足够大并且在并行加速方面每个元素的处理成本要足够高这样才能补偿建立 fork/join 框架所消耗的成本。CompletableFuture 类则是 Java 8 在并行流方面的创新。
准备知识
异步计算
所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中简单的讲就是另启一个线程来完成调用中的部分计算使调用继续运行或返回而不需要等待计算结果。但调用者仍需要取线程的计算结果。
回调函数
回调函数比较通用的解释是它是一个通过函数指针调用的函数。如果你把函数的指针地址作为参数传递给另一个函数当这个指针被用为调用它所指向的函数时我们就说这是回调函数。回调函数不是由该函数的实现方直接调用而是在特定的事件或条件发生时由另外一方调用的用于对该事件或条件进行响应。
回调函数的机制
1定义一个回调函数
2提供函数实现的一方在初始化时候将回调函数的函数指针注册给调用者
3当特定的事件或条件发生的时候调用者使用函数指针调用回调函数对事件进行处理。
回调函数通常与原始调用者处于同一层次如图 1 所示
图 1 回调函数示例图 Future 接口介绍
JDK5 新增了 Future 接口用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力但是对于结果的获取却是很不方便只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背轮询的方式又会耗费无谓的 CPU 资源而且也不能及时地得到计算结果为什么不能用观察者设计模式呢即当计算结果完成及时通知监听者。
有一些开源框架实现了我们的设想例如 Netty 的 ChannelFuture 类扩展了 Future 接口通过提供 addListener 方法实现支持回调方式的异步编程。Netty 中所有的 I/O 操作都是异步的,这意味着任何的 I/O 调用都将立即返回而不保证这些被请求的 I/O 操作在调用结束的时候已经完成。取而代之地你会得到一个返回的 ChannelFuture 实例这个实例将给你一些关于 I/O 操作结果或者状态的信息。当一个 I/O 操作开始的时候一个新的 Future 对象就会被创建。在开始的时候新的 Future 是未完成的状态它既非成功、失败也非被取消因为 I/O 操作还没有结束。如果 I/O 操作以成功、失败或者被取消中的任何一种状态结束了那么这个 Future 将会被标记为已完成并包含更多详细的信息例如失败的原因。请注意即使是失败和被取消的状态也是属于已完成的状态。阻塞方式的示例代码如清单 1 所示。
清单 1 阻塞方式示例代码 1 2 3 4 5 6 // Start the connection attempt. ChannelFuture Future bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection is closed or the connection attempt fails. Future.getChannel().getCloseFuture().awaitUninterruptibly(); // Shut down thread pools to exit. bootstrap.releaseExternalResources();
上面代码使用的是 awaitUninterruptibly 方法源代码如清单 2 所示。
清单 2 awaitUninterruptibly 源代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 publicChannelFutureawaitUninterruptibly() { boolean interrupted false; synchronized (this) { //循环等待到完成 while (!done) { checkDeadLock(); waiters; try { wait(); } catch (InterruptedException e) { //不允许中断 interrupted true; } finally { waiters--; } } } if (interrupted) { Thread.currentThread().interrupt(); } return this; }
清单 3 异步非阻塞方式示例代码 1 2 3 4 5 6 7 8 9 10 // Start the connection attempt. ChannelFuture Future bootstrap.connect(new InetSocketAddress(host, port)); Future.addListener(new ChannelFutureListener(){ public void operationComplete(final ChannelFuture Future) throws Exception { } }); // Shut down thread pools to exit. bootstrap.releaseExternalResources();
可以明显的看出在异步模式下上面这段代码没有阻塞在执行 connect 操作后直接执行到 printTime(异步时间 )随后 connect 完成Future 的监听函数输出 connect 操作完成。
非阻塞则是添加监听类 ChannelFutureListener通过覆盖 ChannelFutureListener 的 operationComplete 执行业务逻辑。
清单 4 异步非阻塞方式示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void addListener(final ChannelFutureListener listener) { if (listener null) { throw new NullPointerException(listener); } booleannotifyNow false; synchronized (this) { if (done) { notifyNow true; } else { if (firstListener null) { //listener 链表头 firstListener listener; } else { if (otherListeners null) { otherListeners new ArrayListChannelFutureListener(1); } //添加到 listener 链表中以便操作完成后遍历操作 otherListeners.add(listener); } ...... if (notifyNow) { //通知 listener 进行处理 notifyListener(listener); } }
这部分代码的逻辑很简单就是注册回调函数当操作完成后自动调用回调函数就达到了异步的效果。
CompletableFuture 类介绍
Java 8 中, 新增加了一个包含 50 个方法左右的类--CompletableFuture它提供了非常强大的 Future 的扩展功能可以帮助我们简化异步编程的复杂性并且提供了函数式编程的能力可以通过回调的方式处理计算结果也提供了转换和组合 CompletableFuture 的方法。
对于阻塞或者轮询方式依然可以通过 CompletableFuture 类的 CompletionStage 和 Future 接口方式支持。
CompletableFuture 类声明了 CompletionStage 接口CompletionStage 接口实际上提供了同步或异步运行计算的舞台所以我们可以通过实现多个 CompletionStage 命令并且将这些命令串联在一起的方式实现多个命令之间的触发。
我们可以通过 CompletableFuture.supplyAsync(this::sendMsg); 这么一行代码创建一个简单的异步计算。在这行代码中supplyAsync 支持异步地执行我们指定的方法这个例子中的异步执行方法是 sendMsg。当然我们也可以使用 Executor 执行异步程序默认是 ForkJoinPool.commonPool()。
我们也可以在异步计算结束之后指定回调函数例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify);这行代码中的 thenAccept 被用于增加回调函数在我们的示例中 notify 就成了异步计算的消费者它会处理计算结果。
CompletableFuture 类使用示例
接下来我们通过 20 个示例看看 CompletableFuture 类具体怎么用。
创建完整的 CompletableFuture
清单 5 示例代码 1 2 3 4 5 static void completedFutureExample() { CompletableFutureStringcf CompletableFuture.completedFuture(message); assertTrue(cf.isDone()); assertEquals(message, cf.getNow(null)); }
以上代码一般来说被用于启动异步计算getNow(null)返回计算结果或者 null。
运行简单的异步场景
清单 6 示例代码 1 2 3 4 5 6 7 8 9 static void runAsyncExample() { CompletableFutureVoidcf CompletableFuture.runAsync(() - { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); }); assertFalse(cf.isDone()); sleepEnough(); assertTrue(cf.isDone()); }
以上代码的关键点有两点
CompletableFuture 是异步执行方式使用 ForkJoinPool 实现异步执行这种方式使用了 daemon 线程执行 Runnable 任务。
同步执行动作示例
清单 7 示例代码 1 2 3 4 5 6 7 static void thenApplyExample() { CompletableFutureStringcf CompletableFuture.completedFuture(message).thenApply(s - { assertFalse(Thread.currentThread().isDaemon()); returns.toUpperCase(); }); assertEquals(MESSAGE, cf.getNow(null)); }
以上代码在异步计算正常完成的前提下将执行动作此处为转换成大写字母。
异步执行动作示例
相较前一个示例的同步方式以下代码实现了异步方式仅仅是在上面的代码里的多个方法增加Async这样的关键字。
清单 8 示例代码 1 2 3 4 5 6 7 8 9 static void thenApplyAsyncExample() { CompletableFutureStringcf CompletableFuture.completedFuture(message).thenApplyAsync(s - { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); returns.toUpperCase(); }); assertNull(cf.getNow(null)); assertEquals(MESSAGE, cf.join()); }
使用固定的线程池完成异步执行动作示例
我们可以通过使用线程池方式来管理异步动作申请以下代码基于固定的线程池也是做一个大写字母转换动作代码如清单 9 所示。
清单 9 示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 staticExecutorService executor Executors.newFixedThreadPool(3, new ThreadFactory() { int count 1; Override public Thread newThread(Runnable runnable) { return new Thread(runnable, custom-executor- count); } }); static void thenApplyAsyncWithExecutorExample() { CompletableFutureStringcf CompletableFuture.completedFuture(message).thenApplyAsync(s - { assertTrue(Thread.currentThread().getName().startsWith(custom-executor-)); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); returns.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals(MESSAGE, cf.join()); }
作为消费者消费计算结果示例
假设我们本次计算只需要前一次的计算结果而不需要返回本次计算结果那就有点类似于生产者前一次计算-消费者本次计算模式了示例代码如清单 10 所示。
清单 10 示例代码 1 2 3 4 5 6 static void thenAcceptExample() { StringBuilder result new StringBuilder(); CompletableFuture.completedFuture(thenAccept message) .thenAccept(s -result.append(s)); assertTrue(Result was empty, result.length() 0); }
消费者是同步执行的所以不需要在 CompletableFuture 里对结果进行合并。
异步消费示例
相较于前一个示例的同步方式我们也对应有异步方式代码如清单 11 所示。
清单 11 示例代码 1 2 3 4 5 6 7 static void thenAcceptAsyncExample() { StringBuilder result new StringBuilder(); CompletableFutureVoidcf CompletableFuture.completedFuture(thenAcceptAsync message) .thenAcceptAsync(s -result.append(s)); cf.join(); assertTrue(Result was empty, result.length() 0); }
计算过程中的异常示例
接下来介绍异步操作过程中的异常情况处理。下面这个示例中我们会在字符转换异步请求中刻意延迟 1 秒钟然后才会提交到 ForkJoinPool 里面去执行。
清单 12 示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 static void completeExceptionallyExample() { CompletableFutureStringcf CompletableFuture.completedFuture(message).thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFutureStringexceptionHandler cf.handle((s, th) - { return (th ! null) ? message upon cancel : ; }); cf.completeExceptionally(new RuntimeException(completed exceptionally)); assertTrue(Was not completed exceptionally, cf.isCompletedExceptionally()); try { cf.join(); fail(Should have thrown an exception); } catch(CompletionException ex) { // just for testing assertEquals(completed exceptionally, ex.getCause().getMessage()); } assertEquals(message upon cancel, exceptionHandler.join()); }
示例代码中首先我们创建一个 CompletableFuture计算完毕然后调用 thenApplyAsync 返回一个新的 CompletableFuture接着通过使用 delayedExecutor(timeout, timeUnit)方法延迟 1 秒钟执行。然后我们创建一个 handlerexceptionHandler它会处理异常返回另一个字符串message upon cancel。接下来进入 join()方法执行大写转换操作并且抛出 CompletionException 异常。
取消计算任务
与前面一个异常处理的示例类似我们可以通过调用 cancel(boolean mayInterruptIfRunning)方法取消计算任务。此外cancel()方法与 completeExceptionally(new CancellationException())等价。
清单 13 示例代码 1 2 3 4 5 6 7 8 static void cancelExample() { CompletableFuture cf CompletableFuture.completedFuture(message).thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture cf2 cf.exceptionally(throwable - canceled message); assertTrue(Was not canceled, cf.cancel(true)); assertTrue(Was not completed exceptionally, cf.isCompletedExceptionally()); assertEquals(canceled message, cf2.join()); }
一个 CompletableFuture VS 两个异步计算
我们可以创建一个 CompletableFuture 接收两个异步计算的结果下面代码首先创建了一个 String 对象接下来分别创建了两个 CompletableFuture 对象 cf1 和 cf2cf2 通过调用 applyToEither 方法实现我们的需求。
清单 14 示例代码 1 2 3 4 5 6 7 8 9 static void applyToEitherExample() { String original Message; CompletableFuture cf1 CompletableFuture.completedFuture(original) .thenApplyAsync(s - delayedUpperCase(s)); CompletableFuture cf2 cf1.applyToEither( CompletableFuture.completedFuture(original).thenApplyAsync(s - delayedLowerCase(s)), s - s from applyToEither); assertTrue(cf2.join().endsWith( from applyToEither)); }
如果我们想要使用消费者替换清单 14 的方法方式用于处理异步计算结果代码如清单 15 所示。
清单 15 示例代码 1 2 3 4 5 6 7 8 9 10 static void acceptEitherExample() { String original Message; StringBuilder result new StringBuilder(); CompletableFuture cf CompletableFuture.completedFuture(original) .thenApplyAsync(s - delayedUpperCase(s)) .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s - delayedLowerCase(s)), s - result.append(s).append(acceptEither)); cf.join(); assertTrue(Result was empty, result.toString().endsWith(acceptEither)); }
运行两个阶段后执行
下面这个示例程序两个阶段执行完毕后返回结果首先将字符转为大写然后将字符转为小写在两个计算阶段都结束之后触发 CompletableFuture。
清单 16 示例代码 1 2 3 4 5 6 7 8 static void runAfterBothExample() { String original Message; StringBuilder result new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), () - result.append(done)); assertTrue(Result was empty, result.length() 0); }
也可以通过以下方式处理异步计算结果
清单 17 示例代码 1 2 3 4 5 6 7 8 static void thenAcceptBothExample() { String original Message; StringBuilder result new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), (s1, s2) - result.append(s1 s2)); assertEquals(MESSAGEmessage, result.toString()); }
整合两个计算结果
我们可以通过 thenCombine()方法整合两个异步计算的结果注意以下代码的整个程序过程是同步的getNow()方法最终会输出整合后的结果也就是说大写字符和小写字符的串联值。
清单 18 示例代码 1 2 3 4 5 6 7 static void thenCombineExample() { String original Message; CompletableFuture cf CompletableFuture.completedFuture(original).thenApply(s - delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApply(s - delayedLowerCase(s)), (s1, s2) - s1 s2); assertEquals(MESSAGEmessage, cf.getNow(null)); }
上面这个示例是按照同步方式执行两个方法后再合成字符串以下代码采用异步方式同步执行两个方法由于异步方式情况下不能够确定哪一个方法最终执行完毕所以我们需要调用 join()方法等待后一个方法结束后再合成字符串这一点和线程的 join()方法是一致的主线程生成并起动了子线程如果子线程里要进行大量的耗时的运算主线程往往将于子线程之前结束但是如果主线程处理完其他的事务后需要用到子线程的处理结果也就是主线程需要等待子线程执行完成之后再结束这个时候就要用到 join()方法了即 join()的作用是等待该线程终止。
清单 19 示例代码 1 2 3 4 5 6 7 8 static void thenCombineAsyncExample() { String original Message; CompletableFuture cf CompletableFuture.completedFuture(original) .thenApplyAsync(s - delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s - delayedLowerCase(s)), assertEquals(MESSAGEmessage, cf.join()); (s1, s2) - s1 s2); }
除了 thenCombine()方法以外还有另外一种方法-thenCompose()这个方法也会实现两个方法执行后的返回结果的连接。
清单 20 示例代码 1 2 3 4 5 6 7 static void thenComposeExample() { String original Message; CompletableFuture cf CompletableFuture.completedFuture(original).thenApply(s - delayedUpperCase(s)) .thenCompose(upper - CompletableFuture.completedFuture(original).thenApply(s - delayedLowerCase(s)) .thenApply(s - upper s)); assertEquals(MESSAGEmessage, cf.join()); }
anyOf()方法
以下代码模拟了如何在几个计算过程中任意一个完成后创建 CompletableFuture在这个例子中我们创建了几个计算过程然后转换字符串到大写字符。由于这些 CompletableFuture 是同步执行的下面这个例子使用的是 thenApply()方法而不是 thenApplyAsync()方法使用 anyOf()方法后返回的任何一个值都会立即触发 CompletableFuture。然后我们使用 whenComplete(BiConsumer? super Object, ? super Throwable action)方法处理结果。
清单 21 示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 static void anyOfExample() { StringBuilder result new StringBuilder(); List messages Arrays.asList(a, b, c); ListCompletableFuture futures messages.stream() .map(msg - CompletableFuture.completedFuture(msg).thenApply(s - delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) - { if(th null) { assertTrue(isUpperCase((String) res)); result.append(res); } }); assertTrue(Result was empty, result.length() 0); }
当所有的 CompletableFuture 完成后创建 CompletableFuture
清单 22 所示我们会以同步方式执行多个异步计算过程在所有计算过程都完成后创建一个 CompletableFuture。
清单 22 示例代码 1 2 3 4 5 6 7 8 9 10 11 12 static void allOfExample() { StringBuilder result new StringBuilder(); List messages Arrays.asList(a, b, c); ListCompletableFuture futures messages.stream() .map(msg - CompletableFuture.completedFuture(msg).thenApply(s - delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) - { futures.forEach(cf - assertTrue(isUpperCase(cf.getNow(null)))); result.append(done); }); assertTrue(Result was empty, result.length() 0); }
相较于前一个同步示例我们也可以异步执行如清单 23 所示。
清单 23 示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 static void allOfAsyncExample() { StringBuilder result new StringBuilder(); List messages Arrays.asList(a, b, c); ListCompletableFuture futures messages.stream() .map(msg - CompletableFuture.completedFuture(msg).thenApplyAsync(s - delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture allOf CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) - { futures.forEach(cf - assertTrue(isUpperCase(cf.getNow(null)))); result.append(done); }); allOf.join(); assertTrue(Result was empty, result.length() 0); }
实际案例
以下代码完成的操作包括
首先异步地通过调用 cars()方法获取 Car 对象返回一个 CompletionStageList实例。Cars()方法可以在内部使用调用远端服务器上的 REST 服务等类似场景。然后和其他的 CompletionStageList组合通过调用 rating(manufacturerId)方法异步地返回 CompletionStage 实例。当所有的 Car 对象都被填充了 rating 后调用 allOf()方法获取最终值。调用 whenComplete()方法打印最终的评分rating。
清单 24 示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 cars().thenCompose(cars - { ListCompletionStage updatedCars cars.stream() .map(car - rating(car.manufacturerId).thenApply(r - { car.setRating(r); return car; })).collect(Collectors.toList()); CompletableFuture done CompletableFuture .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); return done.thenApply(v - updatedCars.stream().map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join).collect(Collectors.toList())); }).whenComplete((cars, th) - { if (th null) { cars.forEach(System.out::println); } else { throw new RuntimeException(th); } }).toCompletableFuture().join();
结束语
Completable 类为我们提供了丰富的异步计算调用方式我们可以通过上述基本操作描述及 20 个示例程序进一步了解如果使用 CompletableFuture 类实现我们的需求期待 JDK10 会有持续更新。
参考资源
参考 developerWorks 上的 Java 8 文章了解更多 Java 8 知识。
参考书籍 Java 8 in Action Raoul-Gabriel Urma
参考书籍 Mastering Lambdas: Java Programming in a Multicore World Maurice Naftalin
参考文章 Java 8 CompletableFutures这篇文章从基础介绍了 CompletableFuture 类的使用方式。