聊聊artemis的ClientProducerCredits

编程

本文主要研究一下artemis的ClientProducerCredits

ClientProducerCredits

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java

/**
 * Contract for the producer-side credit bookkeeping attached to a single address.
 *
 * <p>Implementations hand out credits to senders, record credits that have been
 * received, and are reference counted by their users.
 */
public interface ClientProducerCredits {

   // ---- lifecycle -------------------------------------------------------

   /**
    * Binds this instance to the given session context.
    *
    * @param sessionContext the context this instance is initialised with
    */
   void init(SessionContext sessionContext);

   /** Re-initialises the credit state. */
   void reset();

   /** Closes this instance. */
   void close();

   // ---- credit flow -----------------------------------------------------

   /**
    * Obtains the given number of credits for the caller.
    *
    * @param credits number of credits to obtain
    * @throws ActiveMQException if the credits cannot be obtained
    */
   void acquireCredits(int credits) throws ActiveMQException;

   /**
    * Records that the given number of credits has been received.
    *
    * @param credits number of credits received
    */
   void receiveCredits(int credits);

   /**
    * Records that credits were received together with a failure indication.
    *
    * @param credits number of credits received
    */
   void receiveFailCredits(int credits);

   /** Releases whatever credits are currently outstanding. */
   void releaseOutstanding();

   /**
    * @return {@code true} while a caller is blocked waiting for credits
    */
   boolean isBlocked();

   // ---- reference counting ----------------------------------------------

   /** Adds one reference to this instance. */
   void incrementRefCount();

   /**
    * Removes one reference from this instance.
    *
    * @return the reference count after the decrement
    */
   int decrementRefCount();

   // ---- identity --------------------------------------------------------

   /**
    * @return the address these credits are associated with
    */
   SimpleString getAddress();

}

  • ClientProducerCredits接口定义了acquireCredits、receiveCredits、receiveFailCredits、isBlocked、init、reset、close、incrementRefCount、decrementRefCount、releaseOutstanding、getAddress方法

AbstractProducerCreditsImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java

/**
 * Skeleton implementation of {@link ClientProducerCredits}.
 *
 * <p>It keeps the counters ({@code pendingCredits}, {@code arriving}, {@code refCount}),
 * decides when more credits have to be requested ({@link #checkCredits(int)}) and sends
 * that request through the session. How credits are actually stored and taken is left to
 * subclasses via {@link #actualAcquire(int)} and {@link #getBalance()}.
 */
public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits {

// Increased by the amount requested in checkCredits, decreased in afterAcquired.
protected int pendingCredits;

// Half of the window size passed to the constructor (see constructor).
private final int windowSize;

// Set to true by close(); volatile so it can be read without holding the monitor.
protected volatile boolean closed;

// Exposed through isBlocked(); this class never writes it, subclasses do.
protected boolean blocked;

protected final SimpleString address;

// Used only to send the credit request in requestCredits.
private final ClientSessionInternal session;

// Increased by the amount requested in checkCredits, decreased in receiveCredits.
protected int arriving;

private int refCount;

// Set by receiveFailCredits; this class never clears it.
protected boolean serverRespondedWithFail;

// Assigned in init().
protected SessionContext sessionContext;

/**
 * @param session    session used to send credit requests
 * @param address    address these credits belong to
 * @param windowSize configured window size; only half of it is stored
 */
public AbstractProducerCreditsImpl(final ClientSessionInternal session,

final SimpleString address,

final int windowSize) {

this.session = session;

this.address = address;

// Integer division: the stored window is half of the configured one.
this.windowSize = windowSize / 2;

}

@Override

public SimpleString getAddress() {

return address;

}

/**
 * Issues the first credit request and registers this instance with the session context
 * under {@code address}.
 */
@Override

public void init(SessionContext sessionContext) {

// We initial request twice as many credits as we request in subsequent requests

// This allows the producer to keep sending as more arrive, minimising pauses

// NOTE(review): the value passed here is the stored (already halved) windowSize;
// confirm the "twice as many" wording above still describes this call.
checkCredits(windowSize);

this.sessionContext = sessionContext;

this.sessionContext.linkFlowControl(address, this);

}

/**
 * Runs the three acquisition steps in order: request more credits if needed,
 * take the credits (subclass), then settle the pending counter.
 */
@Override

public void acquireCredits(final int credits) throws ActiveMQException {

checkCredits(credits);

actualAcquire(credits);

afterAcquired(credits);

}

/**
 * Post-acquire hook. This base version only subtracts {@code credits} from
 * {@code pendingCredits} while holding the monitor.
 */
protected void afterAcquired(int credits) throws ActiveMQAddressFullException {

// check to see if the blocking mode is FAIL on the server

// NOTE(review): no FAIL check is performed in this base method itself.
synchronized (this) {

pendingCredits -= credits;

}

}

/** Takes {@code credits} from whatever store the subclass maintains. */
protected abstract void actualAcquire(int credits);

@Override

public boolean isBlocked() {

return blocked;

}

/**
 * Marks that a fail response was seen and then processes the credits exactly like
 * {@link #receiveCredits(int)}.
 */
@Override

public void receiveFailCredits(final int credits) {

serverRespondedWithFail = true;

// receive credits like normal to keep the sender from blocking

receiveCredits(credits);

}

/** Subtracts {@code credits} from {@code arriving} while holding the monitor. */
@Override

public void receiveCredits(final int credits) {

synchronized (this) {

arriving -= credits;

}

}

/**
 * Zeroes both counters and issues a fresh credit request sized to the larger of
 * twice the stored window and the amount that was pending before the reset.
 */
@Override

public synchronized void reset() {

// Any pendingCredits credits from before failover won't arrive, so we re-initialise

int beforeFailure = pendingCredits;

pendingCredits = 0;

arriving = 0;

// If we are waiting for more credits than what's configured, then we need to use what we tried before

// otherwise the client may starve as the credit will never arrive

checkCredits(Math.max(windowSize * 2, beforeFailure));

}

@Override

public void close() {

// Closing a producer that is blocking should make it return

closed = true;

}

@Override

public synchronized void incrementRefCount() {

refCount++;

}

/** @return the reference count after decrementing */
@Override

public synchronized int decrementRefCount() {

return --refCount;

}

/** @return the number of credits currently available, as defined by the subclass */
public abstract int getBalance();

/**
 * Requests more credits when the current balance plus the credits already on their way
 * is below what is needed.
 *
 * @param credits number of credits the caller is about to use
 */
protected void checkCredits(final int credits) {

// Never aim for less than the stored window.
int needed = Math.max(credits, windowSize);

// -1 means "nothing to request".
int toRequest = -1;

synchronized (this) {

if (getBalance() + arriving < needed) {

toRequest = needed - arriving;

pendingCredits += toRequest;

arriving += toRequest;

}

}

// The request itself is sent after the monitor has been released.
if (toRequest != -1) {

requestCredits(toRequest);

}

}

/** Sends a producer-credits message for {@code address} through the session. */
private void requestCredits(final int credits) {

session.sendProducerCreditsMessage(credits, address);

}

}

  • AbstractProducerCreditsImpl实现了ClientProducerCredits的部分方法;其构造器接收session、address、windowSize参数;其init方法首先执行checkCredits,然后执行sessionContext.linkFlowControl(address, this)
  • acquireCredits方法先执行checkCredits,再执行actualAcquire及afterAcquired方法;afterAcquired方法会从pendingCredits扣减credits;receiveFailCredits方法设置serverRespondedWithFail为true,然后执行receiveCredits方法;receiveCredits方法会从arriving扣减credits;reset方法会重置pendingCredits、arriving,然后执行checkCredits方法
  • checkCredits方法先计算needed,再计算toRequest同时更新pendingCredits及arriving;在toRequest不为-1时会执行requestCredits方法;requestCredits方法执行的是session.sendProducerCreditsMessage(credits, address)方法

ClientProducerCreditsImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java

/**
 * {@link AbstractProducerCreditsImpl} that stores the available credits as permits of a
 * {@link Semaphore}: receiving credits releases permits, acquiring credits takes them.
 */
public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl {

// Permit count == credits currently available (see getBalance()).
private final Semaphore semaphore;

public ClientProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize) {

super(session, address, windowSize);

// Doesn't need to be fair since session is single threaded

// Starts with zero permits.
semaphore = new Semaphore(0, false);

}

/**
 * Settles the pending counter via the superclass and, if a fail response was recorded,
 * clears that flag, discards all permits and counters, and throws.
 *
 * @throws ActiveMQAddressFullException when {@code serverRespondedWithFail} was set
 */
@Override

protected void afterAcquired(int credits) throws ActiveMQAddressFullException {

// check to see if the blocking mode is FAIL on the server

synchronized (this) {

super.afterAcquired(credits);

if (serverRespondedWithFail) {

serverRespondedWithFail = false;

// remove existing credits to force the client to ask the server for more on the next send

semaphore.drainPermits();

pendingCredits = 0;

arriving = 0;

throw ActiveMQClientMessageBundle.BUNDLE.addressIsFull(address.toString(), credits);

}

}

}

/**
 * Takes {@code credits} permits. A non-blocking attempt is made first; if it fails and
 * this instance is not closed, the caller waits in 10-second slices until the permits
 * are obtained, logging after every slice that times out.
 */
@Override

protected void actualAcquire(int credits) {

boolean tryAcquire;

// First attempt is non-blocking and made while holding the monitor.
synchronized (this) {

tryAcquire = semaphore.tryAcquire(credits);

}

// When closed, a failed first attempt simply returns without acquiring.
if (!tryAcquire && !closed) {

this.blocked = true;

try {

// Waiting happens outside the monitor.
while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {

// I'm using string concatenation here in case address is null

// better getting a "null" string than a NPE

ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);

}

} catch (InterruptedException interrupted) {

// Restore the interrupt flag before rethrowing.
Thread.currentThread().interrupt();

throw new ActiveMQInterruptedException(interrupted);

} finally {

// Cleared on every exit path of the blocking section.
this.blocked = false;

}

}

}

/** Discards all permits, then performs the superclass reset. */
@Override

public synchronized void reset() {

// Any pendingCredits credits from before failover won't arrive, so we re-initialise

semaphore.drainPermits();

super.reset();

}

/** Marks the instance closed and releases a large number of permits. */
@Override

public void close() {

super.close();

// Closing a producer that is blocking should make it return

semaphore.release(Integer.MAX_VALUE / 2);

}

/**
 * Updates the {@code arriving} counter under the monitor, then makes the credits
 * available as permits.
 */
@Override

public void receiveCredits(final int credits) {

synchronized (this) {

super.receiveCredits(credits);

}

// Permits are released after the monitor has been left.
semaphore.release(credits);

}

/** Discards all currently available permits. */
@Override

public synchronized void releaseOutstanding() {

semaphore.drainPermits();

}

/** @return the number of permits currently available in the semaphore */
@Override

public int getBalance() {

return semaphore.availablePermits();

}

}

  • ClientProducerCreditsImpl继承了AbstractProducerCreditsImpl,其构造器创建了permits为0,fair为false的Semaphore;afterAcquired方法在serverRespondedWithFail为true时会执行semaphore.drainPermits()然后抛出ActiveMQClientMessageBundle.BUNDLE.addressIsFull(address.toString(), credits)
  • actualAcquire方法会先执行semaphore.tryAcquire(credits),若为false且closed为false则设置blocked为true,然后while循环执行semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)直到为true(每次超时都会通过outOfCreditOnFlowControl打印日志);若被中断则恢复中断标志并抛出ActiveMQInterruptedException;最后在finally中设置blocked为false
  • receiveCredits方法先执行super.receiveCredits(credits)扣减arriving,然后执行semaphore.release(credits);getBalance方法返回的是semaphore.availablePermits();releaseOutstanding执行的是semaphore.drainPermits();reset方法先执行semaphore.drainPermits(),再执行super.reset();close方法先执行super.close(),再执行semaphore.release(Integer.MAX_VALUE / 2)

小结

  • AbstractProducerCreditsImpl实现了ClientProducerCredits的部分方法;其构造器接收session、address、windowSize参数;其init方法首先执行checkCredits,然后执行sessionContext.linkFlowControl(address, this)
  • acquireCredits方法先执行checkCredits,再执行actualAcquire及afterAcquired方法;afterAcquired方法会从pendingCredits扣减credits;receiveFailCredits方法设置serverRespondedWithFail为true,然后执行receiveCredits方法;receiveCredits方法会从arriving扣减credits;reset方法会重置pendingCredits、arriving,然后执行checkCredits方法
  • checkCredits方法先计算needed,再计算toRequest同时更新pendingCredits及arriving;在toRequest不为-1时会执行requestCredits方法;requestCredits方法执行的是session.sendProducerCreditsMessage(credits, address)方法

doc

  • ClientProducerCredits

以上是 聊聊artemis的ClientProducerCredits 的全部内容, 来源链接: utcz.com/z/512721.html

回到顶部