首页 > 其他 > 详细

ThreadPoolExecutor详解

时间:2021-03-11 22:35:11      阅读:32      评论:0      收藏:0      [点我收藏+]

本文主要针对ThreadPoolExecutor初始化到正常工作时的代码解读,对其他的生命周期先挖个坑,可能以后埋。

1.构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

所有构造方法都会调用这个构造方法。方法参数包含了ThreadPoolExecutor的7个最重要的参数。

corePoolSize:核心线程数上限。核心线程一旦创建,除非ThreadPoolExecutor准备关闭,不然一直存在。

workQueue:任务队列,如果任务数量太多,核心线程处理不过来,那么任务会先放入任务队列中等待处理。注意!!!队列最好自定义,并且按照项目实际情况设计容量。如果队列容量过大(如LinkedBlockingQueue,默认长度为Integer.MAX_VALUE),那么当任务过大的时候大概率导致OOM。

maximumPoolSize:最大线程数上限。当任务队列放不下,那么将会启动非核心线程执行当前任务或者从队列中取任务。一旦非核心线程没有任务,那么将一段时间后被回收。

keepAliveTime:生存时间,非核心线程在一段生存时间内都没有任务,将被回收。

unit:生存时间的单位,如秒,毫秒。

threadFactroy:生成线程的工厂对象。注意!!!工厂最好自定义,默认的jdk线程工厂不会对线程的三个属性:名字,是否后台线程和权限,做特殊的处理。为了方便服务器以后调优和bug处理,至少要给线程起符合业务场景的名字,帮助后期快速定位问题。

handler:在ThreadPoolExecutor不再接受新的任务,或者没有空闲非核心线程并且任务队列都已满的情况下,对任务执行的拒绝策略。jdk自带四种策略:1)抛异常。2)不抛异常,直接丢掉。3)丢掉队列中最旧的任务。4)谁发布的任务谁自己完成。注意!!!除了非常特殊的业务场景,不要使用任何jdk自带的策略。最好自定义拒绝策略,使用消息队列或者mysql将暂时无法处理的任务保存起来,空闲的时候完成。

 

2.ctl

ctl显示ThreadPoolExecutor的工作状态和目前的线程数,在后面代码中大量使用。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; }

ctl可以看作一个32位的二进制数,前三位为ThreadPoolExecutor的工作状态(包含RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,它们的初始赋值已经将数字都映射到ctl的前三位上了),后29位(COUNT_BITS)为ThreadPoolExecutor目前的线程数量。CAPACITY是一个低29位都是1的二进制数,可以通过与操作从ctl分离出工作状态和线程数量两个信息。

技术分享图片

上图workerCountOf(ctl)的示意图。上一个数字是CAPACITY,下一个数字是ctl(RUNNING状态,3个线程),通过与操作可以快速分离出线程数。runStateOf()也类似。

注意,第一位是符号位,只要是RUNNING状态,那么ctl肯定少于(SHUTDOWN及往后的所有状态)。

 

3.提交任务(execute)

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     //1.试图提交到核心线程
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
     //2.提交到队列
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
     //3.新建非核心线程
else if (!addWorker(command, false)) reject(command); }

1)试图提交到核心线程。

首先查询ctl当前的线程是否少于核心线程数。如果是,那么试图新建核心线程(addWork(command,true))并执行当前任务。新建核心线程前后再次检查ctl,如果已经核心线程数已经达到上限,那么新建失败。如果新建成功,那么提交任务也成功了。结束方法。如果新建过程失败,那么将进入步骤2。

2)插入队列。

当前线程会重新检查ctl,如果ThreadPoolExecutor还是工作状态(isRunning(c)),那么试图加入到任务队列中。加入队列成功,会再次重新检查ctl。如果ThreadPoolExecutor还在工作,那么检查线程数是否为0,如果是,那么新建非核心线程执行任务(addWork(null,false),jdk自带的Executors.CachePoolExecutor是没有核心线程,需要通过这个方式新建线程)。如果ThreadPoolExecutor不是工作状态了,那么从队列中去除该任务(remove(command)),并且执行拒绝策略。如果加入队列失败,就会进入步骤3。

3)新建非核心线程。

无法加入队列,那么试图新建非核心线程来完成当前任务,如果失败了,那么执行拒绝策略。

 

3.addWorker

reject()和offer()是拒绝策略和队列自带的方法,因此不做讲述。addWorker则是重点。

Worker是ThreadPoolExecutor的工作单元。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
     //持有的线程
        final Thread thread;
     //持有的任务
        Runnable firstTask;
     //完成的任务数
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
     //省略AQS体系的代码,详情以后在解析
        ....
    }

Worker具有线程和任务,并使用线程执行当前持有的任务。同时由于Worker继承AbstractQueuedSynchronizer类,因此它的方法实现都是基于AQS体系(AQS体系会在下一篇博客中提出),也可以认为它是一个锁。

接下来回到addWorker()方法中。方法分为两部分,分别是判断是否可以生成Worker和生成Worker。首先看第一部分:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
     //1.外循环,判断线程池当前的工作状态
for (;;) { int c = ctl.get(); int rs = runStateOf(c);        //2.看似复杂的条件判断 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;        //3.试图修改ctl中的当前线程数 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } }

步骤:

1)每次试图新建Worker(新建的过程会有多次,只要线程池一直在工作,那么就会一直重试新建,知道线程池工作被shutdown)都会重新检查ctl中的线程池工作状态。如果线程池的工作状态发生变化,跳到步骤2。否则跳到步骤3。

2)看上去很复杂的条件判断,这里可以给出两种更好理解的方式。

  2.1)经过调整后,意义不变的判断条件:        (rs > SHUTDOWN) ||  (rs == SHUTDOWN && firstTask != null) || (rs == SHUTDOWN && workQueue.isEmpty())

  2.2)如果线程池处于TIDY,TERMINATED,无法创建任何新Worker。如果是处于SHUTDOWN状态,那么当且仅当,任务队列还有任务以及addWorker()时不带有任何任务时(即仅仅是创建新Worker用来解决任务队列中没有完成的任务,不可以处理新任务),才可以新建Worker。

3)试图修改ctl的当前线程数。判断新建Worker后,线程数是否会超过ctl所能记录的最大线程数(2 ^ 29),然后根据core(addWorker方法参数,true代表这次创建的Worker持有核心线程。反之为非核心线程)判断当前线程数是否超过了核心线程上限或者非核心线程上限。如果超过了,那么直接退出方法。否则,使用CAS修改ctl中过的新建线程数,并结束第一部分的代码。如果CAS修改失败(其他线程速度更快),检查ctl的状态是否有变化。没有的话重复步骤3,否则跳到步骤1。

 

第二部分:

     boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
       //1.新建Worker对象 w
= new Worker(firstTask); final Thread t = w.thread; if (t != null) {
          //2.将Worker对象加入到自己的workers中
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); }
          //3.工人开始工作
if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

步骤:

1)新建Worker。新Worker会从线程工厂中获取线程。

2)将Worker对象加入到自己的workers中。workers是一个HashSet,用来存放所有的Worker。为了保证同步,使用了ThreadPoolExecutor的lock进行锁定。再加入workers前,需要确保Worker还没有开始工作。

3)Worker开始工作。上面知道,Worker即可以看作线程,也可以看作锁。因此它的run()相当地有趣。我将在下一篇文章中详解AQS体系,看看Worker和ReentrantLock之间的异同。

 

至此,提交任务的部分都讲完了。如果有时间,我会补充ThreadPoolExecutor的其他方法的,有缘再见!

ThreadPoolExecutor详解

原文:https://www.cnblogs.com/sergeantFat/p/14518052.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!