java8新增对Future的补充,CompletableFuture支持流式计算、函数式编程等新特性,通过CompletableFuture,我们可以实现非阻塞的Future结果调用。
CompletableFuture实现了Future和CompletionStage两个接口,其中CompletionStage抽象了一些异步编程的补充方法。
(1) Future不支持手动完成Future任务,只能等待Future执行完成,而CompletableFuture提供complete方法,可以主动完成任务。
(2) Future不支持链式调用,CompletableFuture提供Completion stack的链式结构实现链式调用。
(3) Future的api中不支持异常处理,CompletableFuture提供异常处理exceptionally方法。
注:下面CompletableFuture统一用任务来称呼
supplyAsync方法:
创建一个有返回值的CompletableFuture,参数为Supperlier接口,返回值主要是重写Supplier接口的get方法。
  CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
                @Override
                public Long get() {
                    long result = new Random().nextInt(100);
                    System.out.println(format(String.valueOf(result)));
                    return result;
                }
            });
runAsync方法:
创建一个没有返回值的CompletableFuture,参数为Runnable。
            CompletableFuture<Void> future2 = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    System.out.println(new Random().nextInt(5));
                }
            });
thenApply方法:
接收上一个任务的返回值为入参,并返回一个新的任务(带返回值)。
        private static void thenApply() throws Exception {
            CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
                @Override
                public Long get() {
                    long result = new Random().nextInt(100);
                    //69
                    System.out.println(format(String.valueOf(result)));
                    return result;
                }
            }).thenApply(new Function<Long, Long>() {
                @Override
                public Long apply(Long t) {
                    long result = t * 5;
                    // 345
                    System.out.println(format(String.valueOf(result)));
                    return result;
                }
            });
            long result = future.get();
            // result = 345, 说明链式调用取最后的结果
            System.out.println("main = " + result);
        }
thenAccept方法:接收上一个任务的返回值为入参,并返回一个新的任务(不带返回值)。
thenRun方法:没有入参,并返回一个新的任务(不带返回值)。
thenCompose方法:两个有依赖关系的CompletableFuture,第一个执行后才执行第二个。
    void testThenCompose() throws ExecutionException, InterruptedException {
        // thenCompose合并两个有依赖关系的CompletableFutures的执行结果,第一个任务结果传给下一个future
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 2 / 1);
        //future1执行完的结果传给future2执行
        CompletableFuture<String> future2 = future1.thenCompose((result) ->
                CompletableFuture.supplyAsync(() -> String.valueOf(result * 2));
        );
        System.out.println(future2.get());
    }
thenCombine方法:两个任务组合,两个任务正常执行后执行thenCombine,两个任务的结果做参数。
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> future3 = future1.thenCombine(future2, (result1, result2) -> {
            System.out.println("result1 : " + result1);
            System.out.println("result2 : " + result2);
            return result1 + result2;
        });
applyToEither方法:两个任务组合,只要有一个任务执行完成后就执行。
allOf方法:多个CompletableFuture全部执行完后,再执行回调。
anyOf方法:多个CompletableFuture有一个执行完就执行调用。
            CompletableFuture allOfFuture = CompletableFuture.allOf(future1, future2, future3).handle((result, e) -> {
                if (e != null) {
                    System.out.println("error : " + e.getMessage());
                }
                System.out.println("anyOf : " + result);
                return result;
            });
            CompletableFuture anyOfFuture = CompletableFuture.anyOf(future1, future2, future3).whenComplete((o, throwable) -> {
                System.out.println("anyOf : " + o.toString());
            });
whenComplete方法:
当某个任务执行完成后,传入执行结果(无论是否有异常)的回调方法,参数为(result,throwable)
exceptionally方法:
出现异常,会调用这个回调方法,参数为 (throwable)
handle方法:
异常捕捉方法,无论发生异常都会执行,参数为(result,throwable)
不带Async结尾的方法是同步方法,运行时使用调用这个方法的线程执行。
带Async结尾的方法是异步方法,执行时会提交给线程池(默认ForkJoinPool,可以自定义)来执行,提高任务的并行度。
示例:
 Long start = System.currentTimeMillis();
            CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
                Integer i = new Random().nextInt(100);
                System.out.println(Thread.currentThread().getName() + " : future1 = " + i);
                return i;
            }).thenApply(result -> {
                System.out.println(Thread.currentThread().getName() + " : future1 = " + result*2);
                return result*2;
            });
            System.out.println("future1 cast = "+(System.currentTimeMillis()-start));
            start = System.currentTimeMillis();
            CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
                Integer i = new Random().nextInt(100);
                System.out.println(Thread.currentThread().getName() + " : future2 = " + i);
                return i;
            }).thenApplyAsync(result -> {
                System.out.println(Thread.currentThread().getName() + " : future2 = " + result*2);
                return result*2;
            });
            System.out.println("future2 cast = "+(System.currentTimeMillis()-start));
            Thread.sleep(2000);
上述例子输出:可以看出future1的thenApply交由main线程执行,future2的thenApplyAsync由ForkJoinPool执行
ForkJoinPool.commonPool-worker-1 : future1 = 40
main : future1 = 80
future1 cast = 68
ForkJoinPool.commonPool-worker-1 : future2 = 83
future2 cast = 0
ForkJoinPool.commonPool-worker-1 : future2 = 166
如果在future1的supplyAsync加上Thread.sleep(1000)等等待操作,future1会由ForkJoinPool调用
分析CompletableFuture源码可以从它的内部内Completion开始分析:
Completion继承了ForkJoinTask,实现了Runnable接口,每个Completion可以看做CompletableFuture的一个阶段任务,Compltion有一个next指针,指向的是下一个Completion,多个CompletableFuture的回调实现就是通过Completion的next指针指向下一个Completion来实现的。
Completion源码:
    abstract static class Completion extends ForkJoinTask<Void> implements Runnable, CompletableFuture.AsynchronousCompletionTask {
        volatile CompletableFuture.Completion next; \\指向下一个Completion
        Completion() {
        }
        abstract CompletableFuture<?> tryFire(int var1); \\尝试执行这个Completion任务并唤醒后续的Completion
        abstract boolean isLive();
        public final void run() {
            this.tryFire(1);
        }
        public final boolean exec() {
            this.tryFire(1);
            return false;
        }
......
    }
UniCompletion和BiCompletion都是Completion的子类,他们比较Completion,新增加了一些属性:
1、Executor executor;\执行线程池
2、CompletableFuture
3、CompletableFuture
CompletableFuture的多种api实现都是依赖于UniCompletion和BiCompletion派生出来的子类实现的,例如简单的api的thenAccpect(),其实就封装了一个内部类UniAccept
UniAccept源码:
 static final class UniAccept<T> extends CompletableFuture.UniCompletion<T, Void> {
        Consumer<? super T> fn;\\传入的任务,函数式接口
        UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) {
            super(executor, dep, src);
            this.fn = fn;
        }
        final CompletableFuture<Void> tryFire(int mode) {
            CompletableFuture d;
            CompletableFuture a;
            Object r;
            Consumer f;
            if ((d = this.dep) != null && (f = this.fn) != null && (a = this.src) != null && (r = a.result) != null) {
                if (d.result == null) {
                    label33: {
                        if (r instanceof CompletableFuture.AltResult) {
                            Throwable x;
                            if ((x = ((CompletableFuture.AltResult)r).ex) != null) {
                                d.completeThrowable(x, r);
                                break label33;
                            }
                            r = null;
                        }
                        try {
                            if (mode <= 0 && !this.claim()) {
                                return null;
                            }
                            f.accept(r);//这里其实就执行了我们传入的Consumer接口定义的异步任务
                            d.completeNull();
                        } catch (Throwable var8) {
                            d.completeThrowable(var8);
                        }
                    }
                }
                this.dep = null;
                this.src = null;
                this.fn = null;
                return d.postFire(a, mode); \\ 通知这个Completion上的其他Completion,next指针(具体实现在postFire方法里)
            } else {
                return null;
            }
        }
    }
源码分析只是简单写了一些源码的实现方式,它的Completion的链路可以看成一个栈的数据结构。
java 线程相关(4) - CompletableFuture:对Future的补充,实现更方便的异步编程
原文:https://www.cnblogs.com/Zxq-zn/p/14838863.html