【Java】Java高并发之CountDownLatch源码分析
Java高并发之CountDownLatch源码分析
入门小站发布于 今天 14:50
概述
使用方法
public static void main(String[] args) {Test countDownLatchTest=new Test();
countDownLatchTest.runThread();
}
//计数器为10,代表有10个线程等待创建
CountDownLatch countDownLatch=new CountDownLatch(10);
/**
* 创建一个线程
* @return
*/
private Thread createThread(int i){
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
try {
//在此等待,直到计数器变为0
countDownLatch.await();
System.out.println("thread"+Thread.currentThread().getName()+"准备完毕"+System.currentTimeMillis());
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
thread.setName("thread-"+i);
return thread;
}
public void runThread(){
ExecutorService executorService= Executors.newFixedThreadPool(10);
try {
for(int i=0;i<10;i++){
Thread.sleep(100);
executorService.submit(createThread(i));
//一个线程创建好了,待创建的线程数减一
countDownLatch.countDown();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
源码分析
从锁的分类上来讲,CountDownLatch 其实是一个” 共享锁 “。还有一个需要注意的是 CountDownLath 是响应中断的,如果线程在对锁进行操作的期间发生了中断,会直接抛出 InterruptedException。
源码分析
计数器的本质是什么?
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {setState(count); //就是修改了AQS中的state值
}
await 方法
- await 方法实际上是调用了 sync 的一个方法
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
- sync 的
void acquireSharedInterruptibly(int arg)
的实现如下
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
if (Thread.interrupted())
//如果线程中断了,则抛异常。
//证明了之前所说的CountDownLatch是会响应中断的
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- 如果没有中断,就会调用
tryAcquireShared(arg)
它的实现非常的简单,如果 state 为 0,就返回 1,否则返回 - 1
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
- 如果 state 不为 0,就会返回 - 1,if 条件成立,就会调用
doAcquireSharedInterruptibly(arg)
这个方法的实现,稍微复杂一点,但这个方法也不陌生了,它的功能就是把该线程加入等待队列中并阻塞,但是在入队之后,不一定会立即 park 阻塞,它会判断自己是否是第二个节点,如果是就会再次尝试获取。
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {//前一个节点是头节点
int r = tryAcquireShared(arg); //去看一看state是否为0,步骤3分析过
if (r >= 0) {
//如果state目前为0,就出队
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//进入阻塞队列阻塞,如果发生中断,则抛异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
countDown 方法
- countDown 方法实际上是调用 sync 中的一个方法
public void countDown() {sync.releaseShared(1);
}
boolean releaseShared(int arg)
的具体实现如下:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(arg)
方法的具体实现如下:
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zero
for (;;) {//自旋
int c = getState();
if (c == 0)//计数器已经都是0了,当然会释放失败咯
return false;
int nextc = c-1;//释放后,计数器减一
if (compareAndSetState(c, nextc))//CAS修改计数器
return nextc == 0;
}
}
doReleaseShared();
的实现如下,它的作用就是state为0的时候,去唤醒等待队列中的线程。
private void doReleaseShared() {/*
* 如果head需要通知下一个节点,调用unparkSuccessor
* 如果不需要通知,需要在释放后把waitStatus改为PROPAGATE来继续传播
* 此外,我们必须通过自旋来CAS以防止操作时有新节点加入
* 另外,不同于其他unparkSuccessor的用途,我们需要知道CAS设置状态失败的情况,
* 以便进行重新检查。
*/
for (;;) {
//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
//其实就是唤醒上面新获取到共享锁的节点的后继节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//执行唤醒操作
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head)
break;
}
}
关注微信公众号:【入门小站】,解锁更多知识点
java多线程
阅读 39更新于 今天 15:11
本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
入门小站
rumenz.com
47 声望
3 粉丝
入门小站
rumenz.com
47 声望
3 粉丝
宣传栏
目录
概述
使用方法
public static void main(String[] args) {Test countDownLatchTest=new Test();
countDownLatchTest.runThread();
}
//计数器为10,代表有10个线程等待创建
CountDownLatch countDownLatch=new CountDownLatch(10);
/**
* 创建一个线程
* @return
*/
private Thread createThread(int i){
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
try {
//在此等待,直到计数器变为0
countDownLatch.await();
System.out.println("thread"+Thread.currentThread().getName()+"准备完毕"+System.currentTimeMillis());
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
thread.setName("thread-"+i);
return thread;
}
public void runThread(){
ExecutorService executorService= Executors.newFixedThreadPool(10);
try {
for(int i=0;i<10;i++){
Thread.sleep(100);
executorService.submit(createThread(i));
//一个线程创建好了,待创建的线程数减一
countDownLatch.countDown();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
源码分析
从锁的分类上来讲,CountDownLatch 其实是一个” 共享锁 “。还有一个需要注意的是 CountDownLath 是响应中断的,如果线程在对锁进行操作的期间发生了中断,会直接抛出 InterruptedException。
源码分析
计数器的本质是什么?
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {setState(count); //就是修改了AQS中的state值
}
await 方法
- await 方法实际上是调用了 sync 的一个方法
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
- sync 的
void acquireSharedInterruptibly(int arg)
的实现如下
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
if (Thread.interrupted())
//如果线程中断了,则抛异常。
//证明了之前所说的CountDownLatch是会响应中断的
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- 如果没有中断,就会调用
tryAcquireShared(arg)
它的实现非常的简单,如果 state 为 0,就返回 1,否则返回 - 1
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
- 如果 state 不为 0,就会返回 - 1,if 条件成立,就会调用
doAcquireSharedInterruptibly(arg)
这个方法的实现,稍微复杂一点,但这个方法也不陌生了,它的功能就是把该线程加入等待队列中并阻塞,但是在入队之后,不一定会立即 park 阻塞,它会判断自己是否是第二个节点,如果是就会再次尝试获取。
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); //获取当前节点的前驱节点
if (p == head) {//前一个节点是头节点
int r = tryAcquireShared(arg); //去看一看state是否为0,步骤3分析过
if (r >= 0) {
//如果state目前为0,就出队
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//进入阻塞队列阻塞,如果发生中断,则抛异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
countDown 方法
- countDown 方法实际上是调用 sync 中的一个方法
public void countDown() {sync.releaseShared(1);
}
boolean releaseShared(int arg)
的具体实现如下:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(arg)
方法的具体实现如下:
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zero
for (;;) {//自旋
int c = getState();
if (c == 0)//计数器已经都是0了,当然会释放失败咯
return false;
int nextc = c-1;//释放后,计数器减一
if (compareAndSetState(c, nextc))//CAS修改计数器
return nextc == 0;
}
}
doReleaseShared();
的实现如下,它的作用就是state为0的时候,去唤醒等待队列中的线程。
private void doReleaseShared() {/*
* 如果head需要通知下一个节点,调用unparkSuccessor
* 如果不需要通知,需要在释放后把waitStatus改为PROPAGATE来继续传播
* 此外,我们必须通过自旋来CAS以防止操作时有新节点加入
* 另外,不同于其他unparkSuccessor的用途,我们需要知道CAS设置状态失败的情况,
* 以便进行重新检查。
*/
for (;;) {
//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
//其实就是唤醒上面新获取到共享锁的节点的后继节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//执行唤醒操作
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head)
break;
}
}
关注微信公众号:【入门小站】,解锁更多知识点
以上是 【Java】Java高并发之CountDownLatch源码分析 的全部内容, 来源链接: utcz.com/a/110887.html
得票时间