ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,//核心线程数量
10,//最大线程数量
60,//保活时间
TimeUnit.SECONDS,//时间单位
new LinkedBlockingQueue<Runnable>(),//等待队列
new ThreadPoolExecutor.CallerRunsPolicy());//淘汰策略
基于以上线程池创建,Executors提供基础的线程池模板:
Executors.newSingleThreadExecutor();//单线程 顺序执行,保证任务的执行顺序
Executors.newCachedThreadPool();//创建一个可缓存线程池,线程池为无限大,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
Executors.newFixedThreadPool(10);//定长=10的,可控制线程最大并发数,超出的线程会在队列中等待
Executors.newScheduledThreadPool(10);//定长=10的,可周期执行任务的线程池
函数的差别:
execuse和submit:是否有返回值
invoke:阻塞当前任务,等返回值
判断是否达到最大核心线程数量->判断阻塞队列是否占满->判断是否达到最大线程数->判断淘汰策略
等待队列接口:
BlockingQueue
TransferQueue:队列长度为0,没有消费者的时候,生产者被阻塞
BlockingQueue:是定长的,当队列满的时候,会触发入队操作等待。
常用的实现类:
LinkedTransferQueue//比SynchronousQueue高效
LinkedBlockingQueue
DelayedWorkQueue//保证队列按照时间顺序来实现
MyFoekJoinTask extends RecursiveTask<V> {
/**
* The main computation performed by this task.
* @return the result of the computation
* 进行自主分隔
*/
protected V compute() {
if(不再拆分) {
return 计算结果;
}else{
fork1 = new MyFoekJoinTask();
fork2 = new MyFoekJoinTask();
//1. 直接这样使用会导致有一个线程变成boss线程。则拆分的任务不会并发执行,执行时间会变长
fork1.fork();
fork2.fork();
//2.
invokeAll(fork1,fork2)
return fork1.join()+fork2.join();
}
}
protected final boolean exec() {
result = compute();
return true;
}
}
在以上并行计算中,最重要的就是fork和join。可以理解为递归
fork:方法用于将新创建的子任务放入当前线程的work queue队列中
join:触发阻塞,等待子任务返回结果。
public ForkJoinPool(int parallelism,//并行级别
ForkJoinWorkerThreadFactory factory,//线程工厂
UncaughtExceptionHandler handler,//异常处理
boolean asyncMode) {//FIFO 还是 LIFO(false)
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
真正的任务拆分,在task中进行
参考目录:
https://blog.csdn.net/f641385712/article/details/83749798
https://blog.csdn.net/tyrroo/article/details/81390202
原文:https://www.cnblogs.com/ElliottX4/p/14345646.html