【Java】JDK源码分析-AbstractQueuedSynchronizer(2)

JDK源码分析-AbstractQueuedSynchronizer(2)

WriteOnRead发布于 10 分钟前

1. 概述

前文「JDK源码分析-AbstractQueuedSynchronizer(1)」初步分析了 AQS,其中提到了 Node 节点的「独占模式」和「共享模式」,其实 AQS 也主要是围绕对这两种模式的操作进行的。

Node 节点是对线程 Thread 类的封装,因此两种模式可以理解如下:

  • 独占模式(exclusive):线程对资源的访问是排他的,即某个时间只能一个线程单独访问资源;
  • 共享模式(shared):与独占模式不同,多个线程可以同时访问资源。

本文先分析独占模式下的各种操作,后面再分析共享模式。

2. 独占模式

2.1 方法概述

独占模式下的操作主要有以下几个方法(可与前面分析的 Lock 接口的方法类比):

  1. acquire(int arg)

    以独占模式获取资源,忽略中断;可以类比 Lock 接口的 lock 方法;

  2. acquireInterruptibly(int arg)

    以独占模式获取资源,响应中断;可以类比 Lock 接口的 lockInterruptibly 方法;

  3. tryAcquireNanos(int arg, long nanosTimeout)

    以独占模式获取资源,响应中断,且有超时等待;可以类比 Lock 接口的 tryLock(long, TimeUnit) 方法;

  4. release(int arg)

    释放资源,可以类比 Lock 接口的 unlock 方法。

2.2 方法分析

2.2.1 独占模式获取资源(忽略中断)

这几种获取资源的方法很多地方是类似的。我们先从 acquire 方法开始分析,如下:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

该方法看似很短,其实是内部做了封装。这几行代码包含了如下四个操作步骤:

  1. tryAcquire
  2. addWaiter(Node.EXECUSIVE)
  3. acquireQueued(final Node node, arg))
  4. selfInterrupt

上面的四个步骤不一定全部执行,下面依次进行分析。

  • step 1: tryAcquire

protected boolean tryAcquire(int arg) {

throw new UnsupportedOperationException();

}

该方法的作用是尝试以独占模式获取资源,若成功则返回 true。

可以看到该方法是一个 protected 方法,而且 AQS 中该方法直接抛出了异常,其实是它把实现委托给了子类。这也是 ReentrantLock、CountdownLatch 等类(严格来说是其内部类 Sync)的实现功能不同的地方,这些类正是通过对该方法的不同实现来制定了自己的“游戏规则”。

若 step 1 中的 tryAcquire 方法返回 true,则表示当前线程获取资源成功,方法直接返回,该线程接下来就可以“为所欲为”了;否则表示获取失败,接下来会依次执行 step 2 和 step 3。

  • step 2: addWaiter(Node.EXECUSIVE)

private Node addWaiter(Node mode) {

// 将当前线程封装为一个 Node 节点,指定 mode

// PS: 独占模式 Node.EXECUSIVE, 共享模式 Node.SHARED

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

// 通过 CAS 操作设置主队列的尾节点

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

// 尾节点 tail 为 null,表示主队列未初始化

enq(node);

return node;

}

enq 方法:

private Node enq(final Node node) {

for (;;) {

Node t = tail;

// 尾节点为空,表明当前队列未初始化

if (t == null) { // Must initialize

// 将队列的头尾节点都设置为一个新的节点

if (compareAndSetHead(new Node()))

tail = head;

} else {

// 将 node 节点插入主队列末尾

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

可以看到 addWaiter(Node.EXECUSIVE) 方法的作用是:把当前线程封装成一个独占模式的 Node 节点,并插入到主队列末尾(若主队列未初始化,则将其初始化后再插入)。

  • step 3: acquireQueued(final Node node, arg))

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

// 中断标志位

boolean interrupted = false;

for (;;) {

// 获取该节点的前驱节点

final Node p = node.predecessor();

// 若前驱节点为头节点,则尝试获取资源

if (p == head && tryAcquire(arg)) {

// 若获取成功,则将该节点设置为头节点并返回

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

// 若上面条件不满足,即前驱节点不是头节点,或尝试获取失败

// 判断当前线程是否可以休眠

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

若当前节点的前驱节点为头节点,则会再次尝试获取资源(tryAcuqire),若获取成功,则将当前节点设置为头节点并返回;否则,若前驱节点不是头节点,或者获取资源失败,则执行如下两个方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

// 前驱节点的等待状态

int ws = pred.waitStatus;

// 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠

if (ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park.

*/

return true;

// 若前驱节点的状态大于 0,表示前驱节点处于取消(CANCELLED)状态

// 则将前驱节点跳过(相当于踢出队列)

if (ws > 0) {

/*

* Predecessor was cancelled. Skip over predecessors and

* indicate retry.

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

/*

* waitStatus must be 0 or PROPAGATE. Indicate that we

* need a signal, but don't park yet. Caller will need to

* retry to make sure it cannot acquire before parking.

*/

// 此时 waitStatus 只能为 0 或 PROPAGATE 状态,将前驱节点的等着状态设置为 SIGNAL

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

该方法的流程:

  1. 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠(park);
  2. 若前驱节点是取消状态 (ws > 0),则将其清理出队列,以此类推;
  3. 若前驱节点为 0 或 PROPAGATE,则将其设置为 SIGNAL 状态。

正如其名,该方法(shouldParkAfterFailedAcquire)的作用就是判断当前线程在获取资源失败后,是否可以休眠(park)。

parkAndCheckInterrupt:

private final boolean parkAndCheckInterrupt() {

// 将当前线程休眠

LockSupport.park(this);

return Thread.interrupted();

}

该方法的作用:

  1. 使当前线程休眠(park);
  2. 返回该线程是否被中断(其他线程对其发过中断信号)。

上面就是 acquireQueued(final Node node, arg)) 方法的执行过程,为了便于理解,可参考下面的流程图:

【Java】JDK源码分析-AbstractQueuedSynchronizer(2)

若此期间被其他线程中断过,则此时再去执行 selfInterrupt 方法去响应中断请求:

static void selfInterrupt() {

Thread.currentThread().interrupt();

}

以上就是 acquire 方法执行的整体流程。

2.2.2 以独占模式获取资源(响应中断)

该操作其实与前面的过程类似,因此分析相对简单些,代码如下:

public final void acquireInterruptibly(int arg)

throws InterruptedException

// 若线程被中断过,则抛出异常

if (Thread.interrupted())

throw new InterruptedException();

// 尝试获取资源

if (!tryAcquire(arg))

// 尝试获取资源失败

doAcquireInterruptibly(arg);

}

tryAcquire 与前面的操作一样,若尝试获取资源成功则直接返回;否则,执行 doAcquireInterruptibly:

private void doAcquireInterruptibly(int arg)

throws InterruptedException

// 将当前线程封装成 Node 节点插入主队列末尾

final Node node = addWaiter(Node.EXCLUSIVE);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

// 抛出中断异常

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

通过与前面的 acquire 方法对比可以发现,二者代码几乎一样,区别在于 acquire 方法检测到中断(parkAndCheckInterrupt)时只是记录了标志位,并未响应;而此处直接抛出了异常。这也是二者仅有的区别,此处不再详细分析。

2.2.3 以独占模式获取资源(响应中断,且有超时)

该操作与前者也是类似的,代码如下:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)

throws InterruptedException

// 若被中断,则响应

if (Thread.interrupted())

throw new InterruptedException();

return tryAcquire(arg) ||

doAcquireNanos(arg, nanosTimeout);

}

doAcquireNanos:

static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireNanos(int arg, long nanosTimeout)

throws InterruptedException {

// 若超时时间小于等于 0,直接获取失败

if (nanosTimeout <= 0L)

return false;

// 计算截止时间

final long deadline = System.nanoTime() + nanosTimeout;

final Node node = addWaiter(Node.EXCLUSIVE);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return true;

}

nanosTimeout = deadline - System.nanoTime();

// 已经超时了,获取失败

if (nanosTimeout <= 0L)

return false;

// 若大于自旋时间,则线程休眠;否则自旋

if (shouldParkAfterFailedAcquire(p, node) &&

nanosTimeout > spinForTimeoutThreshold)

LockSupport.parkNanos(this, nanosTimeout);

// 若被中断,则响应

if (Thread.interrupted())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

这里有个变量 spinForTimeoutThreshold,表示自旋时间,若大于该值则将线程休眠,否则继续自旋。个人理解这里增加该时间是为了提高效率,即,只有在等待时间较长的时候才让线程休眠。

该方法与 acquireInterruptibly 也是类似的,在前者的基础上增加了 timeout,不再详细分析。

2.2.4 释放资源

前面分析了三种获取资源的方式,自然也有释放资源。下面分析释放资源的 release 操作:

public final boolean release(int arg) {

// 尝试释放资源,若成功则返回 true

if (tryRelease(arg)) {

Node h = head;

// 若头节点不为空,且等待状态不为 0(此时为 SIGNAL)

// 则唤醒其后继节点

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

与 tryAcquire 方法类似,tryRelease 方法在 AQS 中也是抛出异常,同样交由子类实现:

protected boolean tryRelease(int arg) {

throw new UnsupportedOperationException();

}

unparkSuccessor 的主要作用是唤醒 node 的后继节点,代码如下:

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node. But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

// 后继节点

Node s = node.next;

if (s == null || s.waitStatus > 0) {

// 若后继节点是取消状态,则从尾节点向前遍历,找到 node 节点后面一个未取消状态的节点

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

// 唤醒node节点的后继节点

if (s != null)

LockSupport.unpark(s.thread);

}

若 node 节点的后继节点是取消状态(ws > 0),则从主队列中取其后面一个非取消状态的线程唤醒。

前面三个获取资源的方法中,finally 代码块中都用到了 cancelAcquire 方法,都是获取失败时的操作,这里也分析一下:

private void cancelAcquire(Node node) {

// Ignore if node doesn't exist

if (node == null)

return;

node.thread = null;

// Skip cancelled predecessors

// 跳过取消状态的前驱节点

Node pred = node.prev;

while (pred.waitStatus > 0)

node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will

// fail if not, in which case, we lost race vs another cancel

// or signal, so no further action is necessary.

// 前驱节点的后继节点引用

Node predNext = pred.next;

// Can use unconditional write instead of CAS here.

// After this atomic step, other Nodes can skip past us.

// Before, we are free of interference from other threads.

// 将当前节点设置为取消状态

node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.

// 若该节点为尾节点(后面没其他节点了),将 predNext 指向 null

if (node == tail && compareAndSetTail(node, pred)) {

compareAndSetNext(pred, predNext, null);

} else {

// If successor needs signal, try to set pred's next-link

// so it will get one. Otherwise wake it up to propagate.

int ws;

if (pred != head &&

((ws = pred.waitStatus) == Node.SIGNAL ||

(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

pred.thread != null) {

Node next = node.next;

if (next != null && next.waitStatus <= 0)

compareAndSetNext(pred, predNext, next);

} else {

// 前驱节点为头节点,表明当前节点为第一个,取消时唤醒它的下一个节点

unparkSuccessor(node);

}

node.next = node; // help GC

}

}

该方法的主要操作:

  1. 将 node 节点设置为取消(CANCELLED)状态;
  2. 找到它在队列中非取消状态的前驱节点 pred:

    1. 若 node 节点是尾节点,则前驱节点的后继设为空,
    2. 若 pred 不是头节点,且状态为 SIGNAL,则后继节点设为 node 的后继节点;
    3. 若 pred 是头节点,则唤醒 node 的后继节点。

3. 小结

本文分析了以独占模式获取资源的三种方式,以及释放资源的操作。分别为:

  1. acquire: 独占模式获取资源,忽略中断;
  2. acquireInterruptibly: 独占模式获取资源,响应中断;
  3. tryAcquireNanos: 独占模式获取资源,响应中断,有超时;
  4. release: 释放资源,唤醒主队列中的下一个线程。

这几个方法都可以类比 Lock 接口的相关方法定义。

相关阅读:

JDK源码分析-AbstractQueuedSynchronizer(1)

JDK源码分析-Lock&Condition

【Java】JDK源码分析-AbstractQueuedSynchronizer(2)

java后端

阅读 13发布于 10 分钟前

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

WriteOnRead

微信公众号:WriteOnRead。

1 声望

0 粉丝

0 条评论

得票时间

avatar

WriteOnRead

微信公众号:WriteOnRead。

1 声望

0 粉丝

宣传栏

1. 概述

前文「JDK源码分析-AbstractQueuedSynchronizer(1)」初步分析了 AQS,其中提到了 Node 节点的「独占模式」和「共享模式」,其实 AQS 也主要是围绕对这两种模式的操作进行的。

Node 节点是对线程 Thread 类的封装,因此两种模式可以理解如下:

  • 独占模式(exclusive):线程对资源的访问是排他的,即某个时间只能一个线程单独访问资源;
  • 共享模式(shared):与独占模式不同,多个线程可以同时访问资源。

本文先分析独占模式下的各种操作,后面再分析共享模式。

2. 独占模式

2.1 方法概述

独占模式下的操作主要有以下几个方法(可与前面分析的 Lock 接口的方法类比):

  1. acquire(int arg)

    以独占模式获取资源,忽略中断;可以类比 Lock 接口的 lock 方法;

  2. acquireInterruptibly(int arg)

    以独占模式获取资源,响应中断;可以类比 Lock 接口的 lockInterruptibly 方法;

  3. tryAcquireNanos(int arg, long nanosTimeout)

    以独占模式获取资源,响应中断,且有超时等待;可以类比 Lock 接口的 tryLock(long, TimeUnit) 方法;

  4. release(int arg)

    释放资源,可以类比 Lock 接口的 unlock 方法。

2.2 方法分析

2.2.1 独占模式获取资源(忽略中断)

这几种获取资源的方法很多地方是类似的。我们先从 acquire 方法开始分析,如下:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

该方法看似很短,其实是内部做了封装。这几行代码包含了如下四个操作步骤:

  1. tryAcquire
  2. addWaiter(Node.EXECUSIVE)
  3. acquireQueued(final Node node, arg))
  4. selfInterrupt

上面的四个步骤不一定全部执行,下面依次进行分析。

  • step 1: tryAcquire

protected boolean tryAcquire(int arg) {

throw new UnsupportedOperationException();

}

该方法的作用是尝试以独占模式获取资源,若成功则返回 true。

可以看到该方法是一个 protected 方法,而且 AQS 中该方法直接抛出了异常,其实是它把实现委托给了子类。这也是 ReentrantLock、CountdownLatch 等类(严格来说是其内部类 Sync)的实现功能不同的地方,这些类正是通过对该方法的不同实现来制定了自己的“游戏规则”。

若 step 1 中的 tryAcquire 方法返回 true,则表示当前线程获取资源成功,方法直接返回,该线程接下来就可以“为所欲为”了;否则表示获取失败,接下来会依次执行 step 2 和 step 3。

  • step 2: addWaiter(Node.EXECUSIVE)

private Node addWaiter(Node mode) {

// 将当前线程封装为一个 Node 节点,指定 mode

// PS: 独占模式 Node.EXECUSIVE, 共享模式 Node.SHARED

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

// 通过 CAS 操作设置主队列的尾节点

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

// 尾节点 tail 为 null,表示主队列未初始化

enq(node);

return node;

}

enq 方法:

private Node enq(final Node node) {

for (;;) {

Node t = tail;

// 尾节点为空,表明当前队列未初始化

if (t == null) { // Must initialize

// 将队列的头尾节点都设置为一个新的节点

if (compareAndSetHead(new Node()))

tail = head;

} else {

// 将 node 节点插入主队列末尾

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

可以看到 addWaiter(Node.EXECUSIVE) 方法的作用是:把当前线程封装成一个独占模式的 Node 节点,并插入到主队列末尾(若主队列未初始化,则将其初始化后再插入)。

  • step 3: acquireQueued(final Node node, arg))

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

// 中断标志位

boolean interrupted = false;

for (;;) {

// 获取该节点的前驱节点

final Node p = node.predecessor();

// 若前驱节点为头节点,则尝试获取资源

if (p == head && tryAcquire(arg)) {

// 若获取成功,则将该节点设置为头节点并返回

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

// 若上面条件不满足,即前驱节点不是头节点,或尝试获取失败

// 判断当前线程是否可以休眠

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

若当前节点的前驱节点为头节点,则会再次尝试获取资源(tryAcuqire),若获取成功,则将当前节点设置为头节点并返回;否则,若前驱节点不是头节点,或者获取资源失败,则执行如下两个方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

// 前驱节点的等待状态

int ws = pred.waitStatus;

// 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠

if (ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park.

*/

return true;

// 若前驱节点的状态大于 0,表示前驱节点处于取消(CANCELLED)状态

// 则将前驱节点跳过(相当于踢出队列)

if (ws > 0) {

/*

* Predecessor was cancelled. Skip over predecessors and

* indicate retry.

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

/*

* waitStatus must be 0 or PROPAGATE. Indicate that we

* need a signal, but don't park yet. Caller will need to

* retry to make sure it cannot acquire before parking.

*/

// 此时 waitStatus 只能为 0 或 PROPAGATE 状态,将前驱节点的等着状态设置为 SIGNAL

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

该方法的流程:

  1. 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠(park);
  2. 若前驱节点是取消状态 (ws > 0),则将其清理出队列,以此类推;
  3. 若前驱节点为 0 或 PROPAGATE,则将其设置为 SIGNAL 状态。

正如其名,该方法(shouldParkAfterFailedAcquire)的作用就是判断当前线程在获取资源失败后,是否可以休眠(park)。

parkAndCheckInterrupt:

private final boolean parkAndCheckInterrupt() {

// 将当前线程休眠

LockSupport.park(this);

return Thread.interrupted();

}

该方法的作用:

  1. 使当前线程休眠(park);
  2. 返回该线程是否被中断(其他线程对其发过中断信号)。

上面就是 acquireQueued(final Node node, arg)) 方法的执行过程,为了便于理解,可参考下面的流程图:

【Java】JDK源码分析-AbstractQueuedSynchronizer(2)

若此期间被其他线程中断过,则此时再去执行 selfInterrupt 方法去响应中断请求:

static void selfInterrupt() {

Thread.currentThread().interrupt();

}

以上就是 acquire 方法执行的整体流程。

2.2.2 以独占模式获取资源(响应中断)

该操作其实与前面的过程类似,因此分析相对简单些,代码如下:

public final void acquireInterruptibly(int arg)

throws InterruptedException

// 若线程被中断过,则抛出异常

if (Thread.interrupted())

throw new InterruptedException();

// 尝试获取资源

if (!tryAcquire(arg))

// 尝试获取资源失败

doAcquireInterruptibly(arg);

}

tryAcquire 与前面的操作一样,若尝试获取资源成功则直接返回;否则,执行 doAcquireInterruptibly:

private void doAcquireInterruptibly(int arg)

throws InterruptedException

// 将当前线程封装成 Node 节点插入主队列末尾

final Node node = addWaiter(Node.EXCLUSIVE);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

// 抛出中断异常

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

通过与前面的 acquire 方法对比可以发现,二者代码几乎一样,区别在于 acquire 方法检测到中断(parkAndCheckInterrupt)时只是记录了标志位,并未响应;而此处直接抛出了异常。这也是二者仅有的区别,此处不再详细分析。

2.2.3 以独占模式获取资源(响应中断,且有超时)

该操作与前者也是类似的,代码如下:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)

throws InterruptedException

// 若被中断,则响应

if (Thread.interrupted())

throw new InterruptedException();

return tryAcquire(arg) ||

doAcquireNanos(arg, nanosTimeout);

}

doAcquireNanos:

static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireNanos(int arg, long nanosTimeout)

throws InterruptedException {

// 若超时时间小于等于 0,直接获取失败

if (nanosTimeout <= 0L)

return false;

// 计算截止时间

final long deadline = System.nanoTime() + nanosTimeout;

final Node node = addWaiter(Node.EXCLUSIVE);

boolean failed = true;

try {

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

failed = false;

return true;

}

nanosTimeout = deadline - System.nanoTime();

// 已经超时了,获取失败

if (nanosTimeout <= 0L)

return false;

// 若大于自旋时间,则线程休眠;否则自旋

if (shouldParkAfterFailedAcquire(p, node) &&

nanosTimeout > spinForTimeoutThreshold)

LockSupport.parkNanos(this, nanosTimeout);

// 若被中断,则响应

if (Thread.interrupted())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

这里有个变量 spinForTimeoutThreshold,表示自旋时间,若大于该值则将线程休眠,否则继续自旋。个人理解这里增加该时间是为了提高效率,即,只有在等待时间较长的时候才让线程休眠。

该方法与 acquireInterruptibly 也是类似的,在前者的基础上增加了 timeout,不再详细分析。

2.2.4 释放资源

前面分析了三种获取资源的方式,自然也有释放资源。下面分析释放资源的 release 操作:

public final boolean release(int arg) {

// 尝试释放资源,若成功则返回 true

if (tryRelease(arg)) {

Node h = head;

// 若头节点不为空,且等待状态不为 0(此时为 SIGNAL)

// 则唤醒其后继节点

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

与 tryAcquire 方法类似,tryRelease 方法在 AQS 中也是抛出异常,同样交由子类实现:

protected boolean tryRelease(int arg) {

throw new UnsupportedOperationException();

}

unparkSuccessor 的主要作用是唤醒 node 的后继节点,代码如下:

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node. But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

// 后继节点

Node s = node.next;

if (s == null || s.waitStatus > 0) {

// 若后继节点是取消状态,则从尾节点向前遍历,找到 node 节点后面一个未取消状态的节点

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

// 唤醒node节点的后继节点

if (s != null)

LockSupport.unpark(s.thread);

}

若 node 节点的后继节点是取消状态(ws > 0),则从主队列中取其后面一个非取消状态的线程唤醒。

前面三个获取资源的方法中,finally 代码块中都用到了 cancelAcquire 方法,都是获取失败时的操作,这里也分析一下:

private void cancelAcquire(Node node) {

// Ignore if node doesn't exist

if (node == null)

return;

node.thread = null;

// Skip cancelled predecessors

// 跳过取消状态的前驱节点

Node pred = node.prev;

while (pred.waitStatus > 0)

node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will

// fail if not, in which case, we lost race vs another cancel

// or signal, so no further action is necessary.

// 前驱节点的后继节点引用

Node predNext = pred.next;

// Can use unconditional write instead of CAS here.

// After this atomic step, other Nodes can skip past us.

// Before, we are free of interference from other threads.

// 将当前节点设置为取消状态

node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.

// 若该节点为尾节点(后面没其他节点了),将 predNext 指向 null

if (node == tail && compareAndSetTail(node, pred)) {

compareAndSetNext(pred, predNext, null);

} else {

// If successor needs signal, try to set pred's next-link

// so it will get one. Otherwise wake it up to propagate.

int ws;

if (pred != head &&

((ws = pred.waitStatus) == Node.SIGNAL ||

(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

pred.thread != null) {

Node next = node.next;

if (next != null && next.waitStatus <= 0)

compareAndSetNext(pred, predNext, next);

} else {

// 前驱节点为头节点,表明当前节点为第一个,取消时唤醒它的下一个节点

unparkSuccessor(node);

}

node.next = node; // help GC

}

}

该方法的主要操作:

  1. 将 node 节点设置为取消(CANCELLED)状态;
  2. 找到它在队列中非取消状态的前驱节点 pred:

    1. 若 node 节点是尾节点,则前驱节点的后继设为空,
    2. 若 pred 不是头节点,且状态为 SIGNAL,则后继节点设为 node 的后继节点;
    3. 若 pred 是头节点,则唤醒 node 的后继节点。

3. 小结

本文分析了以独占模式获取资源的三种方式,以及释放资源的操作。分别为:

  1. acquire: 独占模式获取资源,忽略中断;
  2. acquireInterruptibly: 独占模式获取资源,响应中断;
  3. tryAcquireNanos: 独占模式获取资源,响应中断,有超时;
  4. release: 释放资源,唤醒主队列中的下一个线程。

这几个方法都可以类比 Lock 接口的相关方法定义。

相关阅读:

JDK源码分析-AbstractQueuedSynchronizer(1)

JDK源码分析-Lock&Condition

【Java】JDK源码分析-AbstractQueuedSynchronizer(2)

以上是 【Java】JDK源码分析-AbstractQueuedSynchronizer(2) 的全部内容, 来源链接: utcz.com/a/114842.html

回到顶部