聊聊canal的BooleanMutex

编程

BooleanMutex

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java

public class BooleanMutex {

private Sync sync;

public BooleanMutex(){

sync = new Sync();

set(false);

}

public BooleanMutex(Boolean mutex){

sync = new Sync();

set(mutex);

}

/**

* 阻塞等待Boolean为true

*

* @throws InterruptedException

*/

public void get() throws InterruptedException {

sync.innerGet();

}

/**

* 阻塞等待Boolean为true,允许设置超时时间

*

* @param timeout

* @param unit

* @throws InterruptedException

* @throws TimeoutException

*/

public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {

sync.innerGet(unit.toNanos(timeout));

}

/**

* 重新设置对应的Boolean mutex

*

* @param mutex

*/

public void set(Boolean mutex) {

if (mutex) {

sync.innerSetTrue();

} else {

sync.innerSetFalse();

}

}

public boolean state() {

return sync.innerState();

}

//......

}

  • BooleanMutex定义了sync属性,其get方法执行的是sync.innerGet(),其set方法执行的是sync.innerSetTrue()或者sync.innerSetFalse(),其state方法返回的是sync.innerState()

Sync

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java

    private final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 2559471934544126329L;

/** State value representing that TRUE */

private static final int TRUE = 1;

/** State value representing that FALSE */

private static final int FALSE = 2;

private boolean isTrue(int state) {

return (state & TRUE) != 0;

}

/**

* 实现AQS的接口,获取共享锁的判断

*/

protected int tryAcquireShared(int state) {

// 如果为true,直接允许获取锁对象

// 如果为false,进入阻塞队列,等待被唤醒

return isTrue(getState()) ? 1 : -1;

}

/**

* 实现AQS的接口,释放共享锁的判断

*/

protected boolean tryReleaseShared(int ignore) {

// 始终返回true,代表可以release

return true;

}

boolean innerState() {

return isTrue(getState());

}

void innerGet() throws InterruptedException {

acquireSharedInterruptibly(0);

}

void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {

if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();

}

void innerSetTrue() {

for (;;) {

int s = getState();

if (s == TRUE) {

return; // 直接退出

}

if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操作

releaseShared(0);// 释放一下锁对象,唤醒一下阻塞的Thread

return;

}

}

}

void innerSetFalse() {

for (;;) {

int s = getState();

if (s == FALSE) {

return; // 直接退出

}

if (compareAndSetState(s, FALSE)) {// cas更新状态,避免并发更新false操作

return;

}

}

}

}

  • Sync继承了AbstractQueuedSynchronizer,其tryReleaseShared方法始终返回true,其innerGet方法执行的是acquireSharedInterruptibly方法,其innerSetTrue方法执行compareAndSetState(s, TRUE),其innerSetFalse执行compareAndSetState(s, FALSE)

小结

BooleanMutex定义了sync属性,其get方法执行的是sync.innerGet(),其set方法执行的是sync.innerSetTrue()或者sync.innerSetFalse(),其state方法返回的是sync.innerState()

doc

  • BooleanMutex

以上是 聊聊canal的BooleanMutex 的全部内容, 来源链接: utcz.com/z/515173.html

回到顶部