首先建立WorkshopService类,它提供线程安全的put操作和take操作:
public class WorkshopService { private int buffer[]=null; //反映缓冲区当前的大小 private int size=0; private static final Lock lock = new ReentrantLock(); private static final Condition ProCondition = lock.newCondition();//用于锁定生产者 private static final Condition ConCondition = lock.newCondition();//用于锁定消费者 public WorkshopService(int capacity) { this.buffer=new int[capacity]; } //同步方法,同一时刻只允许有一个生产者或消费者线程进入该方法 public void put(int elem) throws InterruptedException { lock.lock(); /* 若没有空位,则等待 此处必须写成while,不能是if;如果是if,线程被唤醒并且获得锁之后 会直接执行后面的语句,可能发生数组越界异常
注1 */ while(size>=buffer.length) ProCondition.await(); buffer[size]=elem; size++; //唤醒一个消费者线程 ConCondition.signal(); lock.unlock(); } public int take() throws InterruptedException{ lock.lock(); //若没有产品则等待 while(size<=0) ConCondition.await(); int temp=buffer[size-1]; size--; //唤醒一个生产者线程 ProCondition.signal(); lock.unlock(); return temp; } }
然后建立Producer类,它继承Thread类。这个线程类模拟生产者生产产品,并将产品送到公共的缓冲区。
public class Producer extends Thread{ private WorkshopService service=null; private static final Random r = new Random(); public Producer(WorkshopService service) { this.service=service; } private int Producing() throws InterruptedException { //模拟生产活动 System.out.println("好累啊"); Thread.currentThread().sleep(1000); return r.nextInt(1000); } @Override public void run() { // TODO Auto-generated method stub try { int product=Producing(); service.put(product); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
然后建立Consumer类,它也继承Thread类。这个线程类模拟消费者从公共的缓冲区取产品,并消费。
public class Consumer extends Thread{ WorkshopService service=null; public Consumer(WorkshopService service) { // TODO Auto-generated constructor stub this.service=service; } private void Consuming(int product) throws InterruptedException { System.out.println("垃圾产品"); Thread.currentThread().sleep(1000); } @Override public void run() { // TODO Auto-generated method stub try { int product=service.take(); Consuming(product); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
最后再建立测试类Main,产生10个生产者和10个消费者线程,并启动运行:
public class Main { private static final int num = 10; public static void main(String[] args) { // TODO Auto-generated method stub WorkshopService service=new WorkshopService(30); Producer p[] = new Producer[num]; Consumer c[] = new Consumer[num]; for(int i=0;i<num;i++) { p[i]=new Producer(service); c[i]=new Consumer(service); p[i].start(); c[i].start(); } } }
最后运行的结果:
如果将num跳到10000以上,会交替打印生产信息和消费信息,并且不会出现其他异常。
注1:
设想有两个生产者线程,其中一个生产者A率先获得lock,但是发现size==buffer.length,于是陷入阻塞。另外一个消费者线程执行了take操作,将size减一,并将生产者A唤醒。但是,另一个生产者B抢先一步获得了lock,执行了put操作,并将size加一。生产者A获得锁之后从上次被阻塞的地方重新开始执行。
如果是if,那么它将不检查条件size<=length,直接执行后面的代码,便会发生数组越界的异常。
所有要写成while.
原文:https://www.cnblogs.com/chxyshaodiao/p/12317647.html