生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。
这里我们实现如下的情况的生产-消费模型:
生产者不断交替地生产两组数据“姓名--1-->内容--1”,“姓名--2-->内容--2”,这里的“姓名--1”和“姓名--2”模拟为数据的名称,“内容--1 ”和“内容--2 ”模拟为数据的内容。
由于本程序中牵扯到线程运行的不确定性,因此可能会出现以下问题:
1.假设生产者线程刚向数据存储空间添加了数据的名称,还没有加入该信息的内容,程序就切换到了消费者线程,消费者线程把信息的名称和上一个信息的内容联系到了一起;
2.生产者生产了若干条数据,消费者才可以取数据,或者是,消费者取完一次数据后,还没等生产者放入新的数据,又重复取出了已取过的数据。
通过分析我们可知:
第一个问题可以通过同步来解决,第二个问题就需要用到线程通信。生产者线程放入数据后,通知消费者线程取出数据,消费者线程取出数据后,通知生产者线程生产数据,这里用wait\notigy机制来实现。
package thread;
public class Info {
private String name = "name";
private String content = "content";
//设置标志位,用来进行线程通信
private boolean flag =true;
/**
* 设置消息,此处用到线程同步
* @param name
* @param content
*/
public synchronized void set(String name,String content)
{
while (!flag)
{
try {
super.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.name=name; //设置名称
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.content=content; //设置内容
flag =false; //设置标志位,表示现在生产停止,可以取走!
}
public synchronized void get()
{
while (flag)
{
try {
super.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name +
" --> " + content) ;
flag = true ; // 改变标志位,表示可以生产
super.notify();
}
}
public class Producer implements Runnable {
private Info info=null;
public Producer(Info info)
{
this.info=info;
}
@Override
public void run() {
boolean flag = true ; // 定义标记位
for(int i=0;i<10;i++){
if(flag){
this.info.set("姓名--1","内容--1") ; // 设置名称
flag = false ;
}else{
this.info.set("姓名--2","内容--2") ; // 设置名称
flag = true ;
}
}
}
}
public class Consumer implements Runnable {
private Info info = null ;
public Consumer(Info info){
this.info = info ;
}
public void run(){
for(int i=0;i<10;i++){
this.info.get() ;
}
}
public static void main(String[] args) {
Info info = new Info(); // 实例化Info对象
Producer pro = new Producer(info) ; // 生产者
Consumer con = new Consumer(info) ; // 消费者
new Thread(pro).start() ;
//启动了生产者线程后,再启动消费者线程
try{
Thread.sleep(500) ;
}catch(InterruptedException e){
e.printStackTrace() ;
}
new Thread(con).start() ;
}
}
BlockingQueue 任何有效的生产者-消费者问题解决方案都是通过控制生产者put()方法(生产资源)和消费者take()方法(消费资源)的调用来实现的,一旦你实现了对方法的阻塞控制,那么你将解决该问题。Java通过BlockingQueue提供了开箱即用的支持来控制这些方法的调用(一个线程创建资源,另一个消费资源)。java.util.concurrent包下的BlockingQueue接口是一个线程安全的可用于存取对象的队列。

BlockingQueue是一种数据结构,支持一个线程往里存资源,另一个线程从里取资源。这正是解决生产者消费者问题所需要的,那么让我们开始解决该问题吧。
public class InfoPlus {
private String name = "name";
private String content = "content";
public InfoPlus(String name, String content) {
this.name = name;
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "InfoPlus{" +
"name=‘" + name + ‘\‘‘ +
", content=‘" + content + ‘\‘‘ +
‘}‘;
}
}
import java.util.concurrent.BlockingQueue;
public class ProducerPlus implements Runnable {
private BlockingQueue<InfoPlus> queue;
public ProducerPlus(BlockingQueue<InfoPlus> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i=0;i<10;i++)
{
try {
Thread.sleep(1000);
queue.put(new InfoPlus("name"+i,"content"+i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class ConsumerPlus implements Runnable{
private BlockingQueue<InfoPlus> queue;
public ConsumerPlus(BlockingQueue<InfoPlus> queue) {
this.queue = queue;
}
public void run() {
while (true) {
try {
System.out.println(this.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
BlockingQueue<InfoPlus> blockingQueue = new LinkedBlockingDeque<>();
ProducerPlus producerPlus = new ProducerPlus(blockingQueue);
ConsumerPlus consumerPlus = new ConsumerPlus(blockingQueue);
ConsumerPlus consumerPlus1 = new ConsumerPlus(blockingQueue);
new Thread(producerPlus).start();
new Thread(consumerPlus).start();
new Thread(consumerPlus1).start();
}
}
原文:https://www.cnblogs.com/MrSaver/p/9409838.html