聊聊artemis的SessionProducerCreditsMessage

编程

本文主要研究一下artemis的SessionProducerCreditsMessage

SessionProducerCreditsMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionProducerCreditsMessage.java

public class SessionProducerCreditsMessage extends PacketImpl {

private int credits;

private SimpleString address;

public SessionProducerCreditsMessage(final int credits, final SimpleString address) {

super(SESS_PRODUCER_CREDITS);

this.credits = credits;

this.address = address;

}

public SessionProducerCreditsMessage() {

super(SESS_PRODUCER_CREDITS);

}

public int getCredits() {

return credits;

}

public SimpleString getAddress() {

return address;

}

@Override

public void encodeRest(final ActiveMQBuffer buffer) {

buffer.writeInt(credits);

buffer.writeSimpleString(address);

}

@Override

public void decodeRest(final ActiveMQBuffer buffer) {

credits = buffer.readInt();

address = buffer.readSimpleString();

}

@Override

public int hashCode() {

final int prime = 31;

int result = super.hashCode();

result = prime * result + ((address == null) ? 0 : address.hashCode());

result = prime * result + credits;

return result;

}

@Override

public String toString() {

StringBuffer buff = new StringBuffer(getParentString());

buff.append(", address=" + address);

buff.append(", credits=" + credits);

buff.append("]");

return buff.toString();

}

@Override

public boolean equals(Object obj) {

if (this == obj)

return true;

if (!super.equals(obj))

return false;

if (!(obj instanceof SessionProducerCreditsMessage))

return false;

SessionProducerCreditsMessage other = (SessionProducerCreditsMessage) obj;

if (address == null) {

if (other.address != null)

return false;

} else if (!address.equals(other.address))

return false;

if (credits != other.credits)

return false;

return true;

}

}

  • SessionProducerCreditsMessage继承了PacketImpl,其type为SESS_PRODUCER_CREDITS

ClientSessionPacketHandler

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

   class ClientSessionPacketHandler implements ChannelHandler {

@Override

public void handlePacket(final Packet packet) {

byte type = packet.getType();

try {

switch (type) {

case DISCONNECT_CONSUMER: {

handleConsumerDisconnected((DisconnectConsumerMessage) packet);

break;

}

case SESS_RECEIVE_CONTINUATION: {

handleReceiveContinuation((SessionReceiveContinuationMessage) packet);

break;

}

case SESS_RECEIVE_MSG: {

handleReceivedMessagePacket((SessionReceiveMessage) packet);

break;

}

case SESS_RECEIVE_LARGE_MSG: {

handleReceiveLargeMessage((SessionReceiveLargeMessage) packet);

break;

}

case PacketImpl.SESS_PRODUCER_CREDITS: {

handleReceiveProducerCredits((SessionProducerCreditsMessage) packet);

break;

}

case PacketImpl.SESS_PRODUCER_FAIL_CREDITS: {

handleReceiveProducerFailCredits((SessionProducerCreditsFailMessage) packet);

break;

}

case PacketImpl.DISCONNECT_CONSUMER_KILL: {

handleReceiveSlowConsumerKillMessage((DisconnectConsumerWithKillMessage) packet);

break;

}

case EXCEPTION: {

// We can only log these exceptions

// maybe we should cache it on SessionContext and throw an exception on any next calls

ActiveMQExceptionMessage mem = (ActiveMQExceptionMessage) packet;

ActiveMQClientLogger.LOGGER.receivedExceptionAsynchronously(mem.getException());

break;

}

default: {

throw ActiveMQClientMessageBundle.BUNDLE.invalidPacket(type);

}

}

} catch (Exception e) {

throw ActiveMQClientMessageBundle.BUNDLE.failedToHandlePacket(e);

}

sessionChannel.confirm(packet);

}

}

  • ClientSessionPacketHandler实现了ChannelHandler接口,其handlePacket根据不同的type做不同的处理,当type为SESS_PRODUCER_CREDITS时执行handleReceiveProducerCredits方法

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

//......

protected void handleReceiveProducerCredits(SessionProducerCreditsMessage message) {

handleReceiveProducerCredits(message.getAddress(), message.getCredits());

}

//......

}

  • handleReceiveProducerCredits调用的是父类SessionContext的handleReceiveProducerCredits方法

SessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java

public abstract class SessionContext {

protected ClientSessionInternal session;

protected SendAcknowledgementHandler sendAckHandler;

protected volatile RemotingConnection remotingConnection;

protected final IDGenerator idGenerator = new SimpleIDGenerator(0);

//......

protected void handleReceiveProducerCredits(SimpleString address, int credits) {

ClientSessionInternal session = this.session;

if (session != null) {

session.handleReceiveProducerCredits(address, credits);

}

}

//......

}

  • handleReceiveProducerCredits方法执行的是ClientSessionInternal的handleReceiveProducerCredits方法

ClientSessionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

//......

@Override

public void handleReceiveProducerCredits(final SimpleString address, final int credits) {

synchronized (producerCreditManager) {

producerCreditManager.receiveCredits(address, credits);

}

}

//......

}

  • handleReceiveProducerCredits调用的是producerCreditManager.receiveCredits方法

小结

SessionProducerCreditsMessage继承了PacketImpl,其type为SESS_PRODUCER_CREDITS;ClientSessionPacketHandler实现了ChannelHandler接口,其handlePacket根据不同的type做不同的处理,当type为SESS_PRODUCER_CREDITS时执行handleReceiveProducerCredits方法;handleReceiveProducerCredits调用的是父类SessionContext的handleReceiveProducerCredits方法;SessionContext的handleReceiveProducerCredits方法执行的是ClientSessionInternal的handleReceiveProducerCredits方法;ClientSessionImpl的handleReceiveProducerCredits调用的是producerCreditManager.receiveCredits方法

doc

  • SessionProducerCreditsMessage

以上是 聊聊artemis的SessionProducerCreditsMessage 的全部内容, 来源链接: utcz.com/z/512766.html

回到顶部