1、线程池的实现大致原理
2、线程池的参数意义
1、做一个简单的 线程池demo 帮助理解
public interface ThreadPool<Job extends Runnable> { // 执行一个Job,这个Job需要实现Runnable void execute(Job job); // 关闭线程池 void shutdown(); // 增加工作者线程 void addWorkers(int num); // 减少工作者线程 void removeWorker(int num); // 得到正在等待执行的任务数量 int getJobSize(); }
1 package E09线程池; 2 3 import java.util.ArrayList; 4 import java.util.Collections; 5 import java.util.LinkedList; 6 import java.util.List; 7 import java.util.concurrent.atomic.AtomicLong; 8 9 public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { 10 // 线程池最大限制数 11 private static final int MAX_WORKER_NUMBERS = 10; 12 // 线程池默认的数量 13 private static final int DEFAULT_WORKER_NUMBERS = 5; 14 // 线程池最小的数量 15 private static final int MIN_WORKER_NUMBERS = 1; 16 // 这是一个工作列表,将会向里面插入工作 17 private final LinkedList<Job> jobs = new LinkedList<Job>(); 18 // 工作者列表 19 private final List<Worker> workers = Collections.synchronizedList(new 20 ArrayList<Worker>()); 21 // 工作者线程的数量 22 private int workerNum = DEFAULT_WORKER_NUMBERS; 23 // 线程编号生成 24 private AtomicLong threadNum = new AtomicLong(); 25 26 public DefaultThreadPool() { 27 initializeWokers(DEFAULT_WORKER_NUMBERS); 28 } 29 30 public DefaultThreadPool(int num) { 31 workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : (num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num); 32 initializeWokers(workerNum); 33 } 34 35 public void execute(Job job) { 36 if (job != null) { 37 // 添加一个工作,然后进行通知 38 synchronized (jobs) { 39 jobs.addLast(job); 40 jobs.notify(); 41 } 42 } 43 } 44 45 public void shutdown() { 46 for (Worker worker : workers) { 47 worker.shutdown(); 48 } 49 } 50 51 public void addWorkers(int num) { 52 synchronized (jobs) { 53 // 限制新增的Worker数量不能超过最大值 54 if (num + this.workerNum > MAX_WORKER_NUMBERS) { 55 num = MAX_WORKER_NUMBERS - this.workerNum; 56 } 57 initializeWokers(num); 58 this.workerNum += num; 59 } 60 } 61 62 public void removeWorker(int num) { 63 synchronized (jobs) { 64 if (num >= this.workerNum) { 65 throw new IllegalArgumentException("beyond workNum"); 66 } 67 // 按照给定的数量停止Worker 68 int count = 0; 69 while (count < num) { 70 Worker worker = workers.get(count); 71 if (workers.remove(worker)) { 72 worker.shutdown(); 73 count++; 74 } 75 } 76 this.workerNum -= count; 77 } 78 } 79 80 public int getJobSize() { 81 return jobs.size(); 82 } 83 84 // 初始化线程工作者 85 private void initializeWokers(int num) { 86 for (int i = 0; i < num; i++) { 87 Worker worker = new Worker(); 88 workers.add(worker); 89 Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum. 90 incrementAndGet()); 91 thread.start(); 92 } 93 } 94 95 // 工作者,负责消费任务 96 class Worker implements Runnable { 97 // 是否工作 98 private volatile boolean running = true; 99 100 public void run() { 101 while (running) { 102 Job job = null; 103 synchronized (jobs) { 104 // 如果工作者列表是空的,那么就wait 105 while (jobs.isEmpty()) { 106 try { 107 jobs.wait(); 108 } catch (InterruptedException ex) { 109 // 感知到外部对WorkerThread的中断操作,返回 110 Thread.currentThread().interrupt(); 111 return; 112 } 113 } 114 // 取出一个Job 115 job = jobs.removeFirst(); 116 } 117 if (job != null) { 118 try { 119 job.run(); 120 } catch (Exception ex) { 121 // 忽略Job执行中的Exception 122 } 123 } 124 } 125 } 126 127 public void shutdown() { 128 running = false; 129 } 130 } 131 }
解析:
1、线程池拥有2个链表,workers 工作者线程 和jobs 工作线程, 这就是最核心的参数了, 线程池初始化时,把worker启动,worker中循环调用 jobs中的job,执行它的run()。完毕后,继续获取,执行。
2、synchronized 同步块,锁定jobs对象,当 获取job时,若jobs链表中空,则 wait()。当添加job时,进行notify();
tips:这样做其实有点麻烦,我甚至根本不想看到sync这个关键字存在! 我们完全可以设计一个队列 jobs-plus,实现这些功能,我们使用时,只需 put() get() 其中的job即可,当然java也提供了多种 称之为阻塞队列的实现类。
3、可以增加workers,并且有一个volatile修饰的running 标志位,用于线程的shutdown,这样 一个简单的线程池的功能就完毕了。
实际的线程池,只是对这三个步骤的完善,扩充。
1、java提供了多个队列,用于实现 封装上述 private final LinkedList<Job> jobs = new LinkedList<Job>(); 工作者链表的 add remove两个功能,来保证线程安全,
这样我们就不用自己去写 同步块了。
2、阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不
满。
2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
实现类如下
(本文没有详细介绍,其实阻塞队列就是利用 Lock锁机制实现的,并加上一些数据结构,比如 优先级队列,实现Delay接口等)
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。 静态工厂方法Executors.newFixedThreadPool() 使用了这个队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。 静态工厂方法Executors.newCachedThreadPool使用了这个队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
我们抽取一个简单的ArrayBlockingQueue来解析下:
构造参数:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //使用了一个重入锁,可以选择是否公平竞争 notEmpty = lock.newCondition(); //创建了两个等待队列,用于进行阻塞: jobs 空,不能获取元素 , jobs满,不能添加元素。 notFull = lock.newCondition(); }
添加 任务时:
1 public void put(E e) throws InterruptedException { 2 checkNotNull(e); 3 final ReentrantLock lock = this.lock; 4 lock.lockInterruptibly(); 5 try { 6 while (count == items.length) 7 notFull.await(); //如果 jobs已满,则该线程被转入等待队列,等待其他线程通知(可以存放了) ,并解锁。 8 enqueue(e); //如果未满,则把E入队 ,并进行通知, notEmpty.signal(); 9 } finally { 10 lock.unlock(); 11 } 12 }
删除任务时:
1 public E take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); 4 try { 5 while (count == 0) 6 notEmpty.await(); 7 return dequeue(); 8 } finally { 9 lock.unlock(); 10 } 11 }
大概截这么多,阻塞队列的存在是为了简化使用者的使用,并且由于每种队列的实现不同,功能不同,将来在构造线程池时,可以随意选用。
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,
还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用
线程池,必须对其实现原理了如指掌。
预备知识就这么多了,其实掌握这些,就可以完全理解下一章 关于线程池的各项参数的意义。
原文:https://www.cnblogs.com/edisonhou/p/13164587.html