import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生产者
*/
public class ProducerThread implements Runnable {
private BlockingQueue queue;
private volatile boolean flag = true;
private static AtomicInteger count = new AtomicInteger();
public ProducerThread(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println("生产线程启动...");
while (flag){
System.out.println("正在生产...");
String data = count.incrementAndGet() + "";
boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);
if (offer){
System.out.println("生产者存入"+data+"到队列成功");
}else {
System.out.println("生产者存入"+data+"到队列失败");
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("生产结束");
}
}
public void stop(){
this.flag = false;
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 消费者
*/
public class ConsumerThread implements Runnable{
private BlockingQueue<String> queue;
private volatile boolean flag = true;
public ConsumerThread(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println("消费线程启动...");
while (flag){
System.out.println("消费者正在获取数据...");
String data = queue.poll(2, TimeUnit.SECONDS);
if (data!=null){
System.out.println("消费者拿到队列中的数据:"+data);
Thread.sleep(1000);
}else {
System.out.println("消费者未拿到队列中的数据");
flag = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("消费者结束");
}
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
* LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,
* 如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。
* 和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部
*/
public class Main {
public static void main(String[] args) throws InterruptedException{
BlockingQueue<String> queue = new LinkedBlockingDeque<>(10);
ProducerThread producerThread1 = new ProducerThread(queue);
ProducerThread producerThread2 = new ProducerThread(queue);
ConsumerThread consumerThread = new ConsumerThread(queue);
Thread t1 = new Thread(producerThread1);
Thread t2 = new Thread(producerThread2);
Thread t3 = new Thread(consumerThread);
t1.start();
t2.start();
t3.start();
Thread.sleep(10000);
producerThread1.stop();
producerThread2.stop();
}
//生产线程启动...
//正在生产...
//消费线程启动...
//消费者正在获取数据...
//生产线程启动...
//正在生产...
//生产者存入1到队列成功
//生产者存入2到队列成功
//消费者拿到队列中的数据:2
//正在生产...
//生产者存入3到队列成功
//正在生产...
//生产者存入4到队列成功
//消费者正在获取数据...
//消费者拿到队列中的数据:1
//正在生产...
//消费者正在获取数据...
//正在生产...
//消费者拿到队列中的数据:3
//生产者存入5到队列成功
//生产者存入6到队列成功
//消费者正在获取数据...
//正在生产...
//正在生产...
//生产者存入8到队列成功
//生产者存入7到队列成功
//消费者拿到队列中的数据:4
//正在生产...
//消费者正在获取数据...
//正在生产...
//消费者拿到队列中的数据:5
//生产者存入9到队列成功
//生产者存入10到队列成功
//消费者正在获取数据...
//消费者拿到队列中的数据:6
//正在生产...
//正在生产...
//生产者存入11到队列成功
//生产者存入12到队列成功
//消费者正在获取数据...
//正在生产...
//正在生产...
//生产者存入13到队列成功
//消费者拿到队列中的数据:7
//生产者存入14到队列成功
//正在生产...
//生产者存入15到队列成功
//正在生产...
//消费者正在获取数据...
//生产者存入16到队列成功
//消费者拿到队列中的数据:8
//正在生产...
//消费者正在获取数据...
//正在生产...
//消费者拿到队列中的数据:9
//生产者存入17到队列成功
//生产者存入18到队列成功
//消费者正在获取数据...
//正在生产...
//生产者存入19到队列成功
//正在生产...
//消费者拿到队列中的数据:10
//生产者存入20到队列成功
//消费者正在获取数据...
//生产结束
//生产结束
//消费者拿到队列中的数据:11
//消费者正在获取数据...
//消费者拿到队列中的数据:12
//消费者正在获取数据...
//消费者拿到队列中的数据:13
//消费者正在获取数据...
//消费者拿到队列中的数据:14
//消费者正在获取数据...
//消费者拿到队列中的数据:15
//消费者正在获取数据...
//消费者拿到队列中的数据:16
//消费者正在获取数据...
//消费者拿到队列中的数据:17
//消费者正在获取数据...
//消费者拿到队列中的数据:18
//消费者正在获取数据...
//消费者拿到队列中的数据:19
//消费者正在获取数据...
//消费者拿到队列中的数据:20
//消费者正在获取数据...
//消费者未拿到队列中的数据
//消费者结束
}
21.使用LinkedBlockingDeque模拟生产者与消费者
原文:https://www.cnblogs.com/fly-book/p/11451562.html