多线程进阶——JUC并发编程之AQS源码一探究竟

编程

既然AQS是一个提供给我们使用的框架,那么我们可以看看这个框架都有哪些类依赖它。

从框架来看,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch...等。

现在看来我们还有点懵逼,这个框架具体是怎么设计的?下面我们翻看源码注释一探究竟!其中AQS里面维护了一个Node节点构造的CLH队列(FIFO)先进先出队列

  /**

* <p>To enqueue into a CLH lock, you atomically splice it in as new

* tail. To dequeue, you just set the head field.

* <pre>

* +------+ prev +-----+ +-----+

* head | | <---- | | <---- | | tail

* +------+ +-----+ +-----+

* </pre>

*

* <p>Insertion into a CLH queue requires only a single atomic

* operation on "tail", so there is a simple atomic point of

* demarcation from unqueued to queued. Similarly, dequeuing

* involves only updating the "head". However, it takes a bit

* more work for nodes to determine who their successors are,

* in part to deal with possible cancellation due to timeouts

* and interrupts.

*

*/

CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread),状态(waitStatus)、前驱节点(prev)、后继节点(next),其数据结构如下:其实就是一个双端链表

2、Node 节点如何定义

static final class Node {

/** 共享 */

static final Node SHARED = new Node();

/** 独占 */

static final Node EXCLUSIVE = null;

/** 因为超时或者中断,节点会被设置为取消状态,被取消的节点是不会参与竞争的,它会一直保持取消状态不会转变为其它状态 */

static final int CANCELLED = 1;

/** 前节点的线程如果释放了同步状态或者被取消会通知后继处于等待状态的节点,使得后继节点得以运行 */

static final int SIGNAL = -1;

/** 节点在Condition等待队列中,当其他线程对Condition调用Signal()后,该节点将会从等待队列转移到同步队列,加入同步状态的获取 */

static final int CONDITION = -2;

/** 表示下一次共享模式将会无条件传递下去 */

static final int PROPAGATE = -3;

/** 等待状态 */

volatile int waitStatus;

/** 前驱节点*/

volatile Node prev;

/** 后继节点 */

volatile Node next;

/** 获取同步状态的线程 */

volatile Thread thread;

/** 用来连接Condition等待队列中的节点 */

Node nextWaiter;

....

}

在我前几次博客中分析了CountDownLatch、CyclicBarrier、Semaphore ,基本对Node有了清晰的了解,Node节点是对每个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞,是否被唤醒,是否已经被取消等。

可以看到AQS支持两种同步模式  【EXCLUSIVE 独占只有一个线程能执行,如ReentrantLock和【SHARED 共享,多个线程可以同时执行,如 Semphore/CountDownLatch,这样方便使用者实现不同类型的同步组件,简而言之,AQS为使用者提供了多样的底层支持,具体如何组装实现,使用者可以自由发挥。

3、核心方法源码分析

3.1、独占模式的获取与释放状态

public final void acquire(int arg) {

/**

* 1 tryAcquire 尝试获取同步状态;

* 2 addWaiter 如果尝试获取到同步状态失败,则加入到同步队列中;

* 3 acquireQueued 在队列中尝试获取同步状态.

* 4 selfInterrupt 如果线程在等待过程中被中断过,它是不响应的,只是获取资源后才再将自我中断补上

*/

if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

此方法是独占模式下线程获取共享资源的顶层入口,如果【tryAcquire(arg)】获取到资源直接返回,否则进入【acquireQueued】等待队列,直到获取到资源为止.

1、进入【tryAcquire 】方法

protected boolean tryAcquire(int arg) {

throw new UnsupportedOperationException();

}

该方法直接抛出异常,具体实现交给自定义同步器去实现,注意AQS里面没有定义这个方法为抽象方法,是因为独占模式只需要实现【 tryAcquire-tryRelease 】,而共享模式只需要实现【 tryAcquireShared-tryReleaseShared,如果都定义成abstract,那么每个模式都要实现独占和共享模式的接口,得不偿失。

因为AQS只是个框架,基于模板模式,仅提供基于状态和同步队列的实现,具体的实现逻辑交给不同子类去实现,我们现在看看公平策略的 ReentrantLock实现

        protected final boolean tryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

//尝试修改状态值

if (!hasQueuedPredecessors() &&

compareAndSetState(0, acquires)) {

//修改状态值成功,记录当前持有同步状态的线程信息

setExclusiveOwnerThread(current);

return true;

}

}

//如果当前线程已经持有同步状态,继续修改同步状态

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

假如【tryAcquire】获取失败的线程,程序返回false,进入【acquireQueued】加入到AQS同步队列的队尾,判断【p == head && tryAcquire(arg)】如果当前节点的前驱节点为头结点,则再次尝试获取同步状态

2、【acquireQueued (addWaiter())】源码分析

//没有获取到同步状态的线程添加到AQS等待队列队尾

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// 快速入队操作

Node pred = tail;

if (pred != null) {

node.prev = pred;

//将当前线程设置到队尾

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

//快速入队失败兜底入队的方法

enq(node);

return node;

}

//AQS同步队列中的节点,尝试获取同步状态

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;//标记是否成功拿到资源

try {

boolean interrupted = false;//标记等待过程中是否被中断过

for (;;) {//自旋

final Node p = node.predecessor();//拿到前驱

//如果当前节点的前驱节点为头结点,则再次尝试获取同步状态

if (p == head && tryAcquire(arg)) {

//当前节点设置为头结点,相当于头结点出队

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

//如果自己可以休息了,就通过park进入waiting状态,直到被unpark,如果不可中断的情况下被中断了,那么会从park中醒来,发现拿不到资源,继续进入park等待

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())//阻塞挂起当前线程直到被唤醒

interrupted = true;//如果等待过程被中断,就将状态标记为true

}

} finally {

if (failed)//如果等待过程中没有成功获取到资源(如timeout,或者可中断的情况下被中断)那么取消节点在队列中的等待

cancelAcquire(node);

}

}

如果头结点的下一个节点尝试获取同步状态失败后,会进入等待状态,其它节点则继续自旋

3、独占模式获取同步状态总结:

【acquireQueued】函数流程:类似医院排队拿号

1、节点进入队尾后,检查状态,找到安全休息点

2、调用park()进入waiting状态,等待unpark()或者interrupt()唤醒自己

3、被唤醒后,看自己是不是有资格拿到号,如果拿到,head指向当前节点,并返回从入队到拿到号的整个过程中是否被中断过,如果没拿到继续流程1

回到【acquire()】函数流程

    public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

1、调用自定义同步器的【tryAcquire】尝试直接去获取资源,如果成功则直接返回

2、没成功,则【addWaiter】将线程加入等待队列尾部,并标记为独占模式

3、【acquireQueued】使线程在等待队列中休息,有机会时(轮到自己,被unpark)会尝试获取资源,直到获取资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false

4、如果线程在等待过程中被中断过,它是不响应的,只是获取资源后,才再进行自我中断【selfInterrupt】,将中断补上

4、释放同步状态【release】方法

当线程执行完相应逻辑后,需要释放同步状态,使后继节点有机会同步状态(让出资源,让排队的线程使用)。这时候就需要调用【release】方法,唤醒后继节点。

1、释放同步状态,唤醒后继节点

//释放同步状态

public final boolean release(int arg) {

//尝试释放同步状态

if (tryRelease(arg)) {

Node h = head;

//释放成功后,执行unpark,即唤醒操作(暂时可忽略waitStatus涉及到条件队列)

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

//尝试释放同步状态,同步状态-需要释放资源的值

//如果state=0,表示当前线程 获取次数=释放次数,既然释放成功,此时持有同步状态线程标志位null

protected final boolean tryRelease(int releases) {

int c = getState() - releases;

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false;

//状态码=0,表示释放成功了

if (c == 0) {

free = true;

//独占标志设置为null

setExclusiveOwnerThread(null);

}

setState(c);

return free;

}

如果释放同步状态成功,则进入【unparkSuccessor】方法唤醒后继节点

//唤醒后继节点

private void unparkSuccessor(Node node) {

//这里node一般为当前线程所在的节点

int ws = node.waitStatus;

if (ws < 0)

//置为0当前线程所在的节点状态,允许失败

compareAndSetWaitStatus(node, ws, 0);

//找到下一个需要被唤醒的节点

Node s = node.next;

//如果为空,或者已经取消了

if (s == null || s.waitStatus > 0) {

s = null;

//从后往前找

for (Node t = tail; t != null && t != node; t = t.prev)

//从这里可以看出,<=0 的节点还是有效节点

if (t.waitStatus <= 0)

s = t;

}

//唤醒后继节点

if (s != null)

LockSupport.unpark(s.thread);

}

1、后继节点获取同步状态成功,头结点出队。需要注意的是,出队操作是间接的,有节点获取到同步状态时,会将当前节点设置为head,而原本的head设置为null。

//在AQS等待队列阻塞的节点尝试获取同步状态

//获取成功后,将当前节点设置为头结点,头结点设置为null,即头结点出队

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {//自旋

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

//a.操作,当前节点设置为头结点,当前节点的前驱节点设置为null

setHead(node);

//b.原始的head的next设置为null,此时原始的head已经被移出队列

p.next = null; // help GC

failed = false;

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

//补充a操作代码,设置当前节点的前驱节点为null

private void setHead(Node node) {

head = node;

node.thread = null;

node.prev = null;

}

5、小结

【unparkSuccessor】中unpark唤醒等待队列这个号最前面的那个未放弃线程,这里使用s来表示,此时,在和【acquireQueued】联系起来,s被唤醒后,进入【 if (p == head && tryAcquire(arg))】判断(即p !=head也没关系,他会再次进入【shouldParkAfterFailedAcquire】寻找一个安全点,设置head的next节点为自己,那么下一次自旋 p== head 也就成立了),然后s吧自己设置成head标杆节点,表示自己已经获取到资源了,acquire也返回了。

6、其它竞争的情况

1、当同步队列的头结点唤醒后继节点时,此时可能有其它线程尝试获取同步状态

2、假设线程5获取同步状态成功,将会被设置为头结点,上面代码已经说明了

3、头结点后续节点获取同步状态失败

3.2、共享模式获取与释放状态

共享模式与独占模式最主要的区别在于 支持同一时刻有多少个线程同时获取同步状态,为了避免额外的负担,在上文提到的同步队列中都是用独占模式来进行讲述,其实同步队列中的节点应该是独占和共享节点并存的。

接下来针对共享模式状态下获取与释放过程进行分析

1、获取同步状态

当一个共享节点获取到同步状态,并唤醒后面等待的共享状态的结果如下图所示:

2、【acquireShared】

/** 

* 共享模式获取同步状态

* 1、首先至少要调用一次tryAcquireShared方法,获取同步状态

* 2、加入到同步队列中,循环阻塞与唤醒,直到获取同步状态为止,获取成功会唤醒后面还在等待的共享节点

* 并把该唤醒事件传递下去,也即依次唤醒在该节点后面的所有共享节点

*/

public final void acquireShared(int arg) {

if (tryAcquireShared(arg) < 0)

doAcquireShared(arg);

}

/** 自旋模式获取同步状态 */

private void doAcquireShared(int arg) {

//第一次获取同步状态失败后,会将此线程加入到同步队列中

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

//如果前驱节点是头结点,尝试获取同步状态

final Node p = node.predecessor();

if (p == head) {

// r>0 表示获取同步状态成功,并且还有共享类型节点在同步队列中

// r == 0 表示获取同步状态成功,但是同步队列中没有其它共享模式节点

int r = tryAcquireShared(arg);

if (r >= 0) {

//获取同步状态成功后,将当前node设置为头结点,并向后传播,唤醒共享模式等待的节点

setHeadAndPropagate(node, r);

p.next = null; // help GC

if (interrupted)

selfInterrupt();

failed = false;

return;

}

}

//判断状态,寻找安全点,进入waiting,等被unpark或者interrupt

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

/** 设置新的头结点,并设置后面需要唤醒的节点 */

private void setHeadAndPropagate(Node node, int propagate) {

Node h = head; // Record old head for check below

setHead(node);

//propagate>0表示后面需要唤醒的共享模式节点

if (propagate > 0 || h == null || h.waitStatus < 0 ||

(h = head) == null || h.waitStatus < 0) {

Node s = node.next;

//如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒

//这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒

if (s == null || s.isShared())

doReleaseShared();

}

/** 唤醒所有共享模式节点 */

private void doReleaseShared() {

for (;;) {

//唤醒操作由头结点开始,注意这里的头结点已经是上面新设置好的头结点

//其实就是唤醒上面新获取到的共享模式节点的后继节点

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

//表示后继节点需要被唤醒

if (ws == Node.SIGNAL) {

//这里需要控制并发,因为入口有setHeadAndPropagate和release两个,避免两次UNpark

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);//执行唤醒操作(唤醒后续共享模式节点)

}

//如果头结点没有发生变化,表示设置完成,退出循环

//如果头结点发生变化,比如其它线程获取了锁,为了使自己的唤醒动作可以被传递,必须进行重试

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

3、小结【acquireShared】流程

1、【tryAcquireShared】尝试获取资源,成功则直接返回

2、失败则通过【doAcquireShared】进入等待队列park(),直到被unpark()或者interrupt()并成功获取到资源才返回,整个等待过程也是忽略中断的。

其实和【acquire】流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(共享嘛)

最后,获取到同步状态的线程执行完毕,同步队列中只有一个独占节点:

释放同步状态

    public final boolean releaseShared(int arg) {

//尝试释放同步状态

if (tryReleaseShared(arg)) {

//释放成功后,唤醒后续等待共享节点

doReleaseShared();

return true;

}

return false;

}

4、基于AQS 实现互斥锁

接下来我们就基于AQS自定义一个互斥锁来完成相同的功能

代码实现

/**

* 自定义互斥锁

*/

public class MutexLock {

private static final Sync STATE_HOLDER = new Sync();

/**

* 通过 Sync 内部类来持有同步状态,当状态为1表示锁被持有,0表示锁处于空闲状态

*/

private static class Sync extends AbstractQueuedSynchronizer {

/**

* 是否被独占,有两种表示方式

* 1、可以根据状态,state=1 表示锁被占用,0 表示空闲

* 2、可以根据当前独占锁的线程来判断,即 getExclusiveOwnerThread()!=null 表示被独占

*/

@Override

protected boolean isHeldExclusively() {

return getState() == 1;

}

/**

* 尝试获取锁,从状态0修改为1,表示锁被当前线程独占了

* @param acquires

* @return

*/

@Override

protected boolean tryAcquire(int acquires) {

assert acquires ==1;//这里限定只能为1个

if (compareAndSetState(0,1)){//state为0才设置为1,不可重入

setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源

return true;

}

return false;

}

/**

* 尝试释放资源

* @param releases

* @return

*/

@Override

protected boolean tryRelease(int releases) {

assert releases ==1;

//既然要释放资源,那肯定有资源啦

if (getState() == 0){

throw new UnsupportedOperationException();

}

setExclusiveOwnerThread(null);

//释放资源,放弃占有状态

setState(0);

return true;

}

}

/**

* 下面的实现Lock接口需要重写的方法,基本就是调用内部类Sync的方法

*/

public void lock(){

STATE_HOLDER.acquire(1);

}

public void unlock(){

STATE_HOLDER.release(1);

}

}

1、锁的测试

package com.lock.ext;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* 自定义锁 测试

*/

public class MyLockTest {

public static void main(String[] args) throws InterruptedException {

int threadNum = 10;

int countPerThread = 10000;

//线程池创建的正确姿势

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(threadNum,

threadNum, 1000, TimeUnit.SECONDS,

new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());

CountDownLatch countDownLatch = new CountDownLatch(threadNum);

Counter counter = new Counter();

Counter counterUnsafe = new Counter();

for (int i = 0; i < threadNum; i++) {

poolExecutor.submit(()->{

for (int j = 0; j < countPerThread; j++) {

counter.getAndIncrement();

counterUnsafe.getAndIncrementUnSfae();

}

countDownLatch.countDown();

});

}

countDownLatch.await();

System.out.printf("%s 个线程,每个线程累加了 %s 次,执行结果:safeCounter = %s, unsafeCounter = %s ", threadNum, countPerThread, counter.get(), counterUnsafe.get());

poolExecutor.shutdownNow();

}

}

package com.lock.ext;

class Counter {

private MutexLock mutexLock;

private volatile int count;

Counter() {

this.mutexLock = new MutexLock();

}

int get() {

return count;

}

int getAndIncrement() {

mutexLock.lock();

count++;

mutexLock.unlock();

return count;

}

int getAndIncrementUnSfae() {

count++;

return count;

}

}

结果和预期一样,用自定义锁实现的计数器统计没有误差

5、总结

1、AQS 通过一个 int 同步状态码,和一个 (先进先出)队列来控制多个线程访问资源

2、支持独占和共享两种模式获取同步状态码

3、当线程获取同步状态失败会被加入到同步队列中

4、当线程释放同步锁,会唤醒后继节点来获取同步状态

5、共享模式下的节点获取到同步状态或者释放同步状态时,不仅会唤醒后继节点,还会向后传播,唤醒所有同步节点

6、使用volatile关键字保证状态码在线程间可见性,CAS操作保证修改状态码过程的原子性

推荐阅读:

多线程进阶——JUC并发编程之AQS源码一探究竟

多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟

多线程进阶——JUC并发编程之Semaphore源码一探究竟

以上是 多线程进阶——JUC并发编程之AQS源码一探究竟 的全部内容, 来源链接: utcz.com/z/513956.html

回到顶部