Java CountDownLatch解析(上)

  • 写在前面的话

最近一直在边工作边学习分布式的东西,看到了构建Java中间件的基础知识,里面有提到Java多线程并发的工具类,例如ReentrantLock、CyclicBarrier、CountDownLatch...

以前在工作中也有用到过这些实用的工具类,但是了解不是特别深入,借此机会打个卡,好记性不如烂博客,哈哈哈...

  • CountDownLatch简介

CountDownLatch顾名思义,count + down + latch = 计数 + 减 + 门闩(这么拆分也是便于记忆=_=) 可以理解这个东西就是个计数器,只能减不能加,同时它还有个门闩的作用,当计数器不为0时,门闩是锁着的;当计数器减到0时,门闩就打开了。

如果你感到懵比的话,可以类比考生考试交卷,考生交一份试卷,计数器就减一。直到考生都交了试卷(计数器为0),监考老师(一个或多个)才能离开考场。至于考生是否做完试卷,监考老师并不关注。只要都交了试卷,他就可以做接下来的工作了。

  • CountDownLatch实用场景

既然知道了它的定义,那什么时候使用它呢?笔者能想到的场景是:

有任务A和任务B,任务B必须在任务A完成之后再做。而任务A还能被分为n部分,并且这n部分之间的任务互不影响。为了加快任务完成进度,把这n部分任务分给不同的线程,当A任务完成了,然后通知做B任务的线程接着完成任务,至于完成B任务的线程,可以是一个,也可以是多个。

上图:

  • CountDownLatch实现原理

接下来就跟笔者来扒一扒CountDownLatch的源码,它到底是怎么实现这个牛逼功能的。(源码版本JDK1.8)

public class CountDownLatch {

    // 内部类 继承AQS类
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    //AQS子类的实例对象
    private final Sync sync;

    // 有参构造器
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 等待
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 超时等待
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 计数减1
    public void countDown() {
        sync.releaseShared(1);
    }

    // 获取计数器当前计数
    public long getCount() {
        return sync.getCount();
    }

    // 吐司就不多说了吧
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

熟悉ReentrantLock的读者应该知道这种结构,它也是采用这种结构来完成功能的,只是ReentrantLock在Sync这个内部类下,它还分了NonfairSync(非公平锁)和FairSync(公平锁)这两个类来继承Sync这个父类。这种结构的好处在于我们不必关心AbstractQueuedSynchronizer的具体实现细节,只需要实现它要我们实现的方法(例如:tryAcquireShared tryReleaseShared)即可生成我们特定的并发业务工具类。

扯远了哈,接下来笔者准备分两条线来分析CountDownLatch,一是CountDownLatch.await()阻塞当前线程,二是CountDownLatch.countDown()当前线程把计数器减一

一、CountDownLatch.await()

猜想一下:

 提问:实现这个功能你能想到的方法有哪些?

 回答:第一 你可能会使用线程的join(),让当前线程等待join线程执行结束。其原理是不停检查join线程是否存活,如果join线程存活则当前线程永远等待。

    第二 你可能会使用线程间wait/notify,进入synchronized同步块或方法中,检查计数器值不为0,然后调用Object.wait();直到值为0则调用notifyAll()唤醒等待线程。

 分析:方法一 如果只有两三个线程还好,如果数量过多,那得写多少join啊...

    方法二 大量synchronized同步块,还可能存在假唤醒...

 结论:上面提到的方法或多或少都存在这样那样的弊端,那我们就猜想一下思路解决这些弊端

    其一 我们可能需要一个volatile变量来实时感知计数器的值,一旦计数器值为0则唤醒阻塞在该条件上的线程

    其二 因为volatile只有数据实时透明性,它并不能保证线程的顺序执行,所以我们可能需要一个同步队列来放置这些阻塞队列,当计数器值为0时,从队列中挨着一个个唤醒线程

下面开始我们的验证:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}

构造方法传入了一个int变量,而我们跟进去发现,这个int变量是AQS中的state,类型是volatile的,它就是用来表示计数器值的。由此证明我们的猜想。注意:count值需要大于等于0

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

当我们调用await()的方法后,会默认调用sync这个实例的acquireSharedInterruptibly这个方法,并且参数为1,需要注意的是,这个方法声明了一个InterruptedException异常,表示调用该方法的线程支持打断操作。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

我们跟进源码发现,acquireSharedInterruptibly这个方法是sync继承AQS而来的,这个方法的调用是响应线程的打断的,所以在前两行会检查线程是否被打断。接着调用tryAcquireShared()方法来判断返回值,根据值的大小决定是否执行doAcquireSharedInterruptibly()。

// AQS中的方法protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// Sync中的方法
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
// AQS中的方法
protected final int getState() {
    return state;// state是volatile
}

我们看到AQS把这个方法留给子类去实现,在子类sync的tryAcquireShared中它只验证了计数器的值是否为0,如果为0则返回1,反之返回-1,根据上面可以看出,整数就不会执行doAcquireSharedInterruptibly(),该线程就结束方法,继续执行自己的代码去了。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);// 往同步队列中添加节点
    boolean failed = true;
    try {
        for (;;) {// 一个死循环 跳出循环只有下面两个途径
            final Node p = node.predecessor();// 当前线程的前一个节点
            if (p == head) {// 如果是首节点
                int r = tryAcquireShared(arg);// 这个是不是似曾相识 见上面
                if (r >= 0) {
                    setHeadAndPropagate(node, r);// 处理后续节点
                    p.next = null; // help GC 这个可以借鉴
                    failed = false;
                    return;// 计数值为0 并且为头节点 跳出循环
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();// 响应打断 跳出循环
        }
    } finally {
        if (failed)
            cancelAcquire(node);// 如果是打断退出的 则移除同步队列节点
    }
}

接着我们来看doAcquireSharedInterruptibly这个方法,因为计数器值不为0需要阻塞线程,所以在进入方法时,将该线程包装成节点并加入到同步队列尾部(如何添加源码稍后展示),我们看到这个方法退出去的途径直有两个,一个是return,一个是throw InterruptedException。注意最后的finally的处理。

return退出方法有两个条件,首先计数值为0,接着必须是同步节点首节点。

throw InterruptedException是响应打断操作的,线程在阻塞期间,如果你不想在等待了,可以打断线程让它继续运行后面的任务(注意异常处理)

接着我们看看添加节点的源码:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);// 包装节点
    Node pred = tail;// 同步队列尾节点
    if (pred != null) {// 同步队列有尾节点 将我们的节点通过cas方式添加到队列后面
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {// 以cas原子方式添加尾节点
            pred.next = node;
            return node;// 退出该方法
        }
    }
    enq(node);// 两种情况执行这个代码 1.队列尾节点为null 2.队列尾节点不为null,但是我们原子添加尾节点失败
    return node;
}

private Node enq(final Node node) {
    for (;;) {// 又是一个死循环
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))// cas形式添加头节点  注意 是头节点
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {// cas形式添加尾节点
                t.next = node;
                return t;// 结束这个方法的唯一出口 添加尾节点成功
            }
        }
    }
}

至此,CountDownLatch.await()阻塞当前线程的基本功能已经梳理出来了,CountDownLatch.countDown()计数器减一功能以及CountDownLatch示例和它的优缺点将留在下部分梳理。

然后,限于篇幅更多的compareAndSetHead()和compareAndSetTail()这些末节方法未详细列出,希望读者能自行查看api了解。

最后,由于笔者水平有限,难免有不足之处,有不对之处,请不吝惜指教。谢谢!

时间: 08-25

Java CountDownLatch解析(上)的相关文章

Java CountDownLatch解析(下)

写在前面的话 在上一篇CountDownLatch解析中,我们了解了CountDownLatch的简介.CountDownLatch实用场景.CountDownLatch实现原理中的await()方法, 接下来我们接着来了解它的countDown()方法以及它的示例和优缺点. CountDownLatch实现原理 二.CountDownLatch.countDown() 关于同步队列那点事 当部分线程调用await()方法后,它们在同步队列中被挂起,然后自省的检查自己能否满足醒来的条件(还记得那

JAVA实现文件上传

  代码如下: 还要两个jar包      前台页面: 01 <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%> 02 <% 03 String path = request.getContextPath(); 04 String basePath = request.getScheme()+"://"+request.ge

JAVA - Sql解析工具fdb-sql-parser简单使用

由于想要解决Mybatis分页插件中count查询效率问题,因为order by很影响效率,所以需要一种方式处理sql,将order by 语句去掉. 试了好几个sql解析工具,最后选择了fdb-sql-parser. Maven依赖: <dependency> <groupId>com.foundationdb</groupId> <artifactId>fdb-sql-parser</artifactId> <version>1.

html5+java大文件上传代码

<body> <input id="fileid" type="file" accept="video/*;capture=camera" onchange="onfile(this)"> <input id="btn" type="button" value="提交"> <script type="text/jav

org.w3c.dom(java dom)解析XML文档

位于org.w3c.dom操作XML会比较简单,就是将XML看做是一颗树,DOM就是对这颗树的一个数据结构的描述,但对大型XML文件效果可能会不理想 首先来了解点Java DOM 的 API:1.解析器工厂类:DocumentBuilderFactory 创建的方法:DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); 2.解析器:DocumentBuilder 创建方法:通过解析器工厂类来获得 DocumentBu

js上传文件带参数,并且,返回给前台文件路径,解析上传的xml文件,存储到数据库中

ajaxfileupload.js jQuery.extend({ createUploadIframe: function(id, uri) { //create frame var frameId = 'jUploadFrame' + id; if(window.ActiveXObject) { var io = document.createElement('<iframe id="' + frameId + '" name="' + frameId + '&qu

Java XML解析技术

XML现在已经成为一种通用的数据交换格式,它的平台无关性,语言无关性,系统无关性,给数据集成与交互带来了极大的方便.XML在不同的语言里解析方式都是一样的,只不过实现的语法不同而已.基本的解析方式有两种,一种叫SAX,另一种叫DOM. DOM的全称是Document Object Model,也即文档对象模型.在应用程序中,基于DOM的XML分析器将一个XML文档转换成一个对象模型的集合(通常称DOM树),应用程序正是通过对这个对象模型的操作,来实现对XML文档数据的操作.通过DOM接口,应用程

Java泛型解析(04):约束和局限性

Java泛型解析(04):约束和局限性 前两节,认识和学习了泛型的限定以及通配符,初学者可能需要一些时间去体会到泛型程序设计的好处和力量,特别是想成为库程序员的同学就需要下去体会通配符的运用了,应用程序员则需要掌握怎么使用泛型,这里针对泛型的使用中的约束和局限性做一个介绍性的讲解. 不能用基本类型实例化类型参数 这个很好理解,实例化泛型类时泛型参数不能使用基本类型,如List<int>这是不合法的,要存储基本类型则可以利用基本类型的包装类如List<Integer> .List&l

Java Sax解析xml

1.   Java Sax解析是按照xml文件的顺序一步一步的来解析,在解析xml文件之前,我们要先了解xml文件的节点的种类,一种是ElementNode,一种是TextNode.如下面的这段book.xml Xml代码   <?xml version="1.0" encoding="UTF-8"?> <books> <book id="12"> <name>thinking in java<