Jinx

一只奶牛猫

CompletableFuture使用大全

发布于 # java

CompletableFuture使用大全

1. CompletableFuture基本概述

  • 用途:用于异步调用,内部封装线程池,简化多线程和高并发处理。
  • 特点:封装了Future,具有回调功能,可在任务完成后继续进行下一个动作。

Runnable就是没有返回结果的行为。

Callable是有返回结果的行为。

Future 异步封装Callable和Runnable,委托给线程池执行后,需要取回执行的结果

CompletableFuture 封装了Future,使其拥有了回调的功能,在某个行为完成之后,可以继续进行下一个动作。

2. API方法分类与记忆规律

  • 创建类
    • completeFuture:创建默认返回值的CompletableFuture。
    • runAsync:异步执行无返回值。
    • supplyAsync:异步执行有返回值。
    • anyOf:任意一个执行完成即可进行下一步。
    • allOf:所有任务完成才进行下一步。
  • 状态取值类
    • join:合并结果,等待。
    • get:合并等待结果,可增加超时时间。
    • getNow:如果结果计算完成或异常则返回,否则返回指定值。
    • isCancelledisCompletedExceptionallyisDone:状态检查方法。
  • 控制类:用于主动控制CompletableFuture的完成行为。
    • completecompleteExceptionallycancel:控制完成、异常和取消。
  • 接续类:注入回调行为,是CompletableFuture的核心特性。
    • thenApplythenApplyAsync

    • thenAcceptthenAcceptAsync

    • thenRunthenRunAsync

    • thenCombinethenCombineAsync

    • thenAcceptBoththenAcceptBothAsync

    • runAfterBothrunAfterBothAsync

    • applyToEitherapplyToEitherAsync

    • acceptEitheracceptEitherAsync

    • runAfterEitherrunAfterEitherAsync

    • thenComposethenComposeAsync

    • whenCompletewhenCompleteAsync

    • handlehandleAsync

    • exceptionally

    1.入口函数supplyAsync()代表一个异步的有返回值的函数,之所以异步,是与主线程区别,从线程池中的拿一个线程来执行。

    2.thenApply()thenAccept()没有Async,意味着是和前面的任务共用一个线程,从执行结果上我们也可以看到线程名称相同。

    3.thenApply()需要接收上一个任务的返回值,并且自己也要有返回值。

    4.thenAccept()需要接收上一个任务的返回值,但是它不需要返回值。

    5.thenApplyAsync()thenRunAsync()分别表示里面的任务都是异步执行的,和执行前面的任务不是同一个线程;

    6.thenRunAsync()需要传入一个既不需要参数,也没有返回值的任务;

3. 创建CompletableFuture

  • 异步任务:无返回值和有返回值的异步任务。
  • 组合任务:使用anyOfallOf组合任务。

4. 取值与状态

  • 常用方法join()get()get(1, TimeUnit.Hours)等。

5. 控制CompletableFuture执行

  • 完成future.complete("米饭")
  • 异常future.completeExceptionally()
  • 取消future.cancel(false)

6. 接续行为

  • 接续方式1:通过thenAcceptAsyncthenRunAsync等方法接续行为。
  • 接续方式2:组合多个CompletableFuture,如thenCombineAsyncthenAcceptBothAsync等。
  • 接续方式3:结果处理,如whenCompleteAsynchandleAsyncexceptionally

7. 代码示例

  • 异步任务示例

    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        System.out.println("电饭煲开始做饭");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "白米饭";
    }).thenAccept(result -> {
        System.out.println("开始吃米饭");
    });
    System.out.println("我先去搞点牛奶和鸡蛋");
    future.join();
    
  • 组合任务示例

    List<CompletableFuture<YoutubeVideoEntity>> futures = subVideosList.stream()
        .map(item ->
            CompletableFuture.supplyAsync(() -> this.getRetry(item), ThreadPoolHolder.BG_CRAWLER_POOL)
        ).collect(Collectors.toList());
    List<YoutubeVideoEntity> videoEntities = futures.stream().map(CompletableFuture::join)
        .filter(item -> item != null && item.getVideoId() != null).collect(Collectors.toList());
    
    private List<String> getUserPermissions(String createId, boolean isFromGetUserRoleList) throws Exception {
            List<String> permissions = new ArrayList<>();
        
            long startTime = System.currentTimeMillis();
    
            CompletableFuture<List<String>> userRolesFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    long roleStartTime = System.currentTimeMillis();
                    List<String> roles = testRelayBiz.queryUserRoleList(createId);
                    long roleEndTime = System.currentTimeMillis();
                    log.info("queryUserRoleList execution time: {} ms", roleEndTime - roleStartTime);
                    return roles;
                } catch (Exception e) {
                    throw new CompletionException("Error fetching user roles", e);
                }
            }, asyncExecutor).exceptionally(ex -> {
                log.error("Exception in querying user roles: {}", ex.getMessage());
                return Collections.emptyList();
            });
        
            CompletableFuture<Boolean> isAdminFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    long adminStartTime = System.currentTimeMillis();
                    boolean isAdmin = testRelayBiz.getSystemIsAdmin();
                    long adminEndTime = System.currentTimeMillis();
                    log.info("getSystemIsAdmin execution time: {} ms", adminEndTime - adminStartTime);
                    return isAdmin;
                } catch (Exception e) {
                    throw new CompletionException("Error checking system admin status", e);
                }
            }, asyncExecutor).exceptionally(ex -> {
                log.error("Exception in checking system admin status: {}", ex.getMessage());
                return false;
            });
        
            // 等待所有异步任务完成
            CompletableFuture.allOf(userRolesFuture, isAdminFuture).join();
        
            List<String> userRoles = userRolesFuture.get();
            Boolean systemIsAdmin = isAdminFuture.get();
        
            if (CollectionUtils.isNotEmpty(userRoles) && UserRoleNameConstant.checkUserRole(userRoles)) {
                permissions.addAll(assemblePermissions(systemIsAdmin, isFromGetUserRoleList));
            }
        
            long endTime = System.currentTimeMillis();
            log.info("Total getUserPermissions execution time: {} ms", endTime - startTime);
    
            return permissions;
        }
    

CompletableFuture没有现成的api实现快速失败的功能,所以我们只能结合allOf()anyOf()来逻辑来自定义方法完成快速失败的逻辑;

1.我们需要额外创建一个CompletableFuture来监听所有的CompletableFuture,一旦其中一个CompletableFuture产生异常,我们就设置额外的CompletableFuture立即完成。

2.把所有的CompletableFuture和额外的CompletableFuture放在anyOf()方法中,这样一旦额外的CompletableFuture完成,说明产生异常了;否则就需要等待所有的CompletableFuture完成。