? ? ?最近在研究blockqueue的源码,从今天开始,和大家分享一下我看源码的一些心得体会
? ? ?(1)LinkedBlockingQueue源码解析
? ? ?(2)ArrayBlockingQueue源码解析
? ??
? ? ?LinkedBlockingQueue实现了BlockingQueue接口以及Serializable接口,是有序的FIFO队列,构造函数中,可传入一个最大容量值,如果没有传入,则默认是Integer.MAX_VALUE
? ? 一 首先看一下重要的几个类变量:
?
?
/** 保存当前队列中元素的个数 */
private final AtomicInteger count = new AtomicInteger(0);
/**
* 头元素
* Invariant: head.item == null
*/
private transient Node<E> head;
/**
* 尾元素
* Invariant: last.next == null
*/
private transient Node<E> last;
/** 消费者锁,Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** 使消费者线程等待,直到被唤醒或者打断 */
private final Condition notEmpty = takeLock.newCondition();
/** 生产者锁,Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** 使生产者线程等待,直到被唤醒或者打断 */
private final Condition notFull = putLock.newCondition();
?
二 put方法
?
?
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
? ?
执行过程如下:
? ?1 如果传入元素为空,抛出空指针异常
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
?public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
??
五 pool方法
?
?
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
?
执行过程如下:
?
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
??(4)take在队列为空时,会始终阻塞
BlockQueue之LinkedBlockingQueue源码解析
原文:http://wang7839186.iteye.com/blog/2294376