dubbo提供了四种线程池。其实我理解还是还是根据ThreadPoolExecutor这个JDK提供的线程池类,只不过适应性的改变了其中的参数。dubbo分别提供了1. 缓存线程池 2。固定大小线程池 3. 上届线程池 4.定时线程池。下面具体的说一说这些线程池。
1. 公共行为
首先这些线程池类均继承了ThreadPool接口。该接口中的定义了getExecutor
/**
* Thread pool
*
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url); // 实际上还是对JDKExcutor的封装
可以看到其返回值还是Executor。并且需要采用什么样的线程池,可以从URL中进行设置。
2. CachedThreadPool
public class CachedThreadPool implements ThreadPool {
// 可以看到这里设置了alive这个参数
// 那么也就是说用这个存活时间来控制整个线程池的时间
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
从源码中可以看到,将队列设置为同步队列,只要没有没有达到threads的数量,就会一直增加线程。
3.FixedThreadPool
public class FixedThreadPool implements ThreadPool {
// 开启固定大小的线程数
// 就是将核心池以及最大池大小都调整为一致。同时阻塞队列设置为同步队列
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
固定大小线程池,顾名思义就是线程池中的线程数量维持着固定大小。其原理就是将其中的队列设置为同步队列,同时将最大池和核心池的数量都设定为一致就行。
4. LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {
// 如果queue == 0 则创建同步队列
// queue < 0 就创建无界队列
// queue > 0 就创建上届队列
// 可扩张线程池,就是把阻塞队列改成同步队列。这样有任务的时候就会一直开辟新的线程
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
顾名思义,可扩张线程池就是通过上届队列存储任务。
4.EagerThreadPoolExecutor
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* @return current tasks which are executed
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
这个池子的比较不同的是:还是通过ThreadPoolExecutor实现任务的管理,唯一不同的是当任务执行失败的时候,会将任务存储到自定义的taskqueue中。同时维持这一个当前池子的任务的计数。
线程池中的所有核心线程都在忙碌时,此时如果再添加新的任务不会放入阻塞队列,而且创建新的线程,直到达到最大线程限制,此时如果还有任务,才会放入阻塞队列。
原文:https://www.cnblogs.com/jihuabai/p/13290891.html