上一篇博客,我们介绍了ArrayBlockQueue,知道了它是基于数组实现的有界阻塞队列,既然有基于数组实现的,那么一定有基于链表实现的队列了,没错,当然有,这就是我们今天的主角:LinkedBlockingQueue。ArrayBlockQueue是有界的,那么LinkedBlockingQueue是有界还是无界的呢?我觉得可以说是有界的,也可以说是无界的,为什么这么说呢?看下去你就知道了。
和上篇博客一样,我们还是先看下LinkedBlockingQueue的基本应用,然后解析LinkedBlockingQueue的核心代码。
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue();
linkedBlockingQueue.add(15);
linkedBlockingQueue.add(60);
linkedBlockingQueue.offer(50);
linkedBlockingQueue.put(100);
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.size());
System.out.println(linkedBlockingQueue.take());
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.poll());
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.peek());
System.out.println(linkedBlockingQueue);
System.out.println(linkedBlockingQueue.remove(50));
System.out.println(linkedBlockingQueue);
}
运行结果:
[15, 60, 50, 100]
4
15
[60, 50, 100]
60
[50, 100]
50
[50, 100]
true
[100]
代码比较简单,先试着分析下:
代码比较简单,但是还是有些细节不明白:
要解决上面的疑问,最好的途径还是看源码,下面我们就来看看LinkedBlockingQueue的核心源码。
LinkedBlockingQueue提供了三个构造方法,如下图所示:
我们一个一个来分析。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
无参的构造方法竟然直接把“锅”甩出去了,甩给了另外一个构造方法,但是我们要注意传的参数:Integer.MAX_VALUE。
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
这个capacity是什么呢?如果大家对代码有一定的感觉的话,应该很容易猜到这是LinkedBlockingQueue的最大容量。如果我们调用无参的构造方法来创建LinkedBlockingQueue的话,那么它的最大容量就是Integer.MAX_VALUE,我们把它称为“无界”,但是我们也可以指定最大容量,那么此队列又是一个“有界”队列了,所以有些博客很草率的说LinkedBlockingQueue是有界队列,或者是无界队列,个人认为这是不严谨的。
我们再来看看这个Node是个什么鬼:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
是不是有一种莫名的亲切感,很明显,这是单向链表的实现呀,next指向的就是下一个Node。
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);//调用第二个构造方法,传入的capacity是Int的最大值,可以说 是一个无界队列。
final ReentrantLock putLock = this.putLock;
putLock.lock(); //开启排他锁
try {
int n = 0;//用于记录LinkedBlockingQueue的size
//循环传入的c集合
for (E e : c) {
if (e == null)//如果e==null,则抛出空指针异常
throw new NullPointerException();
if (n == capacity)//如果n==capacity,说明到了最大的容量,则抛出“Queue full”异常
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));//入队操作
++n;//n自增
}
count.set(n);//设置count
} finally {
putLock.unlock();//释放排他锁
}
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();//如果传入的元素为NULL,抛出异常
final AtomicInteger count = this.count;//取出count
if (count.get() == capacity)//如果count==capacity,说明到了最大容量,直接返回false
return false;
int c = -1;//表示size
Node<E> node = new Node<E>(e);//新建Node节点
final ReentrantLock putLock = this.putLock;
putLock.lock();//开启排他锁
try {
if (count.get() < capacity) {//如果count<capacity,说明还没有达到最大容量
enqueue(node);//入队操作
c = count.getAndIncrement();//获得count,赋值给c后完成自增操作
if (c + 1 < capacity)//如果c+1 <capacity,说明还有剩余的空间,唤醒因为调用notFull的await方法而被阻塞的线程
notFull.signal();
}
} finally {
putLock.unlock();//在finally中释放排他锁
}
if (c == 0)//如果c==0,说明释放putLock的时候,队列中有一个元素,则调用signalNotEmpty
signalNotEmpty();
return c >= 0;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
代码比较简单,就是开启排他锁,唤醒因为调用notEmpty的await方法而被阻塞的线程,但是这里需要注意,这里获得的排他锁已经不再是putLock,而是takeLock。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
add方法直接调用了offer方法,但是add和offer还不完全一样,当队列满了,如果调用offer方法,会直接返回false,但是调用add方法,会抛出"Queue full"的异常。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();//如果传入的元素为NULL,抛出异常
int c = -1;//表示size
Node<E> node = new Node<E>(e);//新建Node节点
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;//获得count
putLock.lockInterruptibly();//开启排他锁
try {
//如果到了最大容量,调用notFull的await方法,等待唤醒,用while循环,是为了防止虚假唤醒
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);//入队
c = count.getAndIncrement();//count先赋值给c后,再进行自增操作
if (c + 1 < capacity)//如果c+1<capacity,调用notFull的signal方法,唤醒因为调用notFull的await方法而被阻塞的线程
notFull.signal();
} finally {
putLock.unlock();//释放排他锁
}
if (c == 0)//如果队列中有一个元素,唤醒因为调用notEmpty的await方法而被阻塞的线程
signalNotEmpty();
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
入队操作是不是特别简单,就是把传入的Node节点,赋值给last节点的next字段,再赋值给last字段,从而形成一个单向链表。
至此offer/add/put的核心源码已经分析完毕,我们来做一个小总结,offer/add/put都是添加元素的方法,不过他们之间还是有所区别的,当队列满了,调用以上三个方法会出现不同的情况:
public int size() {
return count.get();
}
没什么好说的,count记录着LinkedBlockingQueue的size,获得后返回就是了。
public E take() throws InterruptedException {
E x;
int c = -1;//size
final AtomicInteger count = this.count;//获得count
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//开启排他锁
try {
while (count.get() == 0) {//说明目前队列中没有数据
notEmpty.await();//阻塞,等待唤醒
}
x = dequeue();//出队
c = count.getAndDecrement();//先赋值,后自减
if (c > 1)//如果size>1,说明在出队之前,队列中有至少两个元素
notEmpty.signal();//唤醒因为调用notEmpty的await方法而被阻塞的线程
} finally {
takeLock.unlock();//释放排他锁
}
if (c == capacity)//如果队列中还有一个剩余空间
signalNotFull();
return x;
}
我们再来看下signalNotFull方法:
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
相比take方法,最大的区别就如果队列为空,执行take方法会阻塞当前线程,直到被唤醒,而poll方法,直接返回null。
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
peek方法,只是拿到头节点的值,但是不会移除该节点。
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
没什么好说的,就是弹出元素,并且移除弹出的元素。
至此take/poll/peek的核心源码已经分析完毕,我们来做一个小总结,take/poll/peek都是获得头节点值的方法,不过他们之间还是有所区别的:
LinkedBlockingQueue的核心源码分析到这里完毕了,谢谢大家。
原文:https://www.cnblogs.com/CodeBear/p/10714924.html