聊聊artemis的ClientProducerCreditManager

编程

本文主要研究一下artemis的ClientProducerCreditManager

ClientProducerCreditManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java

/**
 * Manages the {@link ClientProducerCredits} objects used for producer flow control,
 * keyed by address.
 *
 * <p>The behaviour notes below describe what {@code ClientProducerCreditManagerImpl}
 * does; other implementations may differ.
 */
public interface ClientProducerCreditManager {

/**
 * Obtains the credits object for {@code address}.
 *
 * <p>In {@code ClientProducerCreditManagerImpl} a new credits object is created and
 * initialised with {@code context} when none is tracked for the address yet; a
 * non-anonymous request increments the ref count, an anonymous one places the
 * credits in the unreferenced cache instead.
 *
 * @param address the address the credits belong to
 * @param anon    whether the request is anonymous (not ref-counted in the impl)
 * @param context session context passed to {@code ClientProducerCredits.init} for new credits
 * @return the credits object for the address
 */
ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context);

/**
 * Gives back a reference previously obtained for {@code address}.
 * In the impl this decrements the ref count and, when it reaches zero,
 * moves the credits to the unreferenced cache.
 */
void returnCredits(SimpleString address);

/**
 * Forwards {@code credits} to the credits object tracked for {@code address};
 * the impl ignores the call when the address is not tracked.
 */
void receiveCredits(SimpleString address, int credits);

/**
 * Forwards failed credits to the credits object tracked for {@code address};
 * the impl ignores the call when the address is not tracked.
 */
void receiveFailCredits(SimpleString address, int credits);

/** Resets every tracked credits object. */
void reset();

/** Closes every tracked credits object and clears the manager's state. */
void close();

/** @return the number of addresses that currently have a tracked credits object */
int creditsMapSize();

/** @return the number of entries currently held in the unreferenced-credits cache */
int unReferencedCreditsSize();

/** This will determine the flow control as asynchronous,

* no actual block should happen instead a callback will be sent whenever blockages change */

void setCallback(ClientProducerFlowCallback callback);

}

  • ClientProducerCreditManager接口定义了getCredits、returnCredits、receiveCredits、receiveFailCredits、reset、close、creditsMapSize、unReferencedCreditsSize、setCallback方法

ClientProducerCreditManagerImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java

/**
 * {@link ClientProducerCreditManager} implementation that keeps one
 * {@link ClientProducerCredits} per address in {@code producerCredits} and a
 * size-bounded cache ({@code unReferencedCredits}) of credits that were requested
 * anonymously or whose ref count dropped to zero.
 *
 * <p>All public methods except {@link #setCallback} are {@code synchronized} on this instance.
 */
public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager {

// Upper bound for unReferencedCredits; once exceeded, addToUnReferencedCache evicts its first (oldest) entry.
public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000;

// Credits currently tracked, keyed by address.
private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<>();

// Credits added through anonymous getCredits calls or after the ref count reached zero.
// The LinkedHashMap iteration order is what addToUnReferencedCache relies on to find the oldest entry.
private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<>();

private final ClientSessionInternal session;

// Window size handed to every credits object built here; -1 makes getCredits return the
// no-flow-control instance (close() also sets it to -1).
private int windowSize;

// Optional; when non-null, build() creates AsynchronousProducerCreditsImpl instead of ClientProducerCreditsImpl.
private ClientProducerFlowCallback callback;

/**
 * @param session    session passed to every credits object created by this manager
 * @param windowSize producer window size; -1 disables flow control (see getCredits)
 */
public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {

this.session = session;

this.windowSize = windowSize;

}

/** This will determine the flow control as asynchronous,

* no actual block should happen instead a callback will be sent whenever blockages change */

@Override

public void setCallback(ClientProducerFlowCallback callback) {

this.callback = callback;

}

/**
 * Returns the credits for {@code address}, creating, registering and initialising them
 * on first use.
 *
 * <p>When {@code windowSize} is -1 the shared
 * {@code ClientProducerCreditsNoFlowControl.instance} is returned and nothing is tracked.
 * Otherwise a non-anonymous request increments the ref count and removes the address from
 * the unreferenced cache, while an anonymous request adds it to that cache.
 *
 * @param address address whose credits are requested
 * @param anon    {@code true} for an anonymous (non ref-counted) request
 * @param context passed to {@code credits.init} when the credits were newly created
 * @return the credits object for the address
 */
@Override

// NOTE(review): the method is declared synchronized AND contains synchronized (this).
// Java monitors are reentrant, so the inner block is redundant, and credits.init(context)
// below still runs while this instance's lock is held, which contradicts the
// "init is done outside of the lock" comment further down. Confirm the intended locking.
public synchronized ClientProducerCredits getCredits(final SimpleString address,

final boolean anon,

SessionContext context) {

if (windowSize == -1) {

return ClientProducerCreditsNoFlowControl.instance;

} else {

// Set when a new credits object is created, so that init(context) is called exactly for those.
boolean needInit = false;

ClientProducerCredits credits;

synchronized (this) {

credits = producerCredits.get(address);

if (credits == null) {

// Doesn't need to be fair since session is single threaded

credits = build(address);

needInit = true;

producerCredits.put(address, credits);

}

if (!anon) {

credits.incrementRefCount();

// Remove from anon credits (if there)

unReferencedCredits.remove(address);

} else {

addToUnReferencedCache(address, credits);

}

}

// The init is done outside of the lock

// otherwise packages may arrive with flow control

// while this is still sending requests causing a dead lock

if (needInit) {

credits.init(context);

}

return credits;

}

}

/**
 * Creates the credits object for {@code address}: the asynchronous variant when a
 * callback has been set, the plain ClientProducerCreditsImpl otherwise.
 */
private ClientProducerCredits build(SimpleString address) {

if (callback != null) {

return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback);

} else {

return new ClientProducerCreditsImpl(session, address, windowSize);

}

}

/**
 * Decrements the ref count of the credits tracked for {@code address}; when it reaches
 * zero the credits are added to the unreferenced cache. Unknown addresses are ignored.
 */
@Override

public synchronized void returnCredits(final SimpleString address) {

ClientProducerCredits credits = producerCredits.get(address);

if (credits != null && credits.decrementRefCount() == 0) {

addToUnReferencedCache(address, credits);

}

}

/** Forwards {@code credits} to the credits object tracked for {@code address}, if any. */
@Override

public synchronized void receiveCredits(final SimpleString address, final int credits) {

ClientProducerCredits cr = producerCredits.get(address);

if (cr != null) {

cr.receiveCredits(credits);

}

}

/** Forwards failed credits to the credits object tracked for {@code address}, if any. */
@Override

public synchronized void receiveFailCredits(final SimpleString address, int credits) {

ClientProducerCredits cr = producerCredits.get(address);

if (cr != null) {

cr.receiveFailCredits(credits);

}

}

/** Calls {@code reset()} on every tracked credits object. */
@Override

public synchronized void reset() {

for (ClientProducerCredits credits : producerCredits.values()) {

credits.reset();

}

}

/**
 * Sets {@code windowSize} to -1 (so later getCredits calls return the no-flow-control
 * instance), closes every tracked credits object and clears both maps.
 */
@Override

public synchronized void close() {

windowSize = -1;

for (ClientProducerCredits credits : producerCredits.values()) {

credits.close();

}

producerCredits.clear();

unReferencedCredits.clear();

}

/** @return the number of entries in {@code producerCredits} */
@Override

public synchronized int creditsMapSize() {

return producerCredits.size();

}

/** @return the number of entries in {@code unReferencedCredits} */
@Override

public synchronized int unReferencedCreditsSize() {

return unReferencedCredits.size();

}

/**
 * Puts the credits into the unreferenced cache and, if the cache then exceeds
 * MAX_UNREFERENCED_CREDITS_CACHE_SIZE, evicts the first entry in iteration order
 * and releases it via removeEntry. Called only from synchronized methods in this excerpt.
 */
private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits) {

unReferencedCredits.put(address, credits);

if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {

// Remove the oldest entry

Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();

Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();

iter.remove();

removeEntry(oldest.getKey(), oldest.getValue());

}

}

/**
 * Stops tracking {@code address}: removes it from {@code producerCredits}, then calls
 * {@code releaseOutstanding()} and {@code close()} on its credits.
 */
private void removeEntry(final SimpleString address, final ClientProducerCredits credits) {

producerCredits.remove(address);

credits.releaseOutstanding();

credits.close();

}

//......

}

  • ClientProducerCreditManagerImpl实现了ClientProducerCreditManager接口,它的构造器接收session及windowSize参数;其getCredits方法在windowSize为-1时返回ClientProducerCreditsNoFlowControl.instance,否则根据address从producerCredits获取ClientProducerCredits,不存在则通过build创建并放入producerCredits(同时将needInit置为true);非anon时递增refCount并将该address从unReferencedCredits移除,anon时则执行addToUnReferencedCache;最后若needInit为true则执行credits.init(context)
  • returnCredits方法从producerCredits获取ClientProducerCredits,然后递减refCount,若为0则执行addToUnReferencedCache;receiveCredits方法从producerCredits获取ClientProducerCredits,然后执行cr.receiveCredits(credits);receiveFailCredits方法从producerCredits获取ClientProducerCredits,然后执行cr.receiveFailCredits(credits)
  • reset方法遍历producerCredits.values()挨个执行ClientProducerCredits的reset方法;close方法先将windowSize置为-1,再遍历producerCredits.values()挨个执行ClientProducerCredits的close方法,然后清空producerCredits、unReferencedCredits

ClientProducerCreditsNoFlowControl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java

   /**
    * No-op {@link ClientProducerCredits} used when producer flow control is disabled
    * (a window size of -1 makes {@code getCredits} return {@link #instance}).
    *
    * <p>Every operation does nothing; {@link #isBlocked()} always reports {@code false}
    * and {@link #decrementRefCount()} always reports 1.
    */
   static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits {

      /** Shared instance; declared {@code final} so the singleton cannot be reassigned. */
      static final ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();

      /** Does nothing: credits are not tracked without flow control. */
      @Override
      public void acquireCredits(int credits) {
         // no-op
      }

      /** Does nothing. */
      @Override
      public void receiveCredits(int credits) {
         // no-op
      }

      /** Does nothing. */
      @Override
      public void receiveFailCredits(int credits) {
         // no-op
      }

      /** @return always {@code false} */
      @Override
      public boolean isBlocked() {
         return false;
      }

      /** Does nothing; the context is ignored. */
      @Override
      public void init(SessionContext ctx) {
         // no-op
      }

      /** Does nothing. */
      @Override
      public void reset() {
         // no-op
      }

      /** Does nothing. */
      @Override
      public void close() {
         // no-op
      }

      /** Does nothing: no reference count is kept. */
      @Override
      public void incrementRefCount() {
         // no-op
      }

      /** @return always 1, i.e. never 0 */
      @Override
      public int decrementRefCount() {
         return 1;
      }

      /** Does nothing. */
      @Override
      public void releaseOutstanding() {
         // no-op
      }

      /** @return a {@link SimpleString} built from the empty string */
      @Override
      public SimpleString getAddress() {
         return SimpleString.toSimpleString("");
      }

   }

  • ClientProducerCreditsNoFlowControl实现了ClientProducerCredits接口,其isBlocked方法返回false,decrementRefCount方法返回1,getAddress方法返回SimpleString.toSimpleString(""),其余方法都是空操作

小结

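ClientProducerCreditManager接口定义了getCredits、returnCredits、receiveCredits、receiveFailCredits、reset、close、creditsMapSize、unReferencedCreditsSize、setCallback方法;ClientProducerCreditManagerImpl按address在producerCredits中维护ClientProducerCredits,并用unReferencedCredits缓存匿名获取或refCount降为0的credits,缓存大小超过MAX_UNREFERENCED_CREDITS_CACHE_SIZE(1000)时移除最早的条目并释放、关闭对应的credits;windowSize为-1时getCredits直接返回全部为空操作的ClientProducerCreditsNoFlowControl.instance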

doc

  • ClientProducerCreditManager

以上是 聊聊artemis的ClientProducerCreditManager 的全部内容, 来源链接: utcz.com/z/512691.html

回到顶部