java分析CountDownLatch的实现原理

编程

package com.edison;  

import java.util.Random;

import java.util.Scanner;

import java.util.concurrent.CountDownLatch;

/**

* @ClassName: Main32

* @description:

* @author: edison_Kwok

* @Date: create in 2020/2/20 20:28

* @Version: 1.0

*/public class Main33 {

public static void main(String[] args) {

startTestCountDownLatch();

}

private static void startTestCountDownLatch() {

int threadNum = 10;

final CountDownLatch countDownLatch = new CountDownLatch(threadNum);

for (int i = 0; i < threadNum; i++) {

final int finalI = i + 1;

new Thread(() -> {

System.out.println("thread " + finalI + " start");

Random random = new Random();

try {

Thread.sleep(random.nextInt(10000) + 1000);

} catch (InterruptedException e) {

e.printStackTrace(); } System.out.println("thread " + finalI + " finish");

countDownLatch.countDown(); }).start();

}

try {

System.out.println("我快结束了");

countDownLatch.await(); //上面开启了10个线程进行执行

//该await()方法就会卡在这里,等到上面的线程执行完成,才能够继续执行下面

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(threadNum + " thread finish");

}

}

主线程启动10个子线程后阻塞在await方法,需要等子线程都执行完毕,主线程才能唤醒继续执行。

构造器

CountDownLatch和ReentrantLock一样,内部使用Sync继承AQS。构造函数很简单地传递计数值给Sync,并且设置了state。

Sync(int count) {

setState(count);

}

AQS的state,这是一个由子类决定含义的“状态”。对于ReentrantLock来说,state是线程获取锁的次数;对于CountDownLatch来说,则表示计数值的大小。

阻塞线程

接着来看await方法,直接调用了AQS的acquireSharedInterruptibly。

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

首先尝试获取共享锁,实现方式和独占锁类似,由CountDownLatch实现判断逻辑。

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

返回1代表获取成功,返回-1代表获取失败。如果获取失败,需要调用doAcquireSharedInterruptibly:

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

doAcquireSharedInterruptibly的逻辑和独占功能的acquireQueued基本相同,阻塞线程的过程是一样的。不同之处:

  1. 创建的Node是定义成共享的(Node.SHARED);
  2. 被唤醒后重新尝试获取锁,不只设置自己为head,还需要通知其他等待的线程。(重点看后文释放操作里的setHeadAndPropagate)

释放操作

public void countDown() {

sync.releaseShared(1);

}

countDown操作实际就是释放锁的操作,每调用一次,计数值减少1:

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

同样是首先尝试释放锁,具体实现在CountDownLatch中:

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

死循环加上cas的方式保证state的减1操作,当计数值等于0,代表所有子线程都执行完毕,被await阻塞的线程可以唤醒了,下一步调用doReleaseShared:

private void doReleaseShared() {

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

//1

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

//2

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

标记1里,头节点状态如果SIGNAL,则状态重置为0,并调用unparkSuccessor唤醒下个节点。

标记2里,被唤醒的节点状态会重置成0,在下一次循环中被设置成PROPAGATE状态,代表状态要向后传播。

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

在唤醒线程的操作里,分成三步:

  • 处理当前节点:非CANCELLED状态重置为0;
  • 寻找下个节点:如果是CANCELLED状态,说明节点中途溜了,从队列尾开始寻找排在最前还在等着的节点
  • 唤醒:利用LockSupport.unpark唤醒下个节点里的线程。

线程是在doAcquireSharedInterruptibly里被阻塞的,唤醒后调用到setHeadAndPropagate。

private void setHeadAndPropagate(Node node, int propagate) {

Node h = head;

setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0 ||

(h = head) == null || h.waitStatus < 0) {

Node s = node.next;

if (s == null || s.isShared())

doReleaseShared();

}

}

setHead设置头节点后,再判断一堆条件,取出下一个节点,如果也是共享类型,进行doReleaseShared释放操作。下个节点被唤醒后,重复上面的步骤,达到共享状态向后传播。

要注意,await操作看着好像是独占操作,但它可以在多个线程中调用。当计数值等于0的时候,调用await的线程都需要知道,所以使用共享锁。

限定时间的await

CountDownLatch的await方法还有个限定阻塞时间的版本.

public boolean await(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

跟踪代码,最后来看doAcquireSharedNanos方法,和上文介绍的doAcquireShared逻辑基本一样,不同之处是加了time字眼的处理。

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)

throws InterruptedException {

if (nanosTimeout <= 0L)

return false;

final long deadline = System.nanoTime() + nanosTimeout;

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return true;

}

}

nanosTimeout = deadline - System.nanoTime();

if (nanosTimeout <= 0L)

return false;

if (shouldParkAfterFailedAcquire(p, node) &&

nanosTimeout > spinForTimeoutThreshold)

LockSupport.parkNanos(this, nanosTimeout);

if (Thread.interrupted())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

进入方法时,算出能够执行多久的deadline,然后在循环中判断时间。注意到代码中间有句:

nanosTimeout > spinForTimeoutThreshold

static final long spinForTimeoutThreshold = 1000L;

spinForTimeoutThreshold写死了1000ns,这就是所谓的自旋操作。当超时在1000ns内,让线程在循环中自旋,否则阻塞线程。

总结

两篇文章分别以ReentrantLock和CountDownLatch为例研究了AQS的独占功能和共享功能。

以上是 java分析CountDownLatch的实现原理 的全部内容, 来源链接: utcz.com/z/513799.html

回到顶部