Java 中的异步编程工具 CompletableFuture
文章目录
!版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。
环境信息:
- JDK 版本: 1.8
示例地址:
一、CompletableFuture 概述
1.1 什么是同步和异步
在介绍 CompletableFuture 之前,先简单说一下什么是同步和异步这两个概念,介绍这俩个概念最直接的方式就是使用经典的 烧水泡茶 过程进行描述,一般来讲我们烧水泡茶时会经过 洗茶壶
->注水
->烧水
->洗茶杯
->装茶叶
->泡茶
几个步骤:
如果使用 同步编程
,那么泡茶过程可以划分为:
如果使用 异步编程
,那么泡茶过程可以划分为:
根据上面的例子,相信大家对同步和异步有了一个大概了解,同步
就是按指定顺序执行任务,这种执行过程必须按顺序进行,一步步执行,所以执行速度比较缓慢。而 异步
则不需要按照指定的顺序执行任务,它可以多个任务并行执行,等待异步任务完成后进行反馈,然后按照反馈结果执行特定操作即可,所以执行速度非常快。
1.2 CompletableFuture 简介
在 JDK 8 版本中,新增了很多功能与工具,其中 CompletableFuture 就是其中一种用于构建异步编程的工具,该工具主要是对 Future 进行优化,使开发者可以很方便的创建串行或者并行任务,以及实现任务的 OR 或者 AND 这种组合方式,极大程度上简化了异步编程中的代码量。
1.3 为什么推出 CompletableFuture
在 JDK 8 之前的版本中,编写异步任务常使用 Future 实现,使用 Future 执行异步任务并且获得异步执行结果时,我们会通过两种方式,要么调用阻塞的 get()
方法,或者轮询调用 isDone()
方法获取任务状态,判断是否为 true
。不过这两种方法在执行时都会使主线程被迫等待,对性能会产生一定影响,故而 Java 团队在 JDK 8 版本中新增了 CompletableFuture 工具,通过使用观察者模式进行设计,实现异步回调进行异步编程,一定程度上降低了编写异步任务的代码量和难度。
二、CompletableFuture 中的分类
在 CompletableFuture 中有很多方法,并且这些方法命名并不是很容易归类,比如 thenRun 和 thenRunAsync 方法,这些方法有的以 Async 结尾,而有的则不是,一般情况下我们会认为带 Async 的方法属于异步方法,不带 Async 的方法属于同步方法,不过这个看法其实是并不是完全正确,因为有的方法即使不以 Async 结尾,也是通过异步方式执行的。
其实,本人认为如果需要对 CompletableFuture 中的方法进行归类的话,可以按照使用过程进行分类,这样比较容易将不同的方法划分到不同的类别中。比如,使用过程可以划分为 任务开启、任务处理、任务结束、查看任务状态、设置任务结果 以及 任务异常处理 等几种方法,其中 任务处理 又可以分为 串行任务 和 并行任务。
注意: 下面要罗列的以
Async
结尾的方法中,执行时并不会直接从commonPool
线程池中获取线程,而是判断环境中设置的变量commonParallelism
并行度,或者应用所在服务器的 CPU 数量是否大于 1,如果满足条件则就会从ForkJoinPool.commonPool()
线程池中获取一个线程执行任务,如果不满足条件就会每次执行任务时新建一个线程,使用新创建的线程去执行任务。在下面介绍线程池时,会提到这一点。
2.1 任务开启方法
任务开启方法一般指的是 CompletableFuture 中,可以直接调用的静态方法,使用这些方法可以直接开启一个新任务,该任务相当于任务链中要执行的第一个步骤,常常我们会在该步骤中执行获取待处理的数据等操作。
任务开启方法包含如下:
方法名称 | 有返回值 | 描述 |
---|---|---|
runAsync | x | 从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。 |
supplyAsync | √ | 从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,返回指定类型的返回值。 |
2.2 任务处理方法
任务处理方法一般指的是 CompletableFuture 中,用于以串行或并行的方式处理数据的静态或实例方法,使用这些方法可以实现 OR 或 AND 任务组合,是异步编程中最重要的一些方法。
任务处理方法包含如下:
任务处理-串行任务
方法名称 | 有返回值 | 描述 |
---|---|---|
thenRun | x | 串行执行任务。并且该任务方法执行结束后,没有返回值。 |
thenRunAsync | x | 串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。 |
thenApply | √ | 串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。 |
thenApplyAsync | √ | 串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。 |
thenAccept | x | 串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。 |
thenAcceptAsync | x | 串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。 |
handle | √ | 串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。 |
handleAsync | √ | 串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。 |
whenComplete | x | 串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。 |
whenCompleteAsync | x | 串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。 |
thenCompose | √ | 串行执行任务,按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。 |
thenComposeAsync | √ | 串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。 |
任务处理-并行任务
方法名称 | 有返回值 | 描述 |
---|---|---|
thenCombine | √ | 并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。 |
thenCombineAsync | √ | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。 |
thenAcceptBoth | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
thenAcceptBothAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
runAfterBoth | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。 |
runAfterBothAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。 |
applyToEither | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
applyToEitherAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
runAfterEither | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。 |
runAfterEitherAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。 |
acceptEither | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
acceptEitherAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
allOf | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。 |
anyOf | √ | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。 |
2.3 任务结束方法
任务结束指的是调用 CompletableFuture 中的实例方法,获取执行结果或者取消任务等,结束现有的任务链。
任务结束包含的方法如下:
方法名称 | 有返回值 | 描述 |
---|---|---|
get | √ | 获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。 |
join | √ | 获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。 |
getNow | √ | 立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。 |
cancel | √ | 取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。 |
2.4 查看任务状态方法
查看任务状态方法用于查看 CompletableFuture 任务执行状态,其中包含的方法如下:
方法名称 | 有返回值 | 描述 |
---|---|---|
isDone | √ | 查看任务是否执行完成,如果当前阶段执行完成(无论是正常完成还是异常完成)则返回 true,否则返回 false。 |
isCancelled | √ | 查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。 |
isCompletedExceptionally | √ | 查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。 |
2.5 设置任务结果方法
设置任务结果方法用于设置 CompletableFuture 任务结果,使其返回指定结果,其中包含的方法如下:
方法名称 | 有返回值 | 描述 |
---|---|---|
obtrudeValue | x | 设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。 |
obtrudeException | x | 设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。 |
complete | √ | 设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。 |
completeException | √ | 设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。 |
2.6 任务异常处理方法
任务异常处理方法用于处理 CompletableFuture 执行过程中的异常,对异常进行处理,其中包含的方法如下:
方法名称 | 有返回值 | 描述 |
---|---|---|
exceptionally | x | 判断上一个任务执行时是否发生异常,如果是则就会执行 exceptionally 方法,并且将上一步异常作为当前方法的参数,然后对异常进行处理。当然,如果上一阶段执行过程中没有出现异常,则不会执行 exceptionally 方法。 |
三、CompletableFuture 中的线程池
3.1 CompletableFuture 默认的线程池
使用 CompletableFuture 执行异步任务时,会首先判断设置的 parallelism
数量,该参数一般跟当前服务器 CPU 数量相关:
- 如果 CPU 数量等于 1,CompletableFuture 执行任务时,每次都会创建一个新的线程执行任务;
- 如果 CPU 数量大于 1,CompletableFuture 执行任务时,将使用公共的 ForkJoinPool.commonPool() 线程池;
一般情况下在多核 CPU 服务器中运行应用,都会默认使用 ForkJoinPool.commonPool()
线程池,该线程池根据名称也可以大概了解到,是基于 Fork 和 Join 组合实现的,执行过程中可以将大的任务拆分为多个小任务并行执行,并且支持以窃取的方式,线程池中的线程在执行完自己工作队列中的任务后,可以窃取别的线程工作队列中没有执行完成的任务,协助其执行,尽可能使用并行的方式快速完成全部任务。所以 ForkJoinPool.commonPool()
线程池更适合执行计算密集型任务,而不太适合 IO 密集型任务。
不过公共的 ForkJoinPool.commonPool()
线程池是 JVM 进程中所有 CompletableFuture 和 Stream 共享,如果全局上下文环境中存在大量使用 ForkJoinPool.commonPool()
线程池的任务,并且这些任务中包含大量的 IO 操作,那么该线程池性能将会受到很大影响。所以,一般情况下我们使用 CompletableFuture 时,需要避免使用公共线程池,而是使用自定义的线程池,并且设置不同的任务使用不同类型的线程池,以适用不同的任务场景。
3.2 CompletableFuture 如何使用自定义线程池
在 CompletableFuture 中的绝大部分方法,都存在接收不同类型的重载方法,如:
正常的 CompletableFuture 方法:
thenRun(Runnable action)
handle(BiFunction<? super T, Throwable, ? extends U> fn)
使用公共线程池 commonPool 的异步方法:
thenRunAsync(Runnable action)
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
使用自定义线程池的 CompletableFuture 方法:
thenRunAsync(Runnable action,Executor executor)
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
大家根据上面的三种方法也能得知,在 CompletableFuture 中的方法,大部分都存在可以接收自定义线程池 Executor 参数的重载方法。因此,我们可以使用接收自定义线程池的 CompletableFuture 方法,将我们自定义的线程池作为参数,传入其中,使用该线程池中的线程执行任务。
如下,是一个使用 CompletableFuture 方法执行任务时使用自定义线程的示例:
1import java.time.LocalDate;
2import java.util.concurrent.*;
3
4public class SupplyExample {
5
6 /**
7 * 创建自定义线程池
8 *
9 * @return 自定义线程池
10 */
11 public static ThreadPoolExecutor myThreadPool() {
12 // 核心线程数
13 int coreSize = 10;
14 // 最大线程数
15 int maxSize = 20;
16 // 空闲线程的回收时间和单位
17 TimeUnit timeUnit = TimeUnit.SECONDS;
18 // 空闲线程时间销毁
19 long keepAliveTime = 60L;
20 // 工作队列
21 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(200);
22 // 拒绝策略
23 RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
24
25 // 创建并返回自定义线程池
26 return new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, timeUnit, workQueue, handler);
27 }
28
29 /**
30 * 执行 CompletableFuture.supplyAsync 方法
31 */
32 public static void supplyAsyncExample() {
33 // 自定义线程池
34 ThreadPoolExecutor threadPool = myThreadPool();
35
36 // 执行 CompletableFuture 任务,将日期字符串转换为 LocalDate 日期对象
37 CompletableFuture<LocalDate> cf = CompletableFuture
38 .supplyAsync(() -> LocalDate.parse("2022-06-01"), threadPool);
39
40 // 调用 join 方法进入堵塞状态,直至获取任务执行结果输出到控制台
41 System.out.println(cf.join());
42 }
43
44 /**
45 * main() 方法
46 */
47 public static void main(String[] args) {
48 supplyAsyncExample();
49 }
50
51}
3.3 CPU 集型任务和 IO 密集型任务
常常我们执行的任务可以根据 CPU
的使用或者 IO
的使用进行划分,可以分为:
CPU 密集型任务
CPU 密集型任务也称为计算密集型任务,指的是任务执行过程需要进行大量计算,没有堵塞,且消耗大量的 CPU 资源。比如,计算圆周率,视频解码,类型转换等。
IO 密集型任务
IO 密集型任务,指的是需要进行大量磁盘IO读取,网络IO操作等,执行过程会造成堵塞,需要创建大量的线程执行任务。比如,数据库读写,文件读写,网络请求等。
3.4 不同类型任务对线程池的选择
在 Java 中常用的线程池按执行方式划分的话,其实可以划分为 ForkJoinPool
和 ExecutorService
两种。
ForkJoinPool 线程池
关于 ForkJoinPool 线程池上面也提到过,这种线程池是基于分而治之的思想对任务进行拆解与合并,执行过程需要消耗大量 CPU 分解任务,然后进行计算,因此特别适合执行 计算密集型任务。
ExecutorService 线程池
而 ExecutorService 线程池是传统方式的线程池,使用池化方式管理线程,提前将若干个可执行的线程放入池中,当我们需要时就从池中获取线程执行任务,执行过程不会对任务进行拆解,并且使用完毕后不需要销毁线程而是放回池中,方便下次再使用,从而减少创建和销毁线程的性能开销。所以,这种线程池更适合执行过程中需要进行大量 IO 操作的任务,即适合 IO 密集型任务。
3.5 线程线程大小配置推荐
一般情况下,业务上使用的线程池都会设置线程池的线程数量大小,这个线程池线程数量大小设置为多少是一个值得考虑的问题。设置线程池过多会造成线程浪费,过少会造成处理任务的线程过少,造成任务堆积。所以,这个配置线程数量多大并不容易把控。最优的就是配置不同的线程数进行测试,然后判断应用设置线程池大小为多大性能最优。
这里可以参考网上的一套万能的推荐配置,跟操作系统的 CPU 数量相关,如下:
- CPU 密集型任务:
N + 1
- IO 密集型任务:
2N + 1
还有一种针对 IO 密集型任务设置估算公式,按公式进行配置这种性能最优,公式如下:
- 估算公式: =
(线程等待时间 + 线程 CPU 时间 / 线程 CPU 时间) * CPU 核心数
如果是既有 IO 操作的步骤,也有比较消耗 CPU 的步骤,这种混合型任务可以进行拆分,将不同的任务使用不同的线程池。
四、CompletableFuture 任务开启方法示例
4.1 runAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
runAsync | x | 从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。 |
任务要求
使用异步的方式,模拟获取远程信息,然后将获取的远程信息输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2
3public class RunAsyncExample {
4
5 public static void runAsyncExample() {
6 // 调用 runAsync 方法,异步执行 runnable 中的代码逻辑,模拟获取远程信息
7 CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("模拟获取远程信息并输出到控制台"));
8
9 // 调用 join 方法进行等待,获取执行结果
10 cf.join();
11 }
12
13 public static void main(String[] args) {
14 runAsyncExample();
15 }
16
17}
4.2 supplyAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
supplyAsync | √ | 从 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,返回指定类型的返回值。 |
任务要求
使用异步的方式,模拟获取远程信息,然后将获取的信息作为任务返回结果,之后将任务返回的结果输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2
3public class SupplyAsyncExample {
4
5 public static void supplyAsyncExample() {
6 // 调用 supplyAsync 方法,异步执行 runnable 中的代码逻辑,模拟获取远程信息,然后返回
7 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
8 System.out.println("模拟获取远程信息");
9 return "远程信息";
10 });
11
12 // 调用 join 方法进行等待,获取执行结果
13 String result = cf.join();
14 System.out.println(result);
15 }
16
17 public static void main(String[] args) {
18 supplyAsyncExample();
19 }
20
21}
五、CompletableFuture 任务处理-串行任务方法示例
5.1 thenRun 和 thenRunAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
thenRun | x | 使用主线程或者执行上一步任务的子线程,串行执行任务。并且该任务方法执行结束后,没有返回值。 |
thenRunAsync | x | 串行执行任务,从公共的 commonPool 线程池中获取一个子线程,执行指定的代码逻辑。并且该任务方法执行结束后,没有返回值。 |
任务要求
使用 CompletableFuture 工具,按照代码顺序执行任务,处理使用 "," 拼接的日期字符串,将其转换为 LocalDate 日期对象。
- 第一步,先通过
,
拆分字符串,然后存入原子引用类型对象包裹的List
集合中; - 第二步,遍历
List
集合,将拆分后的字符串转换为LocalDate
对象;
任务执行完成后,将全部日期对象输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
thenRun
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.List;
6import java.util.concurrent.CompletableFuture;
7import java.util.concurrent.atomic.AtomicReference;
8
9public class ThenRunExample {
10
11 public static void thenRunExample() {
12 // 模拟的远程获取的日期字符串
13 String dateStr = "1995-05-10,2000-03-15";
14 // 原子引用类对象
15 AtomicReference<List<String>> dateStrList = new AtomicReference<>();
16 // LocalDate集合
17 List<LocalDate> dateList = new ArrayList<>();
18
19 // 执行 CompletableFuture 任务
20 // (1) 第一步,先通过 "," 拆分字符串,然后存入原子引用对象包裹的 List 集合中;
21 // (2) 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;
22 CompletableFuture
23 .runAsync(() -> dateStrList.set(Arrays.asList(dateStr.split(","))))
24 .thenRun(() -> dateStrList.get().forEach(v -> dateList.add(LocalDate.parse(v, DateTimeFormatter.ofPattern("yyyy-MM-dd")))))
25 .join();
26
27 // 输出转换结果
28 for (LocalDate localDate : dateList) {
29 System.out.println(localDate.toString());
30 }
31 }
32
33 public static void main(String[] args) {
34 thenRunExample();
35 }
36
37}
thenRunAsync
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.List;
6import java.util.concurrent.CompletableFuture;
7import java.util.concurrent.atomic.AtomicReference;
8
9public class ThenRunAsyncExample {
10
11 public static void thenRunAsyncExample() {
12 // 模拟的远程获取的日期字符粗
13 String dateStr = "1995-05-10,2000-03-15";
14 // 原子引用类对象
15 AtomicReference<List<String>> dateStrList = new AtomicReference<>();
16 // LocalDate集合
17 List<LocalDate> dateList = new ArrayList<>();
18
19 // 执行 CompletableFuture 任务
20 // (1) 第一步,先通过 "," 拆分字符串,然后存入原子引用对象包裹的 List 集合中;
21 // (2) 第二步,遍历 List 集合,将拆分后的字符串转换为 LocalDate 对象;
22 CompletableFuture
23 .runAsync(() -> dateStrList.set(Arrays.asList(dateStr.split(","))))
24 .thenRunAsync(() -> dateStrList.get().forEach(v -> dateList.add(LocalDate.parse(v, DateTimeFormatter.ofPattern("yyyy-MM-dd")))))
25 .join();
26
27 // 输出转换结果
28 for (LocalDate localDate : dateList) {
29 System.out.println(localDate.toString());
30 }
31 }
32
33 public static void main(String[] args) {
34 thenRunAsyncExample();
35 }
36
37}
5.2 thenApply 和 thenApplyAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
thenApply | √ | 使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。 |
thenApplyAsync | √ | 串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。 |
任务要求
使用 CompletableFuture 按照顺序执行任务,获取远程日期字符串,然后对日期进行逻辑计算,并返回计算后的结果。
- 第一步,先模拟通过远程结果获取日期字符串;
- 第二步,将日期字符串转换为
LocalDate
日期对象,便于进行逻辑计算; - 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
任务执行完成后,将任务返回的日期字符串输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
thenApply
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.concurrent.CompletableFuture;
4
5public class ThenApplyExample {
6
7 public static void thenApplyExample() {
8 // 执行 CompletableFuture 串行任务
9 CompletableFuture<String> cf = CompletableFuture
10 // 第一步,先模拟通过远程结果获取日期字符串;
11 .supplyAsync(() -> "2022-06-01")
12 // 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
13 .thenApply(param -> LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
14 // 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
15 .thenApply(param -> param.plusDays(10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
16
17 // 调用 join 方法进入堵塞状态,直至获取任务执行结果
18 String result = cf.join();
19 System.out.println(result);
20 }
21
22 public static void main(String[] args) {
23 thenApplyExample();
24 }
25
26}
thenApplyAsync
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.concurrent.CompletableFuture;
4
5public class ThenApplyAsyncExample {
6
7 public static void thenApplyAsyncExample() {
8 // 执行 CompletableFuture 串行任务
9 CompletableFuture<String> cf = CompletableFuture
10 // 第一步,先模拟通过远程结果获取日期字符串;
11 .supplyAsync(() -> "2022-06-01")
12 // 第二步,将日期字符串转换为 LocalDate 日期对象,便于进行逻辑计算;
13 .thenApplyAsync(param -> LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd")))
14 // 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
15 .thenApplyAsync(param -> param.plusDays(10).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
16
17 // 调用 join 方法进入堵塞状态,直至获取任务执行结果
18 String result = cf.join();
19 System.out.println(result);
20 }
21
22 public static void main(String[] args) {
23 thenApplyAsyncExample();
24 }
25
26}
5.3 thenAccept 和 thenAcceptAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
thenAccept | x | 使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且当前任务方法执行结束后,没有返回值。 |
thenAcceptAsync | x | 串行执行任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。 |
任务要求
使用 CompletableFuture 按照顺序执行任务,处理使用 "," 拼接的日期字符串,将其转换为 LocalDate 日期对象。
- 第一步,先通过
,
拆分字符串,转换为字符串数组,返回字符串数组; - 第二步,接收上一步中的日期字符串数组,遍历日期字符串数组,然后将数组中的日期字符串都转换为
LocalDate
日期对象;
任务执行完成后,将任务返回的日期字符串输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
thenAccept
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.List;
4import java.util.concurrent.CompletableFuture;
5import java.util.concurrent.CopyOnWriteArrayList;
6
7public class ThenAcceptExample {
8
9 public static void thenAcceptExample() {
10 // 模拟的远程获取的日期字符粗
11 String dateStr = "1995-05-10,2000-03-15";
12 // 存储 LocalDate 的集合
13 List<LocalDate> dateList = new CopyOnWriteArrayList<>();
14
15 // 执行 CompletableFuture 任务
16 CompletableFuture
17 .supplyAsync(() -> dateStr.split(","))
18 .thenAccept(dateArray -> {
19 for (String s : dateArray) {
20 dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
21 }
22 })
23 .join();
24
25 // 输出转换结果
26 for (LocalDate localDate : dateList) {
27 System.out.println(localDate.toString());
28 }
29 }
30
31 public static void main(String[] args) {
32 thenAcceptExample();
33 }
34
35}
thenAcceptAsync
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.List;
4import java.util.concurrent.CompletableFuture;
5import java.util.concurrent.CopyOnWriteArrayList;
6
7public class ThenAcceptAsyncExample {
8
9 public static void thenAcceptAsyncExample() {
10 // 模拟的远程获取的日期字符粗
11 String dateStr = "1995-05-10,2000-03-15";
12 // 存储 LocalDate 的集合
13 List<LocalDate> dateList = new CopyOnWriteArrayList<>();
14
15 // 执行 CompletableFuture 任务
16 CompletableFuture
17 .supplyAsync(() -> dateStr.split(","))
18 .thenAcceptAsync(dateArray -> {
19 for (String s : dateArray) {
20 dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
21 }
22 })
23 .join();
24
25 // 输出转换结果
26 for (LocalDate localDate : dateList) {
27 System.out.println(localDate.toString());
28 }
29 }
30
31 public static void main(String[] args) {
32 thenAcceptAsyncExample();
33 }
34
35}
5.4 handle 和 handleAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
handle | √ | 使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。 |
handleAsync | √ | 串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,将返回指定类型结果。 |
任务要求
使用 CompletableFuture 按照顺序执行任务,从远程接口获取日期字符串,将其转换为 LocalDate 日期对象。
- 第一步,获取远程日期字符串,然后返回;
- 第二步,接收上一步中的日期字符串,将其转换为
LocalDate
日期对象;
任务执行完成后,将任务返回的日期字符串输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
handle
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.Random;
4import java.util.concurrent.CompletableFuture;
5
6public class HandleExample {
7
8 public static void handleExample() {
9 // 执行 CompletableFuture 串行任务:
10 // 第一步,获取远程日期字符串,然后返回
11 // 第二步,接收上一步中的日期字符串,将其转换为 `LocalDate` 日期对象
12 CompletableFuture<LocalDate> cf = CompletableFuture
13 .supplyAsync(() -> {
14 int random = new Random().nextInt(2);
15 // 50% 概率发生异常
16 if (random != 0) {
17 throw new RuntimeException("模拟发生异常");
18 }
19 // 50% 概率返回正常值
20 return "2022-06-01";
21 })
22 .handle((param, exception) -> {
23 // 如果上一步结果为异常,则返回现在的日期,否则将上一步获取的日期字符串转换为日期对象
24 if (exception != null) {
25 return LocalDate.now();
26 }
27 return LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
28 });
29
30 // 调用 join 方法进入堵塞状态,直至获取任务执行结果
31 LocalDate result = cf.join();
32 System.out.println(result);
33 }
34
35 public static void main(String[] args) {
36 handleExample();
37 }
38
39}
handleAsync
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.Random;
4import java.util.concurrent.CompletableFuture;
5
6public class HandleAsyncExample {
7
8 public static void handleAsyncExample() {
9 // 执行 CompletableFuture 串行任务:
10 // 第一步,获取远程日期字符串,然后返回;
11 // 第二步,接收上一步中的日期字符串,将其转换为 LocalDate 日期对象;
12 CompletableFuture<LocalDate> cf = CompletableFuture
13 .supplyAsync(() -> {
14 int random = new Random().nextInt(2);
15 // 50% 概率发生异常
16 if (random != 0) {
17 throw new RuntimeException("模拟发生异常");
18 }
19 // 50% 概率返回正常值
20 return "2022-06-01";
21 })
22 .handleAsync((param, exception) -> {
23 // 如果上一步结果为异常,则返回现在的日期,否则将上一步获取的日期字符串转换为日期对象
24 if (exception != null) {
25 return LocalDate.now();
26 }
27 return LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
28 });
29
30 // 调用 join 方法进入堵塞状态,直至获取任务执行结果
31 LocalDate result = cf.join();
32 System.out.println(result);
33 }
34
35 public static void main(String[] args) {
36 handleAsyncExample();
37 }
38
39}
5.5 whenComplete 和 whenCompleteAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
whenComplete | x | 使用主线程或者执行上一步任务的线程,串行执行任务。将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,执行指定函数。并且当前任务执行结束后,没有返回值。 |
whenCompleteAsync | x | 串行执行任务,将上一步任务执行的【结果】和【异常】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法执行结束后,没有返回值。 除此之外: ● 如果上一阶段中正常执行结束,则该方法的结果参数不为 null; ● 如果上一阶段中抛出异常,则该方法的异常参数不为 null; |
任务要求
使用 CompletableFuture 按照顺序执行任务,从远程接口获取日期字符串,将其转换为 LocalDate 日期对象。
- 第一步,获取远程日期字符串,然后返回;
- 第二步,接收上一步中的日期字符串,将其转换为
LocalDate
日期对象,并且加入到集合;
任务执行完成后,将任务返回的日期字符串输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
whenComplete
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.concurrent.CopyOnWriteArrayList;
4import java.util.List;
5import java.util.Random;
6import java.util.concurrent.CompletableFuture;
7
8public class WhenCompleteExample {
9
10 public static void whenCompleteExample() {
11 // 模拟的远程获取的日期字符粗
12 String dateStr = "1995-05-10,2000-03-15";
13 // 存储 LocalDate 的集合
14 List<LocalDate> dateList = new CopyOnWriteArrayList<>();
15
16 // 执行 CompletableFuture 任务
17 CompletableFuture
18 .supplyAsync(() -> {
19 int random = new Random().nextInt(2);
20 if (random == 1){
21 throw new RuntimeException("模拟发生异常");
22 }
23 return dateStr.split(",");
24 })
25 .whenComplete((dateArray, exception) -> {
26 // 如果上一步执行过程没有发生异常,则将日期字符串转换为日期对象
27 if (dateArray != null){
28 for (String s : dateArray) {
29 dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
30 }
31 }
32 })
33 .join();
34
35 // 输出转换结果
36 for (LocalDate localDate : dateList) {
37 System.out.println(localDate.toString());
38 }
39 }
40
41 public static void main(String[] args) {
42 whenCompleteExample();
43 }
44
45}
whenCompleteAsync
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.CopyOnWriteArrayList;
4import java.util.List;
5import java.util.Random;
6import java.util.concurrent.CompletableFuture;
7
8public class WhenCompleteAsyncExample {
9
10 public static void whenCompleteAsyncExample(){
11 // 模拟的远程获取的日期字符粗
12 String dateStr = "1995-05-10,2000-03-15";
13 // 存储 LocalDate 的集合
14 List<LocalDate> dateList = new CopyOnWriteArrayList<>();
15
16 // 执行 CompletableFuture 任务
17 CompletableFuture
18 .supplyAsync(() -> {
19 int random = new Random().nextInt(2);
20 if (random == 1){
21 throw new RuntimeException("模拟发生异常");
22 }
23 return dateStr.split(",");
24 })
25 .whenCompleteAsync((dateArray, exception) -> {
26 // 如果上一步执行过程没有发生异常,则将日期字符串转换为日期对象
27 if (dateArray != null){
28 for (String s : dateArray) {
29 dateList.add(LocalDate.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd")));
30 }
31 }
32 })
33 .join();
34
35 // 输出转换结果
36 for (LocalDate localDate : dateList) {
37 System.out.println(localDate.toString());
38 }
39 }
40
41 public static void main(String[] args) {
42 whenCompleteAsyncExample();
43 }
44
45}
5.6 thenCompose 和 thenComposeAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
thenCompose | √ | 使用主线程或者执行上一步任务的线程,串行执行任务。按顺序组合两个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。 |
thenComposeAsync | √ | 串行执行任务,按顺序组合俩个有依赖关系的任务,将上一步任务执行的【结果】作为当前任务方法执行时的【参数】,然后从公共的 commonPool 线程池中获取一个子线程,执行指定的函数。并且该任务方法完成后,将返回并执行一个新的任务。 |
任务要求
使用 CompletableFuture 按照顺序执行任务,获取远程日期字符串,然后对日期进行逻辑计算,并返回计算后的结果。
- 第一步,先模拟通过远程结果获取日期字符串;
- 第二步,将日期字符串转换为
LocalDate
日期对象,便于进行逻辑计算; - 第三步,操作日期对象进行计算,在当前日期的基础上+10天,然后将日期对象转换为字符串返回;
任务执行完成后,将任务返回的日期字符串输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
thenCompose
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.concurrent.CompletableFuture;
4
5public class ThenComposeExample {
6
7 public static void thenComposeExample() {
8 // 执行 CompletableFuture 任务
9 CompletableFuture<String> cf2 = CompletableFuture
10 // 模拟获取远程接口获取创建时间,然后得到结果 "2022-06-01"
11 .supplyAsync(() -> "2022-06-01")
12 // 实现将两个有依赖关系的任务组合在一起,一个用于转换字符串为日期类型,一个用于计算并转换为日期字符串
13 .thenCompose(param -> {
14 // 将时间字符串转换为 LocalDate 日期类型
15 LocalDate localDate = LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
16 // 最后创建一个新的任务,任务中将 LocalDate 日期+10天,并转换回字符串返回
17 return CompletableFuture.supplyAsync(() -> localDate
18 .plusDays(10)
19 .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
20 });
21
22 // 进入堵塞状态,等待这些阶段执行完成
23 String result = cf2.join();
24 System.out.println("计算结果:" + result);
25 }
26
27 public static void main(String[] args) {
28 thenComposeExample();
29 }
30
31}
thenComposeAsync
1import java.time.LocalDate;
2import java.time.format.DateTimeFormatter;
3import java.util.concurrent.CompletableFuture;
4
5public class ThenComposeAsyncExample {
6
7 public static void thenComposeAsyncExample() {
8 // 执行 CompletableFuture 任务
9 CompletableFuture<String> cf2 = CompletableFuture
10 // 模拟获取远程接口获取创建时间,然后得到结果 "2022-06-01"
11 .supplyAsync(() -> "2022-06-01")
12 // 实现将两个有依赖关系的任务组合在一起,一个用于转换字符串为日期类型,一个用于计算并转换为日期字符串
13 .thenComposeAsync(param -> {
14 // 将时间字符串转换为 LocalDate 日期类型
15 LocalDate localDate = LocalDate.parse(param, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
16 // 最后创建一个新的任务,任务中将 LocalDate 日期+10天,并转换回字符串返回
17 return CompletableFuture.supplyAsync(() -> localDate
18 .plusDays(10)
19 .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
20 });
21
22 // 进入堵塞状态,等待这些阶段执行完成
23 String result = cf2.join();
24 System.out.println("计算结果:" + result);
25 }
26
27 public static void main(String[] args) {
28 thenComposeAsyncExample();
29 }
30
31}
六、CompletableFuture 任务处理-并行任务方法示例
6.1 thenCombine 和 thenCombineAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
thenCombine | √ | 并行执行任务,从 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。 |
thenCombineAsync | √ | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,将返回指定类型结果。 |
不过这里需要注意,如果上一个阶段执为异常完成,则不会执行组合的下一个阶段,直接抛出异常信息。
任务要求
使用 CompletableFuture 并行执行任务,模拟获取远程端口的【姓名】和【岁数】:
- 任务一,调用远程接口,获取姓名;
- 任务二,调用远程接口,获取岁数;
等到任务一和任务二都执行完成后,将任务一和任务二中获取的【姓名】和【岁数】拼在一个字符串中,然后返回;
执行流程
任务执行流程如下图所示:
示例代码
thenCombine
1import java.util.concurrent.CompletableFuture;
2
3public class ThenCombineExample {
4
5 public static void thenCombineExample() {
6 // 模拟调用远程接口获取【姓名】
7 CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
8 // 模拟调用远程接口获取【岁数】
9 CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
10
11 // 执行 CompletableFuture 任务
12 // 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
13 // 然后将获取的【姓名】和【岁数】拼在一起,之后返回新字符串
14 CompletableFuture<String> cf = nameCf.thenCombine(ageCf, (name, age) -> "姓名:" + name + ", 岁数:" + age);
15
16 // 进入堵塞状态,等待这些阶段执行完成后获取结果
17 String info = cf.join();
18 System.out.println(info);
19 }
20
21 public static void main(String[] args) {
22 thenCombineExample();
23 }
24
25}
thenCombineAsync
1import java.util.concurrent.CompletableFuture;
2
3public class ThenCombineAsyncExample {
4
5 public static void thenCombineAsyncExample() {
6 // 模拟调用远程接口获取【姓名】
7 CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
8 // 模拟调用远程接口获取【岁数】
9 CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
10
11 // 执行 CompletableFuture 任务
12 // 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
13 // 然后将获取的【姓名】和【岁数】拼在一起,之后返回新字符串
14 CompletableFuture<String> cf = nameCf.thenCombineAsync(ageCf, (name, age) -> "姓名:" + name + ", 岁数:" + age);
15
16 // 进入堵塞状态,等待这些阶段执行完成后获取结果
17 String info = cf.join();
18 System.out.println(info);
19 }
20
21 public static void main(String[] args) {
22 thenCombineAsyncExample();
23 }
24
25}
6.2 thenAcceptBoth 和 thenAcceptBothAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
thenAcceptBoth | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
thenAcceptBothAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
任务要求
使用 CompletableFuture 并行执行任务,模拟获取远程端口的【姓名】和【岁数】:
- 任务一,调用远程接口,获取姓名;
- 任务二,调用远程接口,获取岁数;
等到任务一和任务二都执行完成后,将任务一和任务二中获取的【姓名】和【岁数】拼在一个字符串中,然后输出到控制台;
执行流程
任务执行流程如下图所示:
示例代码
thenAcceptBoth
1import java.util.concurrent.CompletableFuture;
2
3public class ThenAcceptBothExample {
4
5 public static void thenAcceptBothExample(){
6 // 模拟调用远程接口获取【姓名】
7 CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
8 // 模拟调用远程接口获取【岁数】
9 CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
10
11 // 执行 CompletableFuture 任务
12 // 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
13 // 然后将获取的【姓名】和【岁数】输出
14 CompletableFuture<Void> cf = nameCf.thenAcceptBoth(ageCf,
15 (name, age) -> System.out.println("姓名:" + name + ", 岁数:" + age));
16
17 // 进入堵塞状态等待各阶段执行完成
18 cf.join();
19 }
20
21 public static void main(String[] args) {
22 thenAcceptBothExample();
23 }
24
25}
thenAcceptBothAsync
1import java.util.concurrent.CompletableFuture;
2
3public class ThenAcceptBothAsyncExample {
4
5 public static void thenAcceptBothAsyncExample() {
6 // 模拟调用远程接口获取【姓名】
7 CompletableFuture<String> nameCf = CompletableFuture.supplyAsync(() -> "mydlq");
8 // 模拟调用远程接口获取【岁数】
9 CompletableFuture<Integer> ageCf = CompletableFuture.supplyAsync(() -> 18);
10
11 // 执行 CompletableFuture 任务
12 // 并行执行,模拟调用远程接口获取【姓名】,以及获取【岁数】
13 // 然后将获取的【姓名】和【岁数】输出
14 CompletableFuture<Void> cf = nameCf.thenAcceptBothAsync(ageCf,
15 (name, age) -> System.out.println("姓名:" + name + ", 岁数:" + age));
16
17 // 进入堵塞状态等待各阶段执行完成
18 cf.join();
19 }
20
21 public static void main(String[] args) {
22 thenAcceptBothAsyncExample();
23 }
24
25}
6.3 runAfterBoth 和 runAfterBothAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
thenAcceptBoth | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务方法执行结束后,没有返回值。 |
thenAcceptBothAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,等到两个任务都执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。 |
任务要求
使用 CompletableFuture 并行执行任务,执行步骤一和步骤二:
- 任务一,执行步骤一;
- 任务二,执行步骤二;
等到步骤一和步骤二都执行完成后,就执行最终步骤。
执行流程
任务执行流程如下图所示:
示例代码
runAfterBoth
1import java.util.concurrent.CompletableFuture;
2
3public class RunAfterBothExample {
4
5 public static void runAfterBothExample() {
6 // CompletableFuture 任务 - 步骤1
7 CompletableFuture<Void> step1 = CompletableFuture.runAsync(() -> System.out.println("阶段1"));
8 // CompletableFuture 任务 - 步骤2
9 CompletableFuture<Void> step2 = CompletableFuture.runAsync(() -> System.out.println("阶段2"));
10
11 // 当【阶段1】和【阶段2】并行执行完成后,则执行特定任务
12 step1.runAfterBoth(step2, ()-> System.out.println("全部阶段完成,执行特定任务"));
13 }
14
15 public static void main(String[] args) {
16 runAfterBothExample();
17 }
18
19}
runAfterBothAsync
1import java.util.concurrent.CompletableFuture;
2
3public class RunAfterBothAsyncExample {
4
5 public static void runAfterBothAsyncExample() {
6 // CompletableFuture 任务 - 步骤1
7 CompletableFuture<Void> step1 = CompletableFuture.runAsync(() -> System.out.println("阶段1"));
8 // CompletableFuture 任务 - 步骤2
9 CompletableFuture<Void> step2 = CompletableFuture.runAsync(() -> System.out.println("阶段2"));
10
11 // 当【阶段1】和【阶段2】并行执行完成后,则执行特定任务
12 step1.runAfterBothAsync(step2, ()-> System.out.println("全部阶段完成,执行特定任务"));
13 }
14
15 public static void main(String[] args) {
16 runAfterBothAsyncExample();
17 }
18
19}
6.4 applyToEither 和 applyToEitherAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
applyToEither | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
applyToEitherAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前先执行结束的任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
任务要求
使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:
- 任务一,从通道1中获取信息;
- 任务二,从通道2中获取信息;
- 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就处理先执行完成任务中返回的结果;
几个任务执行完成后,将任务返回结果输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
applyToEither
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3
4public class ApplyToEitherExample {
5
6 public static void applyToEitherExample(){
7 // 通道1 (模拟获取远程信息)
8 CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
9 randomSleep(1000);
10 return "阶段1-从通道1成功获取结果";
11 });
12 // 通道2 (模拟获取远程信息)
13 CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
14 randomSleep(1000);
15 return "阶段2-从通道2成功获取结果";
16 });
17
18 // 执行 CompletableFuture 任务
19 // 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
20 // 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
21 // 务要执行的参数,然后执行下一步操作,执行完成后返回指定值。
22 CompletableFuture<String> channelTask = channel1.applyToEither(channel2, response -> "获取的结果: " + response);
23
24 // 进入堵塞状态,等待这些阶段执行完成后获取结果并输出
25 String result = channelTask.join();
26 System.out.println(result);
27 }
28
29 /**
30 * 指定规定时间内睡眠
31 * @param time 随机睡眠时间
32 */
33 private static void randomSleep(int time) {
34 try {
35 Thread.sleep(new Random().nextInt(time));
36 } catch (InterruptedException e) {
37 }
38 }
39
40 public static void main(String[] args) {
41 applyToEitherExample();
42 }
43
44}
applyToEitherAsync
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3
4public class ApplyToEitherAsyncExample {
5
6 public static void applyToEitherAsyncExample1(){
7 // 通道1 (模拟获取远程信息)
8 CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
9 randomSleep(1000);
10 return "阶段1-从通道1成功获取结果";
11 });
12 // 通道2 (模拟获取远程信息)
13 CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
14 randomSleep(1000);
15 return "阶段2-从通道2成功获取结果";
16 });
17
18 // 执行 CompletableFuture 任务
19 // 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
20 // 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
21 // 务要执行的参数,然后执行下一步操作,执行完成后返回指定值。
22 CompletableFuture<String> channelTask = channel1.applyToEitherAsync(channel2, response -> "获取的结果: " + response);
23
24 // 进入堵塞状态,等待这些阶段执行完成后获取结果并输出
25 String result = channelTask.join();
26 System.out.println(result);
27 }
28
29 /**
30 * 指定规定时间内睡眠
31 * @param time 随机睡眠时间
32 */
33 private static void randomSleep(int time){
34 try {
35 int randomNumber = new Random().nextInt(time);
36 Thread.sleep(randomNumber);
37 } catch (InterruptedException e) {
38 }
39 }
40
41 public static void main(String[] args) {
42 applyToEitherAsyncExample1();
43 }
44
45}
6.5 runAfterEither 和 runAfterEitherAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
runAfterEither | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。 |
runAfterEitherAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从 commonPool 线程池中获取一个子线程,执行一个新的任务方法,该方法执行结束后将返回并执行一个新任务。新任务执行结束后,没有返回值。 |
任务要求
使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:
- 任务一,执行步骤一;
- 任务二,执行步骤二;
- 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就执行特定动作。
执行流程
任务执行流程如下图所示:
示例代码
runAfterEither
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3
4public class RunAfterEitherExample {
5
6 public static void runAfterEitherExample(){
7 // 通道1 (模拟向远程通道1写入数据)
8 CompletableFuture<Void> channel1 = CompletableFuture.runAsync(() -> {
9 randomSleep(1000);
10 System.out.println("阶段1-执行完成");
11 });
12 // 通道2 (模拟向远程通道2写入数据)
13 CompletableFuture<Void> channel2 = CompletableFuture.runAsync(() -> {
14 randomSleep(1000);
15 System.out.println("阶段2-执行完成");
16 });
17
18 // 执行 CompletableFuture 任务
19 // 并行执行【任务1】和【任务2】,无论哪个任务先执行完成,直接进行下一步任务,执行特定动作
20 channel1.runAfterEither(channel2, () -> System.out.println("任意任务执行完成-执行特定操作")).join();
21 }
22
23 /**
24 * 指定规定时间内睡眠
25 *
26 * @param time 随机睡眠时间
27 */
28 private static void randomSleep(int time) {
29 try {
30 Thread.sleep(new Random().nextInt(time));
31 } catch (InterruptedException e) {
32 }
33 }
34
35 public static void main(String[] args) {
36 runAfterEitherExample();
37 }
38
39}
runAfterEitherAsync
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3
4public class RunAfterEitherAsyncExample {
5
6 public static void runAfterEitherAsyncExample(){
7 // 通道1 (模拟向远程通道1写入数据)
8 CompletableFuture<Void> channel1 = CompletableFuture.runAsync(() -> {
9 randomSleep(1000);
10 System.out.println("阶段1-执行完成");
11 });
12 // 通道2 (模拟向远程通道2写入数据)
13 CompletableFuture<Void> channel2 = CompletableFuture.runAsync(() -> {
14 randomSleep(1000);
15 System.out.println("阶段2-执行完成");
16 });
17
18 // 执行 CompletableFuture 任务
19 // 并行执行【任务1】和【任务2】,无论哪个任务先执行完成,直接进行下一步任务,执行特定动作
20 channel1.runAfterEitherAsync(channel2, () -> System.out.println("任意任务执行完成-执行特定操作")).join();
21 }
22
23
24 /**
25 * 指定规定时间内睡眠
26 *
27 * @param time 随机睡眠时间
28 */
29 private static void randomSleep(int time) {
30 try {
31 Thread.sleep(new Random().nextInt(time));
32 } catch (InterruptedException e) {
33 }
34 }
35
36 public static void main(String[] args) {
37 runAfterEitherAsyncExample();
38 }
39
40}
6.6 acceptEither 和 acceptEitherAsync
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
AcceptEither | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
AcceptEitherAsync | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行两个任务,两个任务任意一个执行结束后,继续从公共的 commonPool 线程池中获取一个子线程,执行一个新的任务方法,将之前两个任务的执行【结果】作为新任务方法的【参数】,然后返回并执行新任务。新任务执行结束后,没有返回值。 |
任务要求
使用 CompletableFuture 并行执行任务,执行任务一、任务二和任务三:
- 任务一,从通道1中获取信息;
- 任务二,从通道2中获取信息;
- 任务三,等到步骤一和步骤二两个任务任意一个先执行完成,就将先执行完成任务中返回的结果输出到控制台;
执行流程
任务执行流程如下图所示:
示例代码
AcceptEither
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3
4public class AcceptEitherExample {
5
6 public static void acceptEitherExample() {
7 // 通道1 (模拟获取远程信息)
8 CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
9 randomSleep(1000);
10 return "阶段1-成功获取结果";
11 });
12 // 通道2 (模拟获取远程信息)
13 CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
14 randomSleep(1000);
15 return "阶段2-成功获取结果";
16 });
17
18 // 执行 CompletableFuture 任务
19 // 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
20 // 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
21 // 务要执行的参数,然后执行下一步操作,执行完成后没有返回值。
22 channel1.acceptEither(channel2, result -> System.out.println("获取的结果: " + result)).join();
23 }
24
25 /**
26 * 指定规定时间内睡眠
27 *
28 * @param time 随机睡眠时间
29 */
30 private static void randomSleep(int time) {
31 try {
32 Thread.sleep(new Random().nextInt(time));
33 } catch (InterruptedException e) {
34 }
35 }
36
37 public static void main(String[] args) {
38 acceptEitherExample();
39 }
40
41}
AcceptEitherAsync
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3
4public class AcceptEitherAsyncExample {
5
6 public static void acceptEitherAsyncExample(){
7 // 通道1 (模拟获取远程信息)
8 CompletableFuture<String> channel1 = CompletableFuture.supplyAsync(() -> {
9 randomSleep(1000);
10 return "阶段1-成功获取结果";
11 });
12 // 通道2 (模拟获取远程信息)
13 CompletableFuture<String> channel2 = CompletableFuture.supplyAsync(() -> {
14 randomSleep(1000);
15 return "阶段2-成功获取结果";
16 });
17
18 // 执行 CompletableFuture 任务
19 // 并行执行【任务1】和【任务2】,分别从【通道1】或者【通道2】中获取结果,
20 // 哪个通道先获取结果成功,就使用该结果作为任务的返回结果,也作为下一步任
21 // 务要执行的参数,然后执行下一步操作,执行完成后没有返回值。
22 channel1.acceptEitherAsync(channel2, result -> System.out.println("获取的结果: " + result)).join();
23 }
24
25 /**
26 * 指定规定时间内睡眠
27 *
28 * @param time 随机睡眠时间
29 */
30 private static void randomSleep(int time){
31 try {
32 Thread.sleep(new Random().nextInt(time));
33 } catch (InterruptedException e) {
34 }
35 }
36
37 public static void main(String[] args) {
38 acceptEitherAsyncExample();
39 }
40
41}
6.7 allOf
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
allOf | x | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待全部任务方法都执行完成后结束。任务执行结束后,没有返回值。 |
不过这里需要注意:
allOf
方法执行时,如果传入的CompletableFuture<?>
中的其中一个阶段异常完成时,那么返回的CompletableFuture<Void>
也异常完成,并将此异常作为异常原因。
任务要求
并行执行 CompletableFuture 任务,模拟从三个接口获取用户相关信息:
- 任务一,调用接口1获取用户基本信息;
- 任务二,调用接口2获取用户头像;
- 任务三,调用接口3获取用用户余额;
等待三个任务并行执行完成,然后将获取的信息聚合在 Map
集合中。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.Map;
2import java.util.concurrent.CompletableFuture;
3import java.util.concurrent.ConcurrentHashMap;
4
5public class AllOfExample {
6
7 public static void allOfExample() {
8 // 创建聚合数据的 Map 集合
9 Map<String, String> userMap = new ConcurrentHashMap<>(3);
10
11 // 创建待执行的 Runnable 参数
12 Runnable runnable1 = () -> {
13 System.out.println("任务1-成功获取用户基本信息");
14 userMap.put("userInfo", "{name: mydlq, age: 18}");
15 };
16 Runnable runnable2 = () -> {
17 System.out.println("任务2-成功获取用户头像");
18 userMap.put("avatar", "http://www.xxx.com/avatar");
19 };
20 Runnable runnable3 = () -> {
21 System.out.println("任务3-成功获取用户余额");
22 userMap.put("balance", "1000");
23 };
24
25 // 执行多个 CompletableFuture,需要等待全部完成
26 CompletableFuture<Void> cf = CompletableFuture.allOf(
27 CompletableFuture.runAsync(runnable1),
28 CompletableFuture.runAsync(runnable2),
29 CompletableFuture.runAsync(runnable3)
30 );
31
32 // 进入堵塞状态,等待执行完成
33 cf.join();
34
35 // 输出用户信息
36 System.out.println("获取的用户信息:");
37 for (Map.Entry<String, String> entry : userMap.entrySet()) {
38 System.out.println(entry.getKey() + ": " + entry.getValue());
39 }
40 }
41
42 public static void main(String[] args) {
43 allOfExample();
44 }
45
46}
6.8 anyOf
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
anyOf | √ | 并行执行任务,从公共的 commonPool 线程池中获取线程,并行执行多个任务方法,等待多个任务方法中任意一个执行完成后结束。任务执行结束后,返回第一个先执行完成任务的返回值。 |
不过这里需要注意:
anyOf
方法执行时,如果传入的全部CompletableFuture<?>
阶段都没有完成前,任意一个阶段执行过程出现异常并没有处理,也就是说该阶段执行过程异常完成,那么返回的CompletableFuture<Object>
也异常完成,并将此异常作为异常原因。
任务要求
并行执行 CompletableFuture 任务,模拟从三个接口中获取相同的信息,只要有任意一个接口先行返回信息,就直接执行下一步骤,而无需等待。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3import java.util.function.Supplier;
4
5public class AnyOfExample {
6
7 public static void anyOfExample() {
8 Supplier<String> supplier1 = () -> {
9 System.out.println("通道1");
10 return "通道1-成功获取信息";
11 };
12 Supplier<String> supplier2 = () -> {
13 System.out.println("通道2");
14 return "通道2-成功获取信息";
15 };
16 Supplier<String> supplier3 = () -> {
17 System.out.println("通道3");
18 return "通道3-成功获取信息";
19 };
20
21 // 执行多个 CompletableFuture,只要任意一个执行完成就执行下一步
22 CompletableFuture<Object> cf = CompletableFuture.anyOf(
23 CompletableFuture.supplyAsync(supplier1),
24 CompletableFuture.supplyAsync(supplier2),
25 CompletableFuture.supplyAsync(supplier3)
26 );
27
28 // 进入堵塞状态,等待执行完成,输出获取的信息
29 Object result = cf.join();
30 System.out.println(result);
31 }
32
33 /**
34 * 随机睡眠指定时间
35 *
36 * @param time 睡眠时间
37 */
38 public static void randomTimeSleep(int time){
39 try {
40 Thread.sleep(new Random().nextInt(time));
41 } catch (InterruptedException e) {
42 throw new RuntimeException(e);
43 }
44 }
45
46 public static void main(String[] args) {
47 anyOfExample();
48 }
49
50}
七、任务结束方法
7.1 get
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
get | √ | 获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(运行时)异常。 |
任务要求
执行 CompletableFuture 任务,然后调用 get 方法进行等待,获取执行结果。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2import java.util.concurrent.ExecutionException;
3
4public class GetExample {
5
6 public static void getExample() throws ExecutionException, InterruptedException {
7 // 执行 CompletableFuture 任务
8 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "执行结果");
9
10 // 调用 get 方法进行等待,获取执行结果
11 cf.get();
12 }
13
14 public static void main(String[] args) throws ExecutionException, InterruptedException {
15 getExample();
16 }
17
18}
7.2 join
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
join | √ | 获取任务执行结果,如果任务尚未完成则进行堵塞状态,如果任务正常完成则返回执行结果,如果异常完成或执行过程中引发异常,这时就会抛出(未经检查)异常。 |
任务要求
执行 CompletableFuture 任务,然后调用 join 方法进行等待,获取执行结果。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2
3public class JoinExample {
4
5 public static void joinExample() {
6 // 执行 CompletableFuture 任务
7 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "执行结果");
8
9 // 调用 join 方法进行等待,获取执行结果
10 cf.join();
11 }
12
13 public static void main(String[] args) {
14 joinExample();
15 }
16
17}
7.3 getNow
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
getNow | √ | 立即获取任务执行结果,如果任务没有完成则返回设定的默认值,如果任务正常完成则返回执行结果。 |
任务要求
执行 CompletableFuture 任务,然后调用 getNow 方法获取任务执行结果,如果任务没有执行完成则返回默认值。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3
4public class GetNowExample {
5
6 public static void getNowExample() {
7 // 执行 CompletableFuture 任务
8 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
9 // 睡眠5毫秒
10 sleep(5);
11 return "示例-执行完成";
12 });
13
14 // 随机睡眠1~10毫秒
15 sleep(new Random().nextInt(10));
16
17 // 调用 getNow 方法获取执行结果,如果任务未执行完成则输出设置的默认值
18 String result = cf.getNow("默认值");
19 System.out.println(result);
20 }
21
22 /**
23 * 线程睡眠
24 *
25 * @param millis 睡眠时间(单位:毫秒)
26 */
27 public static void sleep(long millis){
28 try {
29 Thread.sleep(millis);
30 } catch (InterruptedException e) {
31 e.printStackTrace();
32 }
33 }
34
35 public static void main(String[] args) {
36 getNowExample();
37 }
38
39}
7.4 cancel
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
cancel | √ | 取消任务,如果任务尚未执行结束,调用该方法成功取消任务时返回 true,否则返回 false。并且任务取消成功后,通过 get/join 方法获取结果时,会抛出 CancellationException 异常。 |
任务要求
执行 CompletableFuture 任务,然后调用 cancel 方法取消任务。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.Random;
2import java.util.concurrent.CancellationException;
3import java.util.concurrent.CompletableFuture;
4import java.util.concurrent.ExecutionException;
5
6public class CancelExample {
7
8 public static void cancelExample() throws ExecutionException, InterruptedException {
9 // 执行 CompletableFuture 任务
10 CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
11 // 随机睡眠1~10毫秒
12 sleep(new Random().nextInt(10));
13 System.out.println("示例-执行任务完成");‘
14 });
15
16 // 随机睡眠1~10毫秒
17 sleep(new Random().nextInt(10));
18
19 // 调用 cancel 方法取消任务
20 boolean cancelResult = cf.cancel(true);
21 System.out.println("取消任务: " + cancelResult);
22
23 // 调用 get 方法获取执行结果,如果取消任务将抛出 CancellationException 异常,这里对该异常进行处理
24 try {
25 cf.get();
26 } catch (CancellationException e) {
27 System.out.println("获取任务失败,任务已经被取消");
28 }
29 }
30
31 /**
32 * 线程睡眠
33 *
34 * @param millis 睡眠时间(单位:毫秒)
35 */
36 public static void sleep(long millis){
37 try {
38 Thread.sleep(millis);
39 } catch (InterruptedException e) {
40 e.printStackTrace();
41 }
42 }
43
44 public static void main(String[] args) throws ExecutionException, InterruptedException {
45 cancelExample();
46 }
47
48}
八、CompletableFuture 查看任务状态方法示例
8.1 isDone
方法描述
查看任务是否执行完成,如果当前阶段执行完成 (无论是正常完成还是异常完成) 则返回 true
,否则返回 false
。
方法 | 有返回值 | 描述 |
---|---|---|
isDone | √ | 查看任务是否执行完成,如果当前阶段执行完成 (无论是正常完成还是异常完成) 则返回 true,否则返回 false。 |
任务要求
执行 CompletableFuture 任务,然后调用 isDone 方法查看任务是否执行完成。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2
3public class IsDoneExample {
4
5 public static void isDoneExample() throws InterruptedException {
6 // 执行 CompletableFuture 任务
7 CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("任务执行中..."));
8
9 // 调用 isDone 方法查看任务是否执行完成
10 System.out.println("任务是否完成: " + cf.isDone());
11
12 // 等待1秒时间
13 Thread.sleep(1000L);
14
15 // 调用 isDone 方法再次查看任务是否执行完成
16 System.out.println("任务是否完成: " + cf.isDone());
17 }
18
19 public static void main(String[] args) throws InterruptedException {
20 isDoneExample();
21 }
22
23}
8.2 isCancelled
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
isCancelled | √ | 查看当前阶段任务是否成功取消,如果此阶段任务在完成之前被取消则返回 true,否则返回 false。 |
任务要求
执行 CompletableFuture 任务,然后调用 cancel 方法取消任务,之后调用 isCancelled 方法观察任务是否成功取消。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2
3public class IsCancelledExample {
4
5 public static void isCancelledExample(){
6 // 执行 CompletableFuture 任务
7 CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> System.out.println("执行 CompletableFuture 任务"));
8
9 // 调用 cancel 方法取消任务
10 cf.cancel(true);
11
12 // 调用 isCancelled 方法,查询任务是否成功被取消
13 System.out.println("是否取消任务: " + cf.isCancelled());
14 }
15
16 public static void main(String[] args) {
17 isCancelledExample();
18 }
19
20}
8.3 isCompletedExceptionally
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
isCompletedExceptionally | √ | 查看当前阶段任务是否以异常的方式执行完成。比如取消任务、突然终止任务或者执行过程出现异常等,都属于异常方式执行完成,如果是以异常方式完成则返回 true,否则返回 false。 |
任务要求
执行 CompletableFuture 任务,然后调用 isCompletedExceptionally 方法查看任务是否异常执行完成。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2
3public class IsCompletedExceptionallyExample {
4
5 public static void isCompletedExceptionallyExample() throws InterruptedException {
6 // 执行 CompletableFuture 任务
7 CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
8 System.out.println("执行中...");
9 // 模拟发生异常
10 System.out.println(1/0);
11 });
12
13 // 等待1秒确保任务执行完成
14 Thread.sleep(1000L);
15
16 // 调用 isCompletedExceptionally 方法验证当前阶段是否异常完成
17 System.out.println("是否异常完成: " + cf.isCompletedExceptionally());
18 }
19
20 public static void main(String[] args) throws InterruptedException {
21 isCompletedExceptionallyExample();
22 }
23
24}
九、CompletableFuture 设置任务结果方法示例
9.1 obtrudeValue
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
obtrudeValue | x | 设置(重置)调用 get/join 方法时返回指定值,无论任务是否执行完成。 |
任务要求
执行 CompletableFuture 任务,然后调用 obtrudeValue 方法强制设置任务执行结果为指定值,无论当前任务是否执行成功/失败。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2import java.util.concurrent.ExecutionException;
3
4public class ObtrudeValueExample {
5
6 public static void obtrudeValueExample() throws ExecutionException, InterruptedException {
7 // 执行 CompletableFuture 任务
8 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
9
10 // 设置或重置 get 方法和与其相关方法的返回的值
11 cf.obtrudeValue("示例-强制设置返回值-无论成功失败");
12
13 // 调用 get 方法进行等待,获取执行结果并输出到控制台
14 String result = cf.get();
15 System.out.println(result);
16 }
17
18 public static void main(String[] args) throws ExecutionException, InterruptedException {
19 obtrudeValueExample();
20 }
21
22}
9.2 obtrudeException
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
obtrudeException | x | 设置(重置)调用 get/join 方法时返回指定异常,无论任务是否执行完成。 |
任务要求
执行 CompletableFuture 任务,然后调用 obtrudeException 方法强制设置任务执行结果为指定异常,无论当前任务是否执行成功/失败。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2import java.util.concurrent.ExecutionException;
3
4public class ObtrudeExceptionExample {
5
6 public static void obtrudeExceptionExample() throws ExecutionException, InterruptedException {
7 // 执行 CompletableFuture 任务
8 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
9
10 // 设置 get 方法和与其相关的方法后续执行抛出指定异常
11 cf.obtrudeException(new Exception("未知异常"));
12
13 // 调用 get 方法进行等待,获取执行结果并输出
14 String result = cf.get();
15 System.out.println(result);
16 }
17
18 public static void main(String[] args) throws ExecutionException, InterruptedException {
19 obtrudeExceptionExample();
20 }
21
22}
9.3 complete
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
complete | √ | 设置调用 get/join 方法时返回指定值。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。 |
任务要求
执行 CompletableFuture 任务,然后调用 complete 方法设置或重置任务执行结果,然后将最后任务执行结果输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2import java.util.concurrent.ExecutionException;
3
4public class CompleteExample {
5
6 public static void completeExample() throws ExecutionException, InterruptedException {
7 // 执行 CompletableFuture 任务
8 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
9
10 // 设置或重置 get 方法和与其相关方法的返回的值,任务没有执行完成返回 true,否则返回 false
11 boolean setResult = cf.complete("示例-任务没有完成-设置返回值");
12 System.out.println("设置返回值为执行结果: " + setResult);
13
14 // 调用 get 方法进行等待,获取执行结果并输出
15 String result = cf.get();
16 System.out.println(result);
17 }
18
19 public static void main(String[] args) throws ExecutionException, InterruptedException {
20 completeExample();
21 }
22
23}
9.4 CompleteException
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
completeException | √ | 设置调用 get/join 方法时返回指定异常。不过需要注意的是,如果任务没有执行完成,则可以通过该方法设置返回值,并且返回 true。如果任务已经完成,则无法配置,并且返回 false。 |
任务要求
执行 CompletableFuture 任务,然后调用 complete 方法设置或重置任务执行结果为指定异常,然后将最后任务执行结果或异常输出到控制台。
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.concurrent.CompletableFuture;
2import java.util.concurrent.ExecutionException;
3
4public class CompleteExceptionallyExample {
5
6 public static void completeExceptionallyExample() throws ExecutionException, InterruptedException {
7 // 执行 CompletableFuture 任务
8 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "示例-执行完成");
9
10 // 设置或重置 get 方法和与其相关方法的返回的值,任务没有执行完成返回 true,否则返回 false
11 boolean setResult = cf.completeExceptionally(new Exception("未知异常"));
12 System.out.println("设置返回值为执行结果: " + setResult);
13
14 // 调用 get 方法进行等待,获取执行结果并输出
15 String result = cf.get();
16 System.out.println(result);
17 }
18
19 public static void main(String[] args) throws ExecutionException, InterruptedException {
20 completeExceptionallyExample();
21 }
22
23}
十、任务异常处理方法
10.1 exceptionally
方法描述
方法 | 有返回值 | 描述 |
---|---|---|
exceptionally | x | 判断上一个任务执行时是否发生异常,如果是则就会执行 exceptionally 方法,并且将上一步异常作为当前方法的参数,然后对异常进行处理。当然,如果上一阶段执行过程中没有出现异常,则不会执行 exceptionally 方法。 |
任务要求
执行 CompletableFuture 任务,并且使用 exceptionally 方法:
- 如果
exceptionally
方法的上一阶段执行过程中出现异常,则会执行exceptionally
阶段; - 如果
exceptionally
方法的上一阶段执行过程中没有出现异常,则不会执行exceptionally
阶段;
执行流程
任务执行流程如下图所示:
示例代码
1import java.util.Random;
2import java.util.concurrent.CompletableFuture;
3import java.util.concurrent.ExecutionException;
4
5public class ExceptionallyExample {
6
7 public static void exceptionallyExample() throws ExecutionException, InterruptedException {
8 // 执行 CompletableFuture 串行任务,并且使用 exceptionally 方法:
9 // - 如果 exceptionally 方法的上一阶段执行过程中出现异常,则会执行 exceptionally 阶段;
10 // - 如果 exceptionally 方法的上一阶段执行过程中没有出现异常,则不会执行 exceptionally 阶段;
11 CompletableFuture<String> cf = CompletableFuture
12 // 执行任务,50%概率发生异常,50%概率返回正常值
13 .supplyAsync(() -> {
14 if (new Random().nextInt(2) != 0) {
15 throw new RuntimeException("模拟发生异常");
16 }
17 return "正常结束";
18 })
19 // 处理上一步中抛出的异常
20 .exceptionally(Throwable::getMessage);
21
22 // 调用 get 方法进行等待,获取执行结果
23 String result = cf.get();
24 System.out.println(result);
25 }
26
27 public static void main(String[] args) throws ExecutionException, InterruptedException {
28 exceptionallyExample();
29 }
30
31}
---END---
如果本文对你有帮助,可以关注我的公众号"小豆丁技术栈"了解最新动态,顺便也请帮忙 github 点颗星哦~感谢~
!版权声明:本博客内容均为原创,每篇博文作为知识积累,写博不易,转载请注明出处。