CompletableFuture使用大全
CompletableFuture使用大全
1. CompletableFuture基本概述
- 用途:用于异步调用,内部封装线程池,简化多线程和高并发处理。
- 特点:封装了Future,具有回调功能,可在任务完成后继续进行下一个动作。
Runnable就是没有返回结果的行为。
Callable是有返回结果的行为。
Future 异步封装Callable和Runnable,委托给线程池执行后,需要取回执行的结果
CompletableFuture 封装了Future,使其拥有了回调的功能,在某个行为完成之后,可以继续进行下一个动作。
2. API方法分类与记忆规律
- 创建类:
completeFuture
:创建默认返回值的CompletableFuture。runAsync
:异步执行无返回值。supplyAsync
:异步执行有返回值。anyOf
:任意一个执行完成即可进行下一步。allOf
:所有任务完成才进行下一步。
- 状态取值类:
join
:合并结果,等待。get
:合并等待结果,可增加超时时间。getNow
:如果结果计算完成或异常则返回,否则返回指定值。isCancelled
、isCompletedExceptionally
、isDone
:状态检查方法。
- 控制类:用于主动控制CompletableFuture的完成行为。
complete
、completeExceptionally
、cancel
:控制完成、异常和取消。
- 接续类:注入回调行为,是CompletableFuture的核心特性。
-
thenApply
、thenApplyAsync
-
thenAccept
、thenAcceptAsync
-
thenRun
、thenRunAsync
-
thenCombine
、thenCombineAsync
-
thenAcceptBoth
、thenAcceptBothAsync
-
runAfterBoth
、runAfterBothAsync
-
applyToEither
、applyToEitherAsync
-
acceptEither
、acceptEitherAsync
-
runAfterEither
、runAfterEitherAsync
-
thenCompose
、thenComposeAsync
-
whenComplete
、whenCompleteAsync
-
handle
、handleAsync
-
exceptionally
1.入口函数
supplyAsync()
代表一个异步的有返回值的函数,之所以异步,是与主线程区别,从线程池中的拿一个线程来执行。2.
thenApply()
和thenAccept()
没有Async
,意味着是和前面的任务共用一个线程,从执行结果上我们也可以看到线程名称相同。3.
thenApply()
需要接收上一个任务的返回值,并且自己也要有返回值。4.
thenAccept()
需要接收上一个任务的返回值,但是它不需要返回值。5.
thenApplyAsync()
和thenRunAsync()
分别表示里面的任务都是异步执行的,和执行前面的任务不是同一个线程;6.
thenRunAsync()
需要传入一个既不需要参数,也没有返回值的任务; -
3. 创建CompletableFuture
- 异步任务:无返回值和有返回值的异步任务。
- 组合任务:使用
anyOf
和allOf
组合任务。
4. 取值与状态
- 常用方法:
join()
、get()
、get(1, TimeUnit.Hours)
等。
5. 控制CompletableFuture执行
- 完成:
future.complete("米饭")
- 异常:
future.completeExceptionally()
- 取消:
future.cancel(false)
6. 接续行为
- 接续方式1:通过
thenAcceptAsync
、thenRunAsync
等方法接续行为。 - 接续方式2:组合多个CompletableFuture,如
thenCombineAsync
、thenAcceptBothAsync
等。 - 接续方式3:结果处理,如
whenCompleteAsync
、handleAsync
、exceptionally
。
7. 代码示例
-
异步任务示例:
CompletableFuture future = CompletableFuture.supplyAsync(() -> { System.out.println("电饭煲开始做饭"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return "白米饭"; }).thenAccept(result -> { System.out.println("开始吃米饭"); }); System.out.println("我先去搞点牛奶和鸡蛋"); future.join();
-
组合任务示例:
List<CompletableFuture<YoutubeVideoEntity>> futures = subVideosList.stream() .map(item -> CompletableFuture.supplyAsync(() -> this.getRetry(item), ThreadPoolHolder.BG_CRAWLER_POOL) ).collect(Collectors.toList()); List<YoutubeVideoEntity> videoEntities = futures.stream().map(CompletableFuture::join) .filter(item -> item != null && item.getVideoId() != null).collect(Collectors.toList());
private List<String> getUserPermissions(String createId, boolean isFromGetUserRoleList) throws Exception { List<String> permissions = new ArrayList<>(); long startTime = System.currentTimeMillis(); CompletableFuture<List<String>> userRolesFuture = CompletableFuture.supplyAsync(() -> { try { long roleStartTime = System.currentTimeMillis(); List<String> roles = testRelayBiz.queryUserRoleList(createId); long roleEndTime = System.currentTimeMillis(); log.info("queryUserRoleList execution time: {} ms", roleEndTime - roleStartTime); return roles; } catch (Exception e) { throw new CompletionException("Error fetching user roles", e); } }, asyncExecutor).exceptionally(ex -> { log.error("Exception in querying user roles: {}", ex.getMessage()); return Collections.emptyList(); }); CompletableFuture<Boolean> isAdminFuture = CompletableFuture.supplyAsync(() -> { try { long adminStartTime = System.currentTimeMillis(); boolean isAdmin = testRelayBiz.getSystemIsAdmin(); long adminEndTime = System.currentTimeMillis(); log.info("getSystemIsAdmin execution time: {} ms", adminEndTime - adminStartTime); return isAdmin; } catch (Exception e) { throw new CompletionException("Error checking system admin status", e); } }, asyncExecutor).exceptionally(ex -> { log.error("Exception in checking system admin status: {}", ex.getMessage()); return false; }); // 等待所有异步任务完成 CompletableFuture.allOf(userRolesFuture, isAdminFuture).join(); List<String> userRoles = userRolesFuture.get(); Boolean systemIsAdmin = isAdminFuture.get(); if (CollectionUtils.isNotEmpty(userRoles) && UserRoleNameConstant.checkUserRole(userRoles)) { permissions.addAll(assemblePermissions(systemIsAdmin, isFromGetUserRoleList)); } long endTime = System.currentTimeMillis(); log.info("Total getUserPermissions execution time: {} ms", endTime - startTime); return permissions; }
CompletableFuture
没有现成的api实现快速失败的功能,所以我们只能结合allOf()
和anyOf()
来逻辑来自定义方法完成快速失败的逻辑;1.我们需要额外创建一个CompletableFuture来监听所有的CompletableFuture,一旦其中一个CompletableFuture产生异常,我们就设置额外的CompletableFuture立即完成。
2.把所有的CompletableFuture和额外的CompletableFuture放在
anyOf()
方法中,这样一旦额外的CompletableFuture完成,说明产生异常了;否则就需要等待所有的CompletableFuture完成。