首页 > 编程语言 > 详细

深入浅出Java并发包—CountDownLauch原理分析

时间:2021-01-16 23:01:31      阅读:35      评论:0      收藏:0      [点我收藏+]
深入浅出Java并发包—CountDownLauch原理分析

一线天色天宇星辰 IT哈哈
CountDownLauch是Java并发包中的一个同步工具集,常被人们称之为并发中的计数器,还有一种被成为闭锁!
CountDownLauch主要使用在两种场景,一种被称为开关,它允许一个任务完成之前,一个或一组线程持续等待。此种情况经常被称之为闭锁,通俗的讲就是,相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开,所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。另一种场景经常被称之为计数器,它允许将一个任务拆分为N个小任务,主线程在所有任务完成之前一直等待,每个任务完成时将计数器减一,直到所有任务完成后取消主线程的阻塞。
我们来看一下对应CountDownLauch对应的API。
技术分享图片
技术分享图片

CountDownLatch维护了一个正数计数器,countDown方法对计数器做减操作,await方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。
我们分别来看一下对应的一个应用实例:


package com.yhj.lauth;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
//工人
class Worker extends Thread{
    privateintworkNo;//工号
    private CountDownLatch startLauch;//启动器-闭锁
    private CountDownLatch workLauch;//工作进程-计数器
    public Worker(int workNo,CountDownLatch startLauch,CountDownLatch workLauch) {
       this.workNo = workNo;
       this.startLauch = startLauch;
       this.workLauch = workLauch;
    }
    @Override
    publicvoid run() {
       try {
           System.out.println(new Date()+" - YHJ"+workNo+" 准备就绪!准备开工!");
           startLauch.await();//等待老板发指令
           System.out.println(new Date()+" - YHJ"+workNo+" 正在干活...");
           Thread.sleep(100);//每人花100ms干活
       } catch (InterruptedException e) {
           e.printStackTrace();
       }finally{
           System.out.println(new Date()+" - YHJ"+workNo+" 工作完成!");
           workLauch.countDown();
       }
    }
}
//测试用例
publicclass CountDownLauthTestCase {

    publicstaticvoid main(String[] args) throws InterruptedException {
       int workerCount = 10;//工人数目
       CountDownLatch startLauch = new CountDownLatch(1);//闭锁相当于开关
       CountDownLatch workLauch = new CountDownLatch(workerCount);//计数器
       System.out.println(new Date()+" - Boss:集合准备开工了!");
       for(int i=0;i<workerCount;++i){
           new Worker(i, startLauch, workLauch).start();
       }
       System.out.println(new Date()+" - Boss:休息2s后开工!");
       Thread.sleep(2000);
       System.out.println(new Date()+" - Boss:开工!");
       startLauch.countDown();//打开开关
       workLauch.await();//任务完成后通知Boss
       System.out.println(new Date()+" - Boss:不错!任务都完成了!收工回家!");
    }
}
执行结果:
Sat Jun 08 18:59:33 CST 2013 - Boss:集合准备开工了!
Sat Jun 08 18:59:33 CST 2013 - YHJ0 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ2 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ1 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ4 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - Boss:休息2s后开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ8 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ6 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ3 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ7 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ5 准备就绪!准备开工!
Sat Jun 08 18:59:33 CST 2013 - YHJ9 准备就绪!准备开工!
Sat Jun 08 18:59:35 CST 2013 - Boss:开工!
Sat Jun 08 18:59:35 CST 2013 - YHJ0 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ2 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ1 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ4 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ8 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ6 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ3 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ7 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ5 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ9 正在干活...
Sat Jun 08 18:59:35 CST 2013 - YHJ5 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ1 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ3 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ6 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ7 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ9 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ4 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ0 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ2 工作完成!
Sat Jun 08 18:59:35 CST 2013 - YHJ8 工作完成!
Sat Jun 08 18:59:35 CST 2013 - Boss:不错!任务都完成了!收工回家!

这个示例里面使用了两个CountDownLauch,分别构建了两种场景,第一个startLauch相当于开关,在开启之前,没有任何一个线程执行,当开启之后,所有线程同时可以执行。第二个workerLauch其实就是一个计数器,当计数器没有减到零的时候,主线程一直等待,当所有线程执行完毕后,主线程取消阻塞继续执行!
第二种场景在我们后面要学习的线程池中经常会用到,我们后续再讨论!
此处还有一个重要的特性,就是
内存一致性效果:线程中调用 countDown() 之前的操作happen-before紧跟在从另一个线程中对应 await() 成功返回的操作。
场景应用我们是看到了,那它到底是基于什么原理,怎么实现的呢?
我们来看下对应的源码:


privatestaticfinalclass Sync extends AbstractQueuedSynchronizer

类的第二行我们就看到了其内部实现了AQS的一个同步器。我们重点来看下我们用到的几个方法:await和countDown。首先来看await方法


publicvoid await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

很明显是直接调用内部重新实现的同步器的获取共享锁的方法(前面我们一直再讲独占锁,今天我们借此机会把共享锁的机制一起讲掉)。


publicfinalvoid acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            thrownew InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

此处如果线程中断,则直接退出,否则尝试获取共享锁,我们来看下tryAcquireShared(arg)的实现(此方法由内部类重写实现):


publicint tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

所谓共享锁是说所有共享锁的线程共享同一个资源,一旦任意一个线程拿到共享资源,那么所有线程就都拥有的同一份资源。也就是通常情况下共享锁只是一个标志,所有线程都等待这个标识是否满足,一旦满足所有线程都被激活(相当于所有线程都拿到锁一样)。这里的闭锁CountDownLatch就是基于共享锁的实现。和明显这里的标识就是state等不等于零,而state其实是有多少个线程在竞争这份资源,我们前面可以看到是通过构造函数传入的一个大于0的数据,因此此时此刻此处返回的永远是-1。


Sync(int count) {
            setState(count);
        }

当tryAcquireShared返回的数据小于零,说明没有获取到资源,需要阻塞,此时执行代码doAcquireSharedInterruptibly():


privatevoid doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        thrownew InterruptedException();
    }

这里首先以共享模式添加一个节点加入到CLH队列中去,然后检查当前节点的前继节点(插入的数据在队尾),如果前继节点是头结点并且当前的计数器为0的话,则唤醒后继节点(唤醒后面来讲),否则判断是否需要阻塞,如果需要,则阻塞当前线程!直到被唤醒或被中断!


privatefinalboolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

这里注意一点,LockSupport.park(Obj)中的参数obj是阻塞的监视对象,而非阻塞的对象,阻塞的对象是当前操作的线程,所以unpack的时候也是应该结算对应的线程!不要搞混了哈!


publicstaticvoid park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
}
publicstaticvoid unpark(Thread thread) {
        if (thread != null)
            unsafe.unpark(thread);
    }

下面我们来看一下对应的countDown方法的实现


publicvoid countDown() {
        sync.releaseShared(1);
    }

首先每执行一次countDown就会执行内部方法的一次释放锁的操作!


publicfinalboolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            returntrue;
        }
        returnfalse;
    }

如果尝试成功则设置当前节点为头结点,并唤醒对应节点的后继节点!


publicboolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    returnfalse;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

同样,释放锁的方法也是CountDownLauch内部的同步类自己实现,这个方法自旋检测当前计数器的数目,如果等于零,说明之前阻塞的线程已经全部释放了,直接返回false,否则CAS设置当前的计数器,减去countdown的数目,如果设置成功后的数据为零的话,说明已经全部执行完毕,需要释放阻塞的线程了,返回true(注意此处精妙的返回nextc == 0),否则返回false。
我们再来回看releaseShared方法,当tryReleaseShared返回true的时候,说明计数器已经为零,阻塞的资源需要释放了!此时执行unparkSuccessor(h)方法唤醒队列中的头结点。
此处设计了一个精妙的队列依次去释放被阻塞的线程,而不是类似singleAll的方法直接唤醒所有线程。那到底它是怎么实现的呢?我们代码上看只唤醒了头结点(其实是头结点的后继节点,头结点只是一个空节点),我们先来看下unparkSuccessor的实现


privatevoid unparkSuccessor(Node node) {
        /*
         * Try to clear status in anticipation of signalling.  It is
         * OK if this fails or if status is changed by waiting thread.
         */
        compareAndSetWaitStatus(node, Node.SIGNAL, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

明显我们可以看到,传入的参数为头结点,通过CAS设置数据后,唤醒了头结点的后继结点(注意unpack的是线程而不是阻塞监视器)。然后就返回了!
那剩余阻塞的线程是怎么唤醒的呢?我们再来看下await方法中doAcquireSharedInterruptibly的实现


privatevoid doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg); // tag 2
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // tag 3
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())// tag 1
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        thrownew InterruptedException();
    }

前面我们可以看到在执行parkAndCheckInterrupt()时进行了阻塞,当我们唤醒头结点的后继节点(第一个进入队列的节点)时,tag1此行代码被唤醒,break之后继续进入自旋,而此时tag2行代码检测到计数器已经为0,因此tryAcquireShared(arg)返回的结果是1(之前返回的都是-1),r大于零,进入tag3代码,tag3会把当前的线程设置为头结点,然后继续唤醒后续的后继节点。


privatevoid setHeadAndPropagate(Node node, int propagate) {
        setHead(node); // tag 4
        if (propagate > 0 && node.waitStatus != 0) {
            /*
             * Don‘t bother fully figuring out successor.  If it
             * looks null, call unparkSuccessor anyway to be safe.
             */
            Node s = node.next;
            if (s == null || s.isShared())
                unparkSuccessor(node); // tag 5
        }
    }

后继节点被唤醒后,则继续唤醒后面的后继节点,进而把队列中的数据依次唤醒!
整个CountDownLatch就是这个样子的。其实有了前面原子操作和AQS的原理及实现,分析CountDownLatch还是比较容易的。

深入浅出Java并发包—CountDownLauch原理分析

原文:https://blog.51cto.com/15061944/2593716

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!