生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。
使用生产者消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力。
整体如上图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。
而中间的 3 和 4 分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。
那么什么时候阻塞线程需要被唤醒呢?有两种情况。
第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。
另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产。
import java.util.concurrent.ArrayBlockingQueue;
/**
* 使用阻塞队列实现一个生产者与消费者模型
*
* @author xiandongxie
*/
public class ProducerAndConsumer {
private static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) throws InterruptedException {
Producer producer = new Producer();
Consumer consumer = new Consumer();
Thread producer1 = new Thread(producer, "producer-1");
Thread producer2 = new Thread(producer, "producer-2");
Thread consumer1 = new Thread(consumer, "consumer-2");
Thread consumer2 = new Thread(consumer, "consumer-2");
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
Thread.sleep(5);
producer1.interrupt();
Thread.sleep(5);
producer2.interrupt();
Thread.sleep(5);
consumer1.interrupt();
consumer2.interrupt();
}
static class Producer implements Runnable {
@Override
public void run() {
int count = 0;
while (true && !Thread.currentThread().isInterrupted()) {
count++;
String message = Thread.currentThread().getName() + " message=" + count;
try {
queue.put(message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
while (true && !Thread.currentThread().isInterrupted()) {
try {
String take = queue.take();
System.out.println(Thread.currentThread().getName() + ",消费信息:" + take);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
}
}
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 采用 Condition 自定义阻塞队列实现消费者与生产者
*
* @author xiandongxie
*/
public class MyBlockingQueueForCondition<E> {
private Queue<E> queue;
private int max = 16;
private ReentrantLock lock = new ReentrantLock();
// 没有空,则消费者可以消费,标记 消费者
private Condition notEmpty = lock.newCondition();
// 没有满,则生产者可以生产,标记 生产者
private Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int size) {
this.max = size;
queue = new LinkedList();
}
public void put(E o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
// 如果满了,阻塞生产者线程,释放 Lock
notFull.await();
}
queue.add(o);
// 有数据了,通知等待的消费者,并唤醒
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
// 如果为空,阻塞消费者线程
notEmpty.await();
}
E item = queue.remove();
// queue 未满,唤醒生产者
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
}
import java.util.LinkedList;
/**
* 采用 wait,notify 实现阻塞队列
*
* @author xiandongxie
*/
public class MyBlockingQueue<E> {
private int maxSize;
private LinkedList<E> storage;
public MyBlockingQueue(int maxSize) {
this.maxSize = maxSize;
storage = new LinkedList<>();
}
public synchronized void put(E e) throws InterruptedException {
try {
while (storage.size() == maxSize) {
// 满了
wait();
}
storage.add(e);
} finally {
notifyAll();
}
}
public synchronized E take() throws InterruptedException {
try {
while (storage.size() == 0) {
// 没有数据
wait();
}
return storage.remove();
} finally {
notifyAll();
}
}
}
原文:https://www.cnblogs.com/xiexiandong/p/13286399.html