A look at Artemis's ClientProducerCreditManager

Preface
This article takes a look at Artemis's ClientProducerCreditManager.
ClientProducerCreditManager
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
public interface ClientProducerCreditManager {
   ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context);
   void returnCredits(SimpleString address);
   void receiveCredits(SimpleString address, int credits);
   void receiveFailCredits(SimpleString address, int credits);
   void reset();
   void close();
   int creditsMapSize();
   int unReferencedCreditsSize();
   /** This will determine the flow control as asynchronous,
    *  no actual block should happen instead a callback will be sent whenever blockages change  */
   void setCallback(ClientProducerFlowCallback callback);
}
- The ClientProducerCreditManager interface defines the methods getCredits, returnCredits, receiveCredits, receiveFailCredits, reset, close, creditsMapSize, unReferencedCreditsSize, and setCallback
 
ClientProducerCreditManagerImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager {
   public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000;
   private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<>();
   private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<>();
   private final ClientSessionInternal session;
   private int windowSize;
   private ClientProducerFlowCallback callback;
   public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
      this.session = session;
      this.windowSize = windowSize;
   }
   /** This will determine the flow control as asynchronous,
    *  no actual block should happen instead a callback will be sent whenever blockages change  */
   @Override
   public void setCallback(ClientProducerFlowCallback callback) {
      this.callback = callback;
   }
   @Override
   public synchronized ClientProducerCredits getCredits(final SimpleString address,
                                                        final boolean anon,
                                                        SessionContext context) {
      if (windowSize == -1) {
         return ClientProducerCreditsNoFlowControl.instance;
      } else {
         boolean needInit = false;
         ClientProducerCredits credits;
         synchronized (this) {
            credits = producerCredits.get(address);
            if (credits == null) {
               // Doesn't need to be fair since session is single threaded
               credits = build(address);
               needInit = true;
               producerCredits.put(address, credits);
            }
            if (!anon) {
               credits.incrementRefCount();
               // Remove from anon credits (if there)
               unReferencedCredits.remove(address);
            } else {
               addToUnReferencedCache(address, credits);
            }
         }
         // The init is done outside of the lock
         // otherwise packages may arrive with flow control
         // while this is still sending requests causing a dead lock
         if (needInit) {
            credits.init(context);
         }
         return credits;
      }
   }
   private ClientProducerCredits build(SimpleString address) {
      if (callback != null) {
         return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback);
      } else {
         return new ClientProducerCreditsImpl(session, address, windowSize);
      }
   }
   @Override
   public synchronized void returnCredits(final SimpleString address) {
      ClientProducerCredits credits = producerCredits.get(address);
      if (credits != null && credits.decrementRefCount() == 0) {
         addToUnReferencedCache(address, credits);
      }
   }
   @Override
   public synchronized void receiveCredits(final SimpleString address, final int credits) {
      ClientProducerCredits cr = producerCredits.get(address);
      if (cr != null) {
         cr.receiveCredits(credits);
      }
   }
   @Override
   public synchronized void receiveFailCredits(final SimpleString address, int credits) {
      ClientProducerCredits cr = producerCredits.get(address);
      if (cr != null) {
         cr.receiveFailCredits(credits);
      }
   }
   @Override
   public synchronized void reset() {
      for (ClientProducerCredits credits : producerCredits.values()) {
         credits.reset();
      }
   }
   @Override
   public synchronized void close() {
      windowSize = -1;
      for (ClientProducerCredits credits : producerCredits.values()) {
         credits.close();
      }
      producerCredits.clear();
      unReferencedCredits.clear();
   }
   @Override
   public synchronized int creditsMapSize() {
      return producerCredits.size();
   }
   @Override
   public synchronized int unReferencedCreditsSize() {
      return unReferencedCredits.size();
   }
   private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits) {
      unReferencedCredits.put(address, credits);
      if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
         // Remove the oldest entry
         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();
         Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
         iter.remove();
         removeEntry(oldest.getKey(), oldest.getValue());
      }
   }
   private void removeEntry(final SimpleString address, final ClientProducerCredits credits) {
      producerCredits.remove(address);
      credits.releaseOutstanding();
      credits.close();
   }
   //......
}
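- ClientProducerCreditManagerImpl implements the ClientProducerCreditManager interface; its constructor takes a session and a windowSize. getCredits returns ClientProducerCreditsNoFlowControl.instance when windowSize is -1; otherwise it looks up the ClientProducerCredits for the address in producerCredits and creates one if it is missing. For a non-anonymous producer it increments the refCount and removes the address from unReferencedCredits; for an anonymous producer it calls addToUnReferencedCache. If the credits object was newly created, needInit is true and credits.init(context) is executed outside the synchronized block, which according to the code comment avoids a deadlock with incoming flow-control packets
- returnCredits looks up the ClientProducerCredits in producerCredits and decrements its refCount; when the count reaches 0 it calls addToUnReferencedCache, which evicts and closes the oldest entry once the cache holds more than MAX_UNREFERENCED_CREDITS_CACHE_SIZE (1000) entries. receiveCredits looks up the ClientProducerCredits in producerCredits and calls cr.receiveCredits(credits); receiveFailCredits does the same and calls cr.receiveFailCredits(credits)
- reset iterates over producerCredits.values() and calls reset on each ClientProducerCredits; close sets windowSize to -1, iterates over producerCredits.values() and calls close on each ClientProducerCredits, then clears producerCredits and unReferencedCredits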
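
The reference counting and bounded cache described above can be sketched in isolation. The following is a simplified model, not the Artemis code: CreditCacheSketch, its nested Credits class, and the limit of 3 are hypothetical names and values chosen for illustration (Artemis uses 1000), and addresses are plain String instead of SimpleString. It relies on the fact that a LinkedHashMap iterates in insertion order by default, so the first entry returned by the iterator is the oldest one.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of the manager's bookkeeping; not the Artemis classes.
class CreditCacheSketch {
    // Artemis uses 1000; a small value makes the eviction easy to observe.
    static final int MAX_UNREFERENCED = 3;

    // Hypothetical stand-in for ClientProducerCredits: a reference count and a closed flag.
    static final class Credits {
        int refCount;
        boolean closed;
    }

    private final Map<String, Credits> producerCredits = new LinkedHashMap<>();
    private final Map<String, Credits> unReferenced = new LinkedHashMap<>();

    synchronized Credits getCredits(String address, boolean anon) {
        Credits credits = producerCredits.computeIfAbsent(address, a -> new Credits());
        if (!anon) {
            // A named producer holds a reference, so the entry must not be evicted.
            credits.refCount++;
            unReferenced.remove(address);
        } else {
            // Anonymous producers never hold a reference; the entry is only cached.
            addToUnReferencedCache(address, credits);
        }
        return credits;
    }

    synchronized void returnCredits(String address) {
        Credits credits = producerCredits.get(address);
        if (credits != null && --credits.refCount == 0) {
            addToUnReferencedCache(address, credits);
        }
    }

    private void addToUnReferencedCache(String address, Credits credits) {
        unReferenced.put(address, credits);
        if (unReferenced.size() > MAX_UNREFERENCED) {
            // Insertion order: the first entry is the one that was cached earliest.
            Iterator<Map.Entry<String, Credits>> it = unReferenced.entrySet().iterator();
            Map.Entry<String, Credits> oldest = it.next();
            it.remove();
            producerCredits.remove(oldest.getKey());
            oldest.getValue().closed = true;
        }
    }

    synchronized int creditsMapSize() {
        return producerCredits.size();
    }

    synchronized int unReferencedCreditsSize() {
        return unReferenced.size();
    }
}
```

In this sketch, requesting credits anonymously for four different addresses evicts and closes the entry for the first address, so both maps end up holding three entries.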
 
ClientProducerCreditsNoFlowControl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
   static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits {
      static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
      @Override
      public void acquireCredits(int credits) {
      }
      @Override
      public void receiveCredits(int credits) {
      }
      @Override
      public void receiveFailCredits(int credits) {
      }
      @Override
      public boolean isBlocked() {
         return false;
      }
      @Override
      public void init(SessionContext ctx) {
      }
      @Override
      public void reset() {
      }
      @Override
      public void close() {
      }
      @Override
      public void incrementRefCount() {
      }
      @Override
      public int decrementRefCount() {
         return 1;
      }
      @Override
      public void releaseOutstanding() {
      }
      @Override
      public SimpleString getAddress() {
         return SimpleString.toSimpleString("");
      }
   }
- ClientProducerCreditsNoFlowControl implements the ClientProducerCredits interface; its isBlocked method returns false, decrementRefCount returns 1, getAddress returns SimpleString.toSimpleString(""), and all other methods are no-ops
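
ClientProducerCreditsNoFlowControl reads like an instance of the Null Object pattern: callers always receive a credits object and never need to check whether flow control is enabled. A minimal sketch of that idea follows, assuming a trimmed-down contract. CreditsSketch, NoFlowControlSketch, and CreditsSelector are hypothetical names, and the real ClientProducerCredits interface has more methods than shown here.

```java
import java.util.function.Supplier;

// Hypothetical, trimmed-down credits contract; not the Artemis interface.
interface CreditsSketch {
    void acquireCredits(int credits);

    boolean isBlocked();

    int decrementRefCount();
}

// Null object: every operation is a no-op, so sending never waits for credits.
final class NoFlowControlSketch implements CreditsSketch {
    static final NoFlowControlSketch INSTANCE = new NoFlowControlSketch();

    private NoFlowControlSketch() {
    }

    @Override
    public void acquireCredits(int credits) {
        // Nothing to acquire when flow control is disabled.
    }

    @Override
    public boolean isBlocked() {
        return false;
    }

    @Override
    public int decrementRefCount() {
        // Never reports 0, so a caller testing for 0 never treats it as unreferenced.
        return 1;
    }
}

final class CreditsSelector {
    private CreditsSelector() {
    }

    // Mirrors the windowSize == -1 check in getCredits; the supplier is only
    // invoked when flow control is actually enabled.
    static CreditsSketch select(int windowSize, Supplier<CreditsSketch> flowControlled) {
        return windowSize == -1 ? NoFlowControlSketch.INSTANCE : flowControlled.get();
    }
}
```

The shared singleton keeps the disabled path allocation-free, and the calling code stays identical whether or not a window size is configured.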
 
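Summary
The ClientProducerCreditManager interface defines the methods getCredits, returnCredits, receiveCredits, receiveFailCredits, reset, close, creditsMapSize, unReferencedCreditsSize, and setCallback. ClientProducerCreditManagerImpl implements it with two LinkedHashMaps, producerCredits and unReferencedCredits, and returns the no-op ClientProducerCreditsNoFlowControl.instance when windowSize is -1.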
doc
- ClientProducerCreditManager
 


