LinkedBlockingQueue是BlockingQueue中的其中一个,其实现方式为单向链表,下面看其具体实现。(均为JDK8)
在LinkedBlockingQueue中有三个构造函数,如下图,
这是一个无参的构造函数,其定义如下,
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
在这个构造函数中调用了有参的构造函数,传入的整型值为Integer所能表示的最大值(0x7fffffff)。下面看这个带参数的构造方法。
这是一个整型参数的构造方法其,定义如下,
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
从其实现上来看,其整型参数代表此队列的容量,即元素的最大个数;然后初始化了last和head两个变量,我们猜last应该是此队列的尾元素、head为此队列的头元素。这里有一个Node类,下面看Node类的实现,
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
Node是LinkedBlockingQueue的静态内部类,也是组成此队列的节点元素,其内部有两个属性,一个Item代表节点元素,next指向下一个Node节点,这样就可以得出LinkedBlockingQueue的结构是使用Node连接起来,头节点为head,尾节点为last。
此构造方法是使用一个集合类进行构造,其定义如下
public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
从上面的构造函数中可以大体了解其属性有容量(capacity),队列中的元素数量(count)、头节点(head)、尾节点(last)等,如下
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
使用原子类AtomicInteger的count表示队列中元素的个数,可以很好的处理并发,AtomicInteger底层大都是使用乐观锁进行操作,多线程下是安全的。
关注下takeLock、putLock这两个属性,这里定义了两把锁,一把take锁,另一把put锁。通过两把可重入锁实现并发,与ArrayBlockingQueue的一把锁相比。
需要使用阻塞队列,那么就需要向队列中添加或取出元素,在LinkedBlockingQueue中已经实现了相关操作,对于添加/取出均是成对出现,提供的方法中有抛出异常、返回false、线程阻塞等几种情形。
put/take是一对互斥操作,put向队列中添加元素,take从队列中取出元素。
put方法定义如下,
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); //获得添加锁, final ReentrantLock putLock = this.putLock; //获得当前队列中的元素数量 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //如果当前队列中元素的数量等于队列的容量,则阻塞当前线程, while (count.get() == capacity) { notFull.await(); } enqueue(node); //当前线程中元素数量增1,返回操作前的数量 c = count.getAndIncrement(); //c+1其实是当前队列中元素的数量,如果比容量小,则唤醒notFull的操作,即可以进行继续添加,执行put等添加操作。 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); }
//说明在执行enqueue前的数量为0,执行完enqueue后数量为1,则需要唤醒取进程。 if (c == 0) signalNotEmpty(); }
此方法的执行步骤大体如下,
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
take方法和put刚好相反,其定义如下,
public E take() throws InterruptedException { E x; int c = -1; //获得当前队列的元素数量 final AtomicInteger count = this.count; //获得take锁 final ReentrantLock takeLock = this.takeLock; //执行take操作 takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await();//阻塞当前线程 } x = dequeue(); //当前队列的数量减1,返回操作前的数量 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); }
//当前队列中元素数量为capacity-1,即未满,可以调用put方法,需要唤醒阻塞在put锁上的线程 if (c == capacity) signalNotFull(); return x; }
此方法的执行步骤大体如下,
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head;//head赋值给h Node<E> first = h.next;//相当于第二个节点赋值给first h.next = h; // help GC head = first;//头节点指向第二个节点 E x = first.item; first.item = null; return x; }
从上面的代码可以看出把头节点进行出队,即head指向下一个节点
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try {
//唤醒阻塞在put锁的其他线程 notFull.signal(); } finally { putLock.unlock(); } }
offer方法的定义如下,
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
此方法的执行步骤大体如下,
poll方法定义如下,
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
此方法的执行步骤大体如下,
在规定的时间内阻塞线程,其定义如下
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
此方法的执行步骤大体如下,
其方法定义如下,
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
此方法的执行步骤大体如下,
序号 | 方法名 | 队列满时处理方式 | 方法返回值 |
1 | offer(E e) | 返回false | boolean |
2 | put(E e) | 线程阻塞,直到中断或被唤醒 | void |
3 | offer(E e, long timeout, TimeUnit unit) | 在规定时间内重试,超过规定时间返回false | boolean |
序号 | 方法名 | 队列空时处理方式 | 方法返回值 |
1 | poll() | 返回null | E |
2 | take() | 线程阻塞,直到中断或被唤醒 | E |
3 | poll(long timeout, TimeUnit unit) | 在规定时间内重试,超过规定时间返回null | E |
以上是关于LinkedBlockingQueue队列的相关实现及方法介绍,此队列使用单向链表为载体,配合put/take锁实现生产线程和消费线程共享数据。LinkedBlockingQueue作为共享的数据池,实现并发环境下的添加及取出方法。
有不正之处欢迎指正,感谢!
原文:https://www.cnblogs.com/teach/p/10665947.html