聊聊artemis的SessionConsumerFlowCreditMessage

编程

本文主要研究一下artemis的SessionConsumerFlowCreditMessage

SessionConsumerFlowCreditMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionConsumerFlowCreditMessage.java

public class SessionConsumerFlowCreditMessage extends PacketImpl {

private long consumerID;

private int credits;

public SessionConsumerFlowCreditMessage(final long consumerID, final int credits) {

super(SESS_FLOWTOKEN);

this.consumerID = consumerID;

this.credits = credits;

}

public SessionConsumerFlowCreditMessage() {

super(SESS_FLOWTOKEN);

}

// Public --------------------------------------------------------

public long getConsumerID() {

return consumerID;

}

public int getCredits() {

return credits;

}

@Override

public void encodeRest(final ActiveMQBuffer buffer) {

buffer.writeLong(consumerID);

buffer.writeInt(credits);

}

@Override

public void decodeRest(final ActiveMQBuffer buffer) {

consumerID = buffer.readLong();

credits = buffer.readInt();

}

@Override

public String toString() {

return getParentString() + ", consumerID=" + consumerID + ", credits=" + credits + "]";

}

@Override

public int hashCode() {

final int prime = 31;

int result = super.hashCode();

result = prime * result + (int) (consumerID ^ (consumerID >>> 32));

result = prime * result + credits;

return result;

}

@Override

public boolean equals(Object obj) {

if (this == obj)

return true;

if (!super.equals(obj))

return false;

if (!(obj instanceof SessionConsumerFlowCreditMessage))

return false;

SessionConsumerFlowCreditMessage other = (SessionConsumerFlowCreditMessage) obj;

if (consumerID != other.consumerID)

return false;

if (credits != other.credits)

return false;

return true;

}

}

  • SessionConsumerFlowCreditMessage继承了PacketImpl,其type为SESS_FLOWTOKEN

ServerSessionPacketHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java

public class ServerSessionPacketHandler implements ChannelHandler {

//......

private volatile AtomicInteger availableCredits = new AtomicInteger(0);

//......

private void onMessagePacket(final Packet packet) {

if (logger.isTraceEnabled()) {

logger.trace("ServerSessionPacketHandler::handlePacket," + packet);

}

final byte type = packet.getType();

switch (type) {

case SESS_SEND: {

onSessionSend(packet);

break;

}

case SESS_ACKNOWLEDGE: {

onSessionAcknowledge(packet);

break;

}

case SESS_PRODUCER_REQUEST_CREDITS: {

onSessionRequestProducerCredits(packet);

break;

}

case SESS_FLOWTOKEN: {

onSessionConsumerFlowCredit(packet);

break;

}

default:

// separating a method for everything else as JIT was faster this way

slowPacketHandler(packet);

break;

}

}

private void onSessionConsumerFlowCredit(Packet packet) {

this.storageManager.setContext(session.getSessionContext());

try {

Packet response = null;

boolean requiresResponse = false;

try {

SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;

session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());

} catch (ActiveMQIOErrorException e) {

response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);

} catch (ActiveMQXAException e) {

response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);

} catch (ActiveMQQueueMaxConsumerLimitReached e) {

response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);

} catch (ActiveMQException e) {

response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);

} catch (Throwable t) {

response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);

}

sendResponse(packet, response, false, false);

} finally {

this.storageManager.clearContext();

}

}

//......

}

  • onMessagePacket方法在type为SESS_FLOWTOKEN时执行onSessionConsumerFlowCredit方法;该方法执行的是session.receiveConsumerCredits以及sendResponse方法

ServerSessionImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java

public class ServerSessionImpl implements ServerSession, FailureListener {

//......

public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception {

ServerConsumer consumer = locateConsumer(consumerID);

if (consumer == null) {

logger.debug("There is no consumer with id " + consumerID);

return;

}

consumer.receiveCredits(credits);

}

//......

}

  • receiveConsumerCredits方法执行的是consumer.receiveCredits方法

ServerConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

//......

public void receiveCredits(final int credits) {

if (credits == -1) {

if (logger.isDebugEnabled()) {

logger.debug(this + ":: FlowControl::Received disable flow control message");

}

// No flow control

availableCredits = null;

// There may be messages already in the queue

promptDelivery();

} else if (credits == 0) {

// reset, used on slow consumers

logger.debug(this + ":: FlowControl::Received reset flow control message");

availableCredits.set(0);

} else {

int previous = availableCredits.getAndAdd(credits);

if (logger.isDebugEnabled()) {

logger.debug(this + "::FlowControl::Received " +

credits +

" credits, previous value = " +

previous +

" currentValue = " +

availableCredits.get());

}

if (previous <= 0 && previous + credits > 0) {

if (logger.isTraceEnabled()) {

logger.trace(this + "::calling promptDelivery from receiving credits");

}

promptDelivery();

}

}

}

public void promptDelivery() {

// largeMessageDeliverer is always set inside a lock

// if we don"t acquire a lock, we will have NPE eventually

if (largeMessageDeliverer != null) {

resumeLargeMessage();

} else {

forceDelivery();

}

}

private void forceDelivery() {

if (browseOnly) {

messageQueue.getExecutor().execute(browserDeliverer);

} else {

messageQueue.deliverAsync();

}

}

//......

}

  • receiveCredits方法在credits为-1时设置availableCredits为null,然后执行promptDelivery方法;在credits为0时设置availableCredits为0;其他情况执行availableCredits.getAndAdd(credits);promptDelivery方法主要是执行resumeLargeMessage或者forceDelivery方法

小结

SessionConsumerFlowCreditMessage继承了PacketImpl,其type为SESS_FLOWTOKEN;ServerSessionPacketHandler的onMessagePacket方法在type为SESS_FLOWTOKEN时执行onSessionConsumerFlowCredit方法;该方法执行的是session.receiveConsumerCredits以及sendResponse方法;receiveConsumerCredits方法在receiveCredits方法在credits为-1时设置availableCredits为null,然后执行promptDelivery方法;在credits为0时设置availableCredits为0;其他情况执行availableCredits.getAndAdd(credits)

doc

  • SessionConsumerFlowCreditMessage

以上是 聊聊artemis的SessionConsumerFlowCreditMessage 的全部内容, 来源链接: utcz.com/z/512802.html

回到顶部