经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。
由此可以扩展,在很多任务下,我们需要执行两个任务但这两个任务并没有前后的关联关系,我们也希望两个任务能够同时执行,然后再将执行结果汇聚就可以了。
future 通过提交一个 callable 任务给线程池,线程池后台启动其他线程去执行,然后再调用 get() 方法获取结果
private void test() {
  ExecutorService executor = Executors.newCachedThreadPool();
  Future<Integer> future = executor.submit(() -> sleep(1));
  try {
    Integer integer = future.get(3, TimeUnit.SECONDS);
    System.out.println(integer);
  } catch (InterruptedException e) {
    // 当前线在等待中被中断
    e.printStackTrace();
  } catch (ExecutionException e) {
    // 任务执行中的异常
    e.printStackTrace();
  } catch (TimeoutException e) {
    // 超时
    e.printStackTrace();
  }
}
private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 1;
}该方式存在的问题,如果 sleep 执行超过 3 秒钟,future 将无法拿到返回结果。当然,Future 提供了一个无参的get 方法,可以一直等待结果。不过还是建议使用带超时参数的 get 方法,同时定义好超时的处理方法。
调用某个方法,调用方在被调用方运行的过程中会等待,直到被调用方运行结束后返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方不在同一个线程中运行,调用方还是需要等待被调用方结束才运行,这就是阻塞式调用。
异步 API 调用后会直接返回,将计算任务交给其他线程来进行。其他线程执行完成后,再将结果返回给调用方。
使用异步 API
public void test(){
  CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
  new Thread(() -> {
    int sleep = sleep(1);
    completableFuture.complete(sleep);
  }).start();
  CompletableFuture<Integer> completableFuture1 = new CompletableFuture<>();
  new Thread(() -> {
    int sleep = sleep(2);
    completableFuture1.complete(sleep);
  }).start();
  Integer integer = null;
  Integer integer1 = null;
  try {
    integer = completableFuture.get();
    integer1 = completableFuture1.get();
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  }
  System.out.println(integer + "....CompletableFuture.." + integer1);
  Instant end = Instant.now();
  Duration duration = Duration.between(start, end);
  long l = duration.toMillis();
  System.err.println(l);
}   
private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return timeout;
}异步处理
上面代码的问题是,如果在线程内发生了异常,如何在外部的调用中被发现,同时去处理呢?正常的情况是,线程内发生异常,会直接被封锁在线程内,而最终线程会被杀死,那么 get 方法一直会阻塞。
此时就不应该使用 get() 方法,而是使用带有超时参数的 get 方法,并且在线程内,将异常传递回调用方。
new Thread(() -> {
  try {
    int sleep = sleep(2);
    completableFuture1.complete(sleep);
  } catch (Exception e) {
    completableFuture1.completeExceptionally(e);
  }
}).start();completableFuture1.completeExceptionally(e); 将异常传递出来,在 ExecutionException 中会被捕获,然后对其进行处理即可。
try {
  integer = completableFuture.get();
  integer1 = completableFuture1.get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
}示例:
public void test(){
  CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
  new Thread(() -> {
    try {
      throw new RuntimeException("故意抛出的异常...");
    } catch (Exception e) {
      completableFuture.completeExceptionally(e);
    }
  }).start();
  Integer integer = null;
  try {
    integer = completableFuture.get(3, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  } catch (TimeoutException e) {
    e.printStackTrace();
  }
  System.out.println(integer + "....CompletableFuture.." );
  Instant end = Instant.now();
  Duration duration = Duration.between(start, end);
  long l = duration.toMillis();
  System.err.println(l);
}此时会收到的异常:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 故意抛出的异常...
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at com.example.demo.me.sjl.service.UserService.test(UserService.java:92)
    at com.example.demo.me.sjl.controller.UserController.test(UserController.java:20)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at ....supplyAsync 创建 CompletableFutureCompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> sleep(1));相比于 new 的方式,更加优雅、简洁,并且不用显式的创建线程(new Thread) 操作。默认会交由 ForkJoinPoll 池中的某个执行线程运行,同时也提供了重载的方法,指定 Executor 。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
  return asyncSupplyStage(asyncPool, supplier);
}如何确定默认的线程数量:
如果配置了系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 则取该值,转换成 int 作为线程数量
String pp = System.getProperty
              ("java.util.concurrent.ForkJoinPool.common.parallelism");
if (pp != null)
              parallelism = Integer.parseInt(pp);没有配置该值,则取 Runtime.getRuntime().availableProcessors() 值作为线程数量
if (parallelism < 0 && // default 1 less than #cores
          (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
          parallelism = 1;parallelism 初始值为 -1
调整线程池的大小
$$
N{threads} = N{cpu}  U_{cpu}  (1+W/C)
$$
其中:$$N_{cppu}$$ 是处理器的核的数目,可以通过 Runtime.getRuntime().availableProcessors 得到
上面是一个参考公式《Java并发编程实战》(http://mng.bz/979c )
这是计算出的理论值,不过我们在使用时,需要考虑实际情况,比如我有 5 个并行任务,那么我需要开启 5 个线程来分别进行执行,多了会千万浪费,少了达不到并发的效果。此时我们需要 5 个线程。
public void save(){
    CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
      UserEntity entity = UserEntity.builder()
        .id(1112)
        .userName("施杰灵")
        .password("abc1213")
        .birthday("2018-08-08")
        .createUser("1")
        .createTime(LocalDateTime.now())
        .updateUser("2")
        .updateTime(LocalDateTime.now())
        .build();
      return userRepository.save(entity);
    });
    CompletableFuture<UserEntity> completableFuture1 = CompletableFuture.supplyAsync(() -> {
      UserEntity entity = UserEntity.builder()
        .id(223)
        .userName("施杰灵1")
        .password("abc12131")
        .birthday("2018-08-18")
        .createUser("11")
        .createTime(LocalDateTime.now())
        .updateUser("21")
        .updateTime(LocalDateTime.now())
        .build();
      if (true) {
        throw new RuntimeException("故意抛出的异常...");
      }
      return userRepository.save(entity);
    });
    System.out.println(completableFuture.join());
    System.out.println(completableFuture1.join());
}测试结果,上面那条数据正常插入到数据库中,下面的数据插入失败。事务并没有回滚。
将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
public void test(){
  CompletableFuture<Integer> compose = CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCompose(
        (x) -> CompletableFuture.supplyAsync(() -> sleep(x))
    );
}
private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return timeout;
}从上面的代码中,可以看到在进行计算的时候,是使用到了前面的返回值 x ,整个任务的运行时间是 4 秒。
public void test() {
  CompletableFuture<Integer> combine = CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCombine(
        CompletableFuture.supplyAsync(() -> sleep(1)),
        (t1, t2) -> t1 + t2
  );
}
private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return timeout;
}两个方法接收的参数是一致的,区别在于他们接收的第二个参数:BiFunction 是否会在提交到线程池中,由另外一个任务以异步的方式执行。thenCombine 不会以异步方式执行 BiFunction 而 thenCombineAsync 会以异步的方式执行。
何时使用 Async 后缀的方法?
当我们进行合并的方法是一个耗时的方法时,就尽可能的考虑使用 Async 后缀的方法。
我们平常的操作是,插入数据库时,如果两个操作中,其中一个操作发生异常,是否会回滚?
@Transactional(rollbackFor = Exception.class)
public void save() {
  CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
    UserEntity entity = UserEntity.builder()
      .id(111)
      .userName("施杰灵")
      .password("abc1213")
      .birthday("2018-08-08")
      .createUser("1")
      .createTime(LocalDateTime.now())
      .updateUser("2")
      .updateTime(LocalDateTime.now())
      .build();
    return userRepository.save(entity);
  }).thenCombine(CompletableFuture.supplyAsync(() -> {
    UserEntity entity = UserEntity.builder()
      .id(222)
      .userName("施杰灵1")
      .password("abc12131")
      .birthday("2018-08-18")
      .createUser("11")
      .createTime(LocalDateTime.now())
      .updateUser("21")
      .updateTime(LocalDateTime.now())
      .build();
    return userRepository.save(entity);
  }), (a, b) -> {
    System.out.println(a);
    System.out.println(b);
    return a;
  });
  UserEntity join = completableFuture.join();
  System.out.println(join);
}
经过实际测试,第二个任务抛出异常,是会回滚的。
Java 8的CompletableFuture 通过thenAccept 方法提供了这一功能,它接收CompletableFuture 执行完毕后的返回值做参数。
public void test() {
  CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCombineAsync(
    CompletableFuture.supplyAsync(() -> sleep(1)),
    (t1, t2) -> t1 + t2
  ).thenAccept((t) -> System.out.println(t + "------"));
}
private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return timeout;
}public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) ;BlockingQueue\<Runnable\> workQueue : 一个阻塞队列,用来存储等待执行的任务,这个参数的选择会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择
ArrayBlockingQueue
PriorityBlockingQueue
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
原文:https://blog.51cto.com/mengxiangsjl/2434980