一、什么是ForkJion
三、特点:
工作窃取-----ForkJion里面是一个双端队列,当A线程运行完自己的任务后,如果B线程没完成,就去B线程的另一端去一个任务来完成
四、核心类
ForkJoinPool是ForkJoin框架中的任务调度器,和ThreadPoolExecutor一样实现了自己的线程池,提供了三种调度子任务的方法:
execute:异步执行指定任务,无返回结果;
invoke、invokeAll:异步执行指定任务,等待完成才返回结果;
submit:异步执行指定任务,并立即返回一个Future对象;
Fork/Join框架中的实际的执行任务类,有以下两种实现,一般继承这两种实现类即可。
RecursiveAction:用于无结果返回的子任务;
RecursiveTask:用于有结果返回的子任务;
package com.jenne.juc.forkjion; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.stream.LongStream; /** * 1、forkjoinPool 通过它来执行 * 2、计算任务 forkjoinPool.execute(ForkJoinTask task) * 3、计算类要继承 ForkJoinTask及其子类 */ //运行类 public class Demo { public static void main(String[] args) throws ExecutionException, InterruptedException { test2(); } // 一般操作 public static void test1() { Long sum = 0L; long start = System.currentTimeMillis(); for (Long i = 1L; i <= 10_0000_0000; i++) { sum += i; } long end = System.currentTimeMillis(); System.out.println("sum=" + sum + " 时间:" + (end - start)); } // 使用ForkJoin public static void test2() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinTaskDemo(0L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务 Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum=" + sum + " 时间:" + (end - start)); } //Stream并行流 public static void test3() { long start = System.currentTimeMillis(); // Stream并行流() (] long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum); long end = System.currentTimeMillis(); System.out.println("sum=" + "时间:" + (end - start)); } } //计算类 class ForkJoinTaskDemo extends RecursiveTask<Long> { private Long start; // 1 private Long end; // 1990900000 // 临界值 private Long temp = 10000L; public ForkJoinTaskDemo(Long start, Long end) { this.start = start; this.end = end; } // 计算方法 @Override protected Long compute() { if ((end - start) < temp) { Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; } else { // forkjoin 递归 long middle = (start + end) / 2; // 中间值 ForkJoinTaskDemo task1 = new ForkJoinTaskDemo(start, middle); task1.fork(); // 拆分任务,把任务压入线程队列 ForkJoinTaskDemo task2 = new ForkJoinTaskDemo(middle + 1, end); task2.fork(); // 拆分任务,把任务压入线程队列 return task1.join() + task2.join(); } } }
原文:https://www.cnblogs.com/jenne-blog/p/13054633.html