CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
来看一下这个类的构造方法,如下:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }构造函数需要提供一个count参数,表示一个计数器。此外就是两个用来控制锁存器的方法:countDown()和await()。
首先来看一下await()方法,如下:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit)throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }调用了acquireSharedInterruptibly()方法,这个方法的源代码如下:
public final void acquireSharedInterruptibly(long arg) throws InterruptedException { if (Thread.interrupted()) // 如果线程被中断,则抛出异常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 如果tryAcquireShared()方法获取失败,则调用如下的方法 doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }在CountDownLatch类中定义了一个private volatile long类型的state变量,表示锁计数器。通过调用getState()获取这个锁计数器的值。如果为0,则返回1,表示成功,否则表示失败。继续调用如下的方法:
private void doAcquireSharedInterruptibly(long arg) throws InterruptedException { // 创建"当前线程"的Node节点,且Node中记录的锁是"共享锁"类型;并将该节点添加到CLH队列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取上一个节点。 // 如果上一节点是CLH队列的表头,则"尝试获取共享锁"。 final Node p = node.predecessor(); if (p == head) { long r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // (上一节点不是CLH队列的表头) 当前线程一直等待,直到获取到共享锁。 // 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
接下来看一下countDown()方法,如下:
public void countDown() { sync.releaseShared(1); }releaseShared()方法在AQS中实现,代码如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }首先调用tryReleaseShared()方法:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 获取锁计数器的状态 int c = getState(); if (c == 0) return false; // 锁计数器减去1 int nextc = c-1; // 通过CAS函数进行赋值 if (compareAndSetState(c, nextc)) return nextc == 0; } }
public class CountDownLatchTest1 { private static int LATCH_SIZE = 5; private static CountDownLatch doneSignal; public static void main(String[] args) { try { doneSignal = new CountDownLatch(LATCH_SIZE); // 新建5个任务 for(int i=0; i<LATCH_SIZE; i++) new InnerThread().start(); System.out.println("main await begin."); // "主线程等待线程池中5个任务的完成 doneSignal.await(); System.out.println("main await finished."); } catch (InterruptedException e) { e.printStackTrace(); } } static class InnerThread extends Thread{ public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " sleep 1000ms."); // 将CountDownLatch的数值减1 doneSignal.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } }运行的结果如下:
main await begin. Thread-0 sleep 1000ms. Thread-2 sleep 1000ms. Thread-1 sleep 1000ms. Thread-4 sleep 1000ms. Thread-3 sleep 1000ms. main await finished.
Java 7之多线程第12篇 - CountDownLatch
原文:http://blog.csdn.net/mazhimazh/article/details/19190241