【Java】Java并发编程之CAS和AQS

Java并发编程之CAS和AQS

入门小站发布于 1 月 28 日

什么是CAS

public final boolean compareAndSet(int expect, int update) {

return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

}

CAS的应用

public class AtomicInteger extends Number implements java.io.Serializable {

private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates

private static final Unsafe unsafe = Unsafe.getUnsafe();

private volatile int value;// 初始int大小

// 省略了部分代码...

// 带参数构造函数,可设置初始int大小

public AtomicInteger(int initialValue) {

value = initialValue;

}

// 不带参数构造函数,初始int大小为0

public AtomicInteger() {

}

// 获取当前值

public final int get() {

return value;

}

// 设置值为 newValue

public final void set(int newValue) {

value = newValue;

}

//返回旧值,并设置新值为 newValue

public final int getAndSet(int newValue) {

/**

* 这里使用for循环不断通过CAS操作来设置新值

* CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系

* */

for (;;) {

int current = get();

if (compareAndSet(current, newValue))

return current;

}

}

// 原子的设置新值为update, expect为期望的当前的值

public final boolean compareAndSet(int expect, int update) {

return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

}

// 获取当前值current,并设置新值为current+1

public final int getAndIncrement() {

for (;;) {

int current = get();

int next = current + 1;

if (compareAndSet(current, next))

return current;

}

}

// 此处省略部分代码,余下的代码大致实现原理都是类似的

}

//有线程安全问题

public class Counter {

private int count;

public Counter(){}

public int getCount(){

return count;

}

public void increase(){

count++;

}

}

//悲观锁,线程安全,缺点性能差

public class Counter {

private int count;

public Counter(){}

public synchronized int getCount(){

return count;

}

public synchronized void increase(){

count++;

}

}

//乐观锁,线程安全,性能好

public class Counter {

private AtomicInteger count = new AtomicInteger();

public Counter(){}

public int getCount(){

return count.get();

}

public void increase(){

count.getAndIncrement();

}

}

CAS的三大缺点

ABA问题

循环时间长开销大

只能保证一个共享变量的原子操作

什么是AQS

AQS的应用

boolean tryAcquire(int arg)

boolean tryRelease(int arg)

int tryAcquireShared(int arg)

boolean tryReleaseShared(int arg)

boolean isHeldExclusively()

public class CountDownLatch {

/**

* 基于AQS的内部Sync

* 使用AQS的state来表示计数count.

*/

private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {

// 使用AQS的getState()方法设置状态

setState(count);

}

int getCount() {

// 使用AQS的getState()方法获取状态

return getState();

}

// 覆盖在共享模式下尝试获取锁

protected int tryAcquireShared(int acquires) {

// 这里用状态state是否为0来表示是否成功,为0的时候可以获取到返回1,否则不可以返回-1

return (getState() == 0) ? 1 : -1;

}

// 覆盖在共享模式下尝试释放锁

protected boolean tryReleaseShared(int releases) {

// 在for循环中Decrement count直至成功;

// 当状态值即count为0的时候,返回false表示 signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

}

private final Sync sync;

// 使用给定计数值构造CountDownLatch

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

// 让当前线程阻塞直到计数count变为0,或者线程被中断

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

// 阻塞当前线程,除非count变为0或者等待了timeout的时间。当count变为0时,返回true

public boolean await(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

// count递减

public void countDown() {

sync.releaseShared(1);

}

// 获取当前count值

public long getCount() {

return sync.getCount();

}

public String toString() {

return super.toString() + "[Count = " + sync.getCount() + "]";

}

}

AQS 原理

 *      +------+  prev +-----+       +-----+

* head | | <---- | | <---- | | tail

* +------+ +-----+ +-----+

【Java】Java并发编程之CAS和AQS

简单源码分析

    public final void acquire(int arg) {

// 首先尝试获取,不成功的话则将其加入到等待队列,再for循环获取

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

// 从clh中选一个线程获取占用资源

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

// 当节点的先驱是head的时候,就可以尝试获取占用资源了tryAcquire

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

// 如果获取到资源,则将当前节点设置为头节点head

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

// 如果获取失败的话,判断是否可以休息,可以的话就进入waiting状态,直到被unpark()

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

private Node addWaiter(Node mode) {

// 封装当前线程和模式为新的节点,并将其加入到队列中

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) {

// tail为null,说明还没初始化,此时需进行初始化工作

if (compareAndSetHead(new Node()))

tail = head;

} else {

// 否则的话,将当前线程节点作为tail节点加入到CLH中去

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

关注微信公众号:【入门小站】,解锁更多知识点。

【Java】Java并发编程之CAS和AQS

java多线程

阅读 50发布于 1 月 28 日

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

入门小站

rumenz.com

47 声望

3 粉丝

0 条评论

得票时间

avatar

入门小站

rumenz.com

47 声望

3 粉丝

宣传栏

什么是CAS

public final boolean compareAndSet(int expect, int update) {

return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

}

CAS的应用

public class AtomicInteger extends Number implements java.io.Serializable {

private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates

private static final Unsafe unsafe = Unsafe.getUnsafe();

private volatile int value;// 初始int大小

// 省略了部分代码...

// 带参数构造函数,可设置初始int大小

public AtomicInteger(int initialValue) {

value = initialValue;

}

// 不带参数构造函数,初始int大小为0

public AtomicInteger() {

}

// 获取当前值

public final int get() {

return value;

}

// 设置值为 newValue

public final void set(int newValue) {

value = newValue;

}

//返回旧值,并设置新值为 newValue

public final int getAndSet(int newValue) {

/**

* 这里使用for循环不断通过CAS操作来设置新值

* CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系

* */

for (;;) {

int current = get();

if (compareAndSet(current, newValue))

return current;

}

}

// 原子的设置新值为update, expect为期望的当前的值

public final boolean compareAndSet(int expect, int update) {

return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

}

// 获取当前值current,并设置新值为current+1

public final int getAndIncrement() {

for (;;) {

int current = get();

int next = current + 1;

if (compareAndSet(current, next))

return current;

}

}

// 此处省略部分代码,余下的代码大致实现原理都是类似的

}

//有线程安全问题

public class Counter {

private int count;

public Counter(){}

public int getCount(){

return count;

}

public void increase(){

count++;

}

}

//悲观锁,线程安全,缺点性能差

public class Counter {

private int count;

public Counter(){}

public synchronized int getCount(){

return count;

}

public synchronized void increase(){

count++;

}

}

//乐观锁,线程安全,性能好

public class Counter {

private AtomicInteger count = new AtomicInteger();

public Counter(){}

public int getCount(){

return count.get();

}

public void increase(){

count.getAndIncrement();

}

}

CAS的三大缺点

ABA问题

循环时间长开销大

只能保证一个共享变量的原子操作

什么是AQS

AQS的应用

boolean tryAcquire(int arg)

boolean tryRelease(int arg)

int tryAcquireShared(int arg)

boolean tryReleaseShared(int arg)

boolean isHeldExclusively()

public class CountDownLatch {

/**

* 基于AQS的内部Sync

* 使用AQS的state来表示计数count.

*/

private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {

// 使用AQS的getState()方法设置状态

setState(count);

}

int getCount() {

// 使用AQS的getState()方法获取状态

return getState();

}

// 覆盖在共享模式下尝试获取锁

protected int tryAcquireShared(int acquires) {

// 这里用状态state是否为0来表示是否成功,为0的时候可以获取到返回1,否则不可以返回-1

return (getState() == 0) ? 1 : -1;

}

// 覆盖在共享模式下尝试释放锁

protected boolean tryReleaseShared(int releases) {

// 在for循环中Decrement count直至成功;

// 当状态值即count为0的时候,返回false表示 signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

}

private final Sync sync;

// 使用给定计数值构造CountDownLatch

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

// 让当前线程阻塞直到计数count变为0,或者线程被中断

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

// 阻塞当前线程,除非count变为0或者等待了timeout的时间。当count变为0时,返回true

public boolean await(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

// count递减

public void countDown() {

sync.releaseShared(1);

}

// 获取当前count值

public long getCount() {

return sync.getCount();

}

public String toString() {

return super.toString() + "[Count = " + sync.getCount() + "]";

}

}

AQS 原理

 *      +------+  prev +-----+       +-----+

* head | | <---- | | <---- | | tail

* +------+ +-----+ +-----+

【Java】Java并发编程之CAS和AQS

简单源码分析

    public final void acquire(int arg) {

// 首先尝试获取,不成功的话则将其加入到等待队列,再for循环获取

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

// 从clh中选一个线程获取占用资源

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

// 当节点的先驱是head的时候,就可以尝试获取占用资源了tryAcquire

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

// 如果获取到资源,则将当前节点设置为头节点head

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

// 如果获取失败的话,判断是否可以休息,可以的话就进入waiting状态,直到被unpark()

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

private Node addWaiter(Node mode) {

// 封装当前线程和模式为新的节点,并将其加入到队列中

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) {

// tail为null,说明还没初始化,此时需进行初始化工作

if (compareAndSetHead(new Node()))

tail = head;

} else {

// 否则的话,将当前线程节点作为tail节点加入到CLH中去

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

关注微信公众号:【入门小站】,解锁更多知识点。

【Java】Java并发编程之CAS和AQS

以上是 【Java】Java并发编程之CAS和AQS 的全部内容, 来源链接: utcz.com/a/109911.html

回到顶部