Semaphore

编程

 

public class Semaphore implements java.io.Serializable {

private static final long serialVersionUID = -3222578661600680210L;

/** All mechanics via AbstractQueuedSynchronizer subclass */

private final Sync sync;

/**

* 继承AQS以实现信号量机制,使用AQS的state属性代表允许使用的信号量的数量,该Sync类有两个

* 派生类,其中一个是公平模式的,另一个是非公平模式的

*/

abstract static class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 1192457210091910933L;

Sync(int permits) {

setState(permits);

}

final int getPermits() {

return getState();

}

//非公平模式的获取信号量

final int nonfairTryAcquireShared(int acquires) {

for (;;) {

int available = getState();

int remaining = available - acquires;

if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

//释放信号量

protected final boolean tryReleaseShared(int releases) {

for (;;) {

int current = getState();

int next = current + releases;

if (next < current) // overflow

throw new Error("Maximum permit count exceeded");

if (compareAndSetState(current, next))

return true;

}

}

//减少信号量

final void reducePermits(int reductions) {

for (;;) {

int current = getState();

int next = current - reductions;

if (next > current) // underflow

throw new Error("Permit count underflow");

if (compareAndSetState(current, next))

return;

}

}

清空所有信号量

final int drainPermits() {

for (;;) {

int current = getState();

if (current == 0 || compareAndSetState(current, 0))

return current;

}

}

}

}

 

public class Semaphore implements java.io.Serializable {

/**

* 非公平模式

*/

static final class NonfairSync extends Sync {

private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {

super(permits);

}

protected int tryAcquireShared(int acquires) {

return nonfairTryAcquireShared(acquires);

}

}

/**

* 公平模式

*/

static final class FairSync extends Sync {

private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {

super(permits);

}

protected int tryAcquireShared(int acquires) {

for (;;) {

//公平模式下,先判断是否有前驱结点

if (hasQueuedPredecessors())

return -1;

int available = getState();

int remaining = available - acquires;

if (remaining < 0 ||

compareAndSetState(available, remaining))

return remaining;

}

}

}

}

 

public class Semaphore implements java.io.Serializable {

//未指定模式,默认生产非公平模式的信号量工具

public Semaphore(int permits) {

sync = new NonfairSync(permits);

}

//指定模式,生产公平或非公平模式的信号量工具

public Semaphore(int permits, boolean fair) {

sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

//使用可中断的方式获取信号量,tryAcquireShared->doAcquireSharedInterruptibly

public void acquire() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

//发起可中断的获取信号量,指定获取信号量的数量

public void acquire(int permits) throws InterruptedException {

if (permits < 0) throw new IllegalArgumentException();

sync.acquireSharedInterruptibly(permits);

}

//使用不可中断的方式获取信号量,tryAcquireShared->doAcquireShared

public void acquireUninterruptibly() {

sync.acquireShared(1);

}

//发起不可中的的获取信号量,指定获取信号量的数量

public void acquireUninterruptibly(int permits) {

if (permits < 0) throw new IllegalArgumentException();

sync.acquireShared(permits);

}

//使用非公平模式获取信号量,默认获取一个信号量

public boolean tryAcquire() {

return sync.nonfairTryAcquireShared(1) >= 0;

}

//使用非公平模式获取信号量,指定获取信号量的数量

public boolean tryAcquire(int permits) {

if (permits < 0) throw new IllegalArgumentException();

return sync.nonfairTryAcquireShared(permits) >= 0;

}

//使用超时的模式获取信号量,默认获取一个信号量

public boolean tryAcquire(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

//使用超时的模式获取信号量,指定获取信号量的数量

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)

throws InterruptedException {

if (permits < 0) throw new IllegalArgumentException();

return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));

}

//释放信号量

public void release() {

sync.releaseShared(1);

}

}

 

以上是 Semaphore 的全部内容, 来源链接: utcz.com/z/511383.html

回到顶部