阻塞队列是一个队列,它最大的特点就是阻塞的线程满足条件就会被自动唤醒,不需要我们人为的判断。

之前总结的线程间通信,需要判断对应的值,一个生产者与一个消费者,在判断状态的时候需要加一个标志类,还需要控制线程。而阻塞队列在某些情况会挂起<暂停>线程(阻塞),满足条件,就会被自动的唤起
java中阻塞队列的方法如下:

BlockQueue的源码:
public interface BlockingQueue<E> extends Queue<E> {
//增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
boolean add(E e);
//添加一个元素并返回true 如果队列已满,则返回false
boolean offer(E e);
//添加一个元素 如果队列满,则阻塞
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//移除并返回队列头部的元素 如果队列为空,则阻塞
E take() throws InterruptedException;
//移除并返问队列头部的元素 如果队列为空,则返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//剩余容量
int remainingCapacity();
//移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
boolean remove(Object o);
public boolean contains(Object o);
//一次性从BlockingQueue获取所有可用的数据对象并转移到参数集合中
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
可以看到,BlockQueue提供了很多不同于其他集合的方法。下面是它的子类:

我们随便选一个ArrayBlockQueue来探索一下它是怎么做到阻塞的。先看看它的三个构造方法:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化一个数组
this.items = new Object[capacity];
//重入锁
lock = new ReentrantLock(fair);
//下面初始化的是两个队列
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
我们关注的重点当然是第三个构造方法,此处用到了lock锁来把一个普通的集合转移到ArrayBlockQueue中。ArrayBlockQueue的初始化是在第二个构造方法中完成的。需要注意的是,ArrayBlockQueue内部存储对象的方式是通过Object数组实现的。
不难想象,构造方法就已经用lock锁来达到安全的目的了,那么,其他的阻塞相关方法也肯定离不开lock锁的影子了。我们带着这个flag继续往下走。先来看看offer()方法和put()方法,发现和我们猜想的一样:
该方法在ArrayBlockQueue中有两个重载方法offer(E e, long timeout, TimeUnit unit)和offer(E e)。
将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。前者与后者的主要区别在于,如果队列中没有可用空间,可以设置一定的等待时间,等待可用空间。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果长度等于数组长度表示已经满了
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
将指定的元素插入到队列的尾部,如果有可用空间直接插入,如果没有可用空间,调用condition.await()方法等待,直到被唤醒,然后插入元素。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//这种锁可以中断
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//可以跟进
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//此处putIndex可以当成游标
items[putIndex] = x;
//当数据满了,游标会恢复为0
if (++putIndex == items.length)
putIndex = 0;
//队列中元素个数
count++;
//唤醒
notEmpty.signal();
}
如果插入元素成功,返回true,如果插入失败抛出异常IllegalStateException(“Queue full”)。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
出队列方法:
该方法也有两个重载方法poll(long timeout, TimeUnit unit)和poll(),从队列头部移除一个元素,前者与后者的区别在于,如果队列中没有可以移除的元素,前者会等待一定时间,然后执行移除方法。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();//如果没有可以移出元素,返回null,否则执行dequeue()方法
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);//如果没有可以移出元素,调用condition的线程等待的方法,等待一定时间
}
return dequeue();
} finally {
lock.unlock();//最后释放锁lock
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//最后唤醒其他等待的线程
return x;
}
获取并移除此队列的头部。take()和poll()的区别在于,如果队列中没有可移除元素,take()会一直等待,而poll()可设置直接返回null或者等待一定时间。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();//如果队列中没有元素,该线程一直处于阻塞状态
return dequeue();
} finally {
lock.unlock();
}
}
分析完了上面的源码,我们以一个小Demo来结束上面的话题,我们以积分分发和消费为例来随便搞个例子
public class User {
private String name;
public User(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"name=‘" + name + ‘\‘‘ +
‘}‘;
}
}
public class UserService {
private final ExecutorService executorService= Executors.newSingleThreadExecutor();
ArrayBlockingQueue<User> arrayBlockingQueue=new ArrayBlockingQueue(10);
{
init();
}
public void init(){ //不断消费队列的线程
executorService.execute(()->{
while(true){
try {
User user=arrayBlockingQueue.take(); //阻塞式
System.out.println("发送优惠券给:"+user);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public boolean register(){
User user=new User("用户A");
addUser(user);
//发送积分.
try {
arrayBlockingQueue.put(user);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
private void addUser(User user){
System.out.println("添加用户:"+user);
}
public static void main(String[] args) {
new UserService().register();
}
}
原文:https://www.cnblogs.com/xing1/p/13772013.html