首页 > 编程语言 > 详细

java多线程系列:二 线程池全面解析_1

时间:2020-06-19 20:31:51      阅读:74      评论:0      收藏:0      [点我收藏+]
  • 前言- 本系列随笔 会深入浅出,解析java多线程的各种技术及实现。
  • 随笔主要根据 《java并发编程的艺术》一书作为参考。 本系列以使用为主要目的,本人理解有限,还望读者辩证采纳,没有过多涉及源码的讨论,重在初学者的使用,理解伪码。
  • 预备知识: 无
  • 可能新手对下面部分内容完全不理解,我后续会继续更博,并最终将该系列排序。

零、本章节讲解内容

1、线程池的实现大致原理

2、线程池的参数意义

一 、学习线程池之前的预热 之 模拟线程池

1、做一个简单的 线程池demo 帮助理解

public interface ThreadPool<Job extends Runnable> {
    // 执行一个Job,这个Job需要实现Runnable
    void execute(Job job);
    // 关闭线程池
    void shutdown();
    // 增加工作者线程
    void addWorkers(int num);
    // 减少工作者线程
    void removeWorker(int num);
    // 得到正在等待执行的任务数量
    int getJobSize();
}
  1 package E09线程池;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Collections;
  5 import java.util.LinkedList;
  6 import java.util.List;
  7 import java.util.concurrent.atomic.AtomicLong;
  8 
  9 public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
 10     // 线程池最大限制数
 11     private static final int MAX_WORKER_NUMBERS = 10;
 12     // 线程池默认的数量
 13     private static final int DEFAULT_WORKER_NUMBERS = 5;
 14     // 线程池最小的数量
 15     private static final int MIN_WORKER_NUMBERS = 1;
 16     // 这是一个工作列表,将会向里面插入工作
 17     private final LinkedList<Job> jobs = new LinkedList<Job>();
 18     // 工作者列表
 19     private final List<Worker> workers = Collections.synchronizedList(new
 20             ArrayList<Worker>());
 21     // 工作者线程的数量
 22     private int workerNum = DEFAULT_WORKER_NUMBERS;
 23     // 线程编号生成
 24     private AtomicLong threadNum = new AtomicLong();
 25 
 26     public DefaultThreadPool() {
 27         initializeWokers(DEFAULT_WORKER_NUMBERS);
 28     }
 29 
 30     public DefaultThreadPool(int num) {
 31         workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : (num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num);
 32         initializeWokers(workerNum);
 33     }
 34 
 35     public void execute(Job job) {
 36         if (job != null) {
 37 // 添加一个工作,然后进行通知
 38             synchronized (jobs) {
 39                 jobs.addLast(job);
 40                 jobs.notify();
 41             }
 42         }
 43     }
 44 
 45     public void shutdown() {
 46         for (Worker worker : workers) {
 47             worker.shutdown();
 48         }
 49     }
 50 
 51     public void addWorkers(int num) {
 52         synchronized (jobs) {
 53 // 限制新增的Worker数量不能超过最大值
 54             if (num + this.workerNum > MAX_WORKER_NUMBERS) {
 55                 num = MAX_WORKER_NUMBERS - this.workerNum;
 56             }
 57             initializeWokers(num);
 58             this.workerNum += num;
 59         }
 60     }
 61 
 62     public void removeWorker(int num) {
 63         synchronized (jobs) {
 64             if (num >= this.workerNum) {
 65                 throw new IllegalArgumentException("beyond workNum");
 66             }
 67 // 按照给定的数量停止Worker
 68             int count = 0;
 69             while (count < num) {
 70                 Worker worker = workers.get(count);
 71                 if (workers.remove(worker)) {
 72                     worker.shutdown();
 73                     count++;
 74                 }
 75             }
 76             this.workerNum -= count;
 77         }
 78     }
 79 
 80     public int getJobSize() {
 81         return jobs.size();
 82     }
 83 
 84     // 初始化线程工作者
 85     private void initializeWokers(int num) {
 86         for (int i = 0; i < num; i++) {
 87             Worker worker = new Worker();
 88             workers.add(worker);
 89             Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.
 90                     incrementAndGet());
 91             thread.start();
 92         }
 93     }
 94 
 95     // 工作者,负责消费任务
 96     class Worker implements Runnable {
 97         // 是否工作
 98         private volatile boolean running = true;
 99 
100         public void run() {
101             while (running) {
102                 Job job = null;
103                 synchronized (jobs) {
104 // 如果工作者列表是空的,那么就wait
105                     while (jobs.isEmpty()) {
106                         try {
107                             jobs.wait();
108                         } catch (InterruptedException ex) {
109 // 感知到外部对WorkerThread的中断操作,返回
110                             Thread.currentThread().interrupt();
111                             return;
112                         }
113                     }
114 // 取出一个Job
115                     job = jobs.removeFirst();
116                 }
117                 if (job != null) {
118                     try {
119                         job.run();
120                     } catch (Exception ex) {
121 // 忽略Job执行中的Exception
122                     }
123                 }
124             }
125         }
126 
127         public void shutdown() {
128             running = false;
129         }
130     }
131 }

 解析:

1、线程池拥有2个链表,workers 工作者线程 和jobs 工作线程, 这就是最核心的参数了, 线程池初始化时,把worker启动,worker中循环调用 jobs中的job,执行它的run()。完毕后,继续获取,执行。

2、synchronized  同步块,锁定jobs对象,当 获取job时,若jobs链表中空,则 wait()。当添加job时,进行notify();  

  tips:这样做其实有点麻烦,我甚至根本不想看到sync这个关键字存在!  我们完全可以设计一个队列 jobs-plus,实现这些功能,我们使用时,只需 put() get()  其中的job即可,当然java也提供了多种 称之为阻塞队列的实现类。

3、可以增加workers,并且有一个volatile修饰的running 标志位,用于线程的shutdown,这样 一个简单的线程池的功能就完毕了。

 实际的线程池,只是对这三个步骤的完善,扩充。

 

 二、预热 之 阻塞队列

1、java提供了多个队列,用于实现 封装上述   private final LinkedList<Job> jobs = new LinkedList<Job>();   工作者链表的 add remove两个功能,来保证线程安全,

  这样我们就不用自己去写 同步块了。

2、阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不
满。
2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

技术分享图片

 

实现类如下

(本文没有详细介绍,其实阻塞队列就是利用 Lock锁机制实现的,并加上一些数据结构,比如 优先级队列,实现Delay接口等)

·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。  静态工厂方法Executors.newFixedThreadPool() 使用了这个队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。        静态工厂方法Executors.newCachedThreadPool使用了这个队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

我们抽取一个简单的ArrayBlockingQueue来解析下:

构造参数:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //使用了一个重入锁,可以选择是否公平竞争 notEmpty = lock.newCondition(); //创建了两个等待队列,用于进行阻塞: jobs 空,不能获取元素 , jobs满,不能添加元素。 notFull = lock.newCondition(); }

  

添加 任务时:

 1 public void put(E e) throws InterruptedException {
 2         checkNotNull(e);
 3         final ReentrantLock lock = this.lock;
 4         lock.lockInterruptibly();
 5         try {
 6             while (count == items.length)
 7                 notFull.await();     //如果 jobs已满,则该线程被转入等待队列,等待其他线程通知(可以存放了) ,并解锁。
 8             enqueue(e);   //如果未满,则把E入队 ,并进行通知, notEmpty.signal(); 
 9         } finally {
10             lock.unlock();
11         }
12     }

 

 

删除任务时:

 1  public E take() throws InterruptedException {
 2         final ReentrantLock lock = this.lock;
 3         lock.lockInterruptibly();
 4         try {
 5             while (count == 0)
 6                 notEmpty.await();
 7             return dequeue();
 8         } finally {
 9             lock.unlock();
10         }
11     }

 

 

大概截这么多,阻塞队列的存在是为了简化使用者的使用,并且由于每种队列的实现不同,功能不同,将来在构造线程池时,可以随意选用。

 

 

三、预热 之 线程池存在的意义

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,
还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用
线程池,必须对其实现原理了如指掌。

 

 

预备知识就这么多了,其实掌握这些,就可以完全理解下一章 关于线程池的各项参数的意义。

 

java多线程系列:二 线程池全面解析_1

原文:https://www.cnblogs.com/edisonhou/p/13164587.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!