A look at Artemis's confirmationWindowEnabled


confirmationWindowEnabled

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java

public class ClientProducerImpl implements ClientProducerInternal {

    //......

    public void send(SimpleString address1,
                     Message message,
                     SendAcknowledgementHandler handler) throws ActiveMQException {
        checkClosed();
        boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled();
        if (confirmationWindowEnabled) {
            doSend(address1, message, handler);
        } else {
            doSend(address1, message, null);
            if (handler != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Handler was used on producing messages towards address " + (address1 == null ? null : address1.toString()) + " however there is no confirmationWindowEnabled");
                }
                if (!confirmationNotSetLogged) {
                    // will log this only once
                    ActiveMQClientLogger.LOGGER.confirmationNotSet();
                }
                // if there is no confirmation enabled, we will at least call the handler after the sent is done
                session.scheduleConfirmation(handler, message);
            }
        }
    }

    //......
}

  • When confirmationWindowEnabled is true, the send method of ClientProducerImpl calls doSend(address1, message, handler); otherwise it calls doSend(address1, message, null) and, if handler is not null, also calls session.scheduleConfirmation(handler, message).
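The branch described above can be modelled in a few lines. This is only a sketch: SendDispatchSketch, its trace strings, and the use of Consumer as a stand-in for SendAcknowledgementHandler are hypothetical names for illustration, and the handler is invoked inline here rather than on the session executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the branch in ClientProducerImpl.send; not Artemis code.
class SendDispatchSketch {

    /** Returns a trace of the steps taken so the two paths can be compared. */
    static List<String> send(boolean confirmationWindowEnabled,
                             Consumer<String> handler,
                             String message) {
        List<String> trace = new ArrayList<>();
        if (confirmationWindowEnabled) {
            // The handler travels with the packet and fires on a broker confirmation.
            trace.add("doSend(handler attached)");
        } else {
            trace.add("doSend(no handler)");
            if (handler != null) {
                // Stands in for session.scheduleConfirmation(handler, message);
                // the real code hands this to an executor instead of calling inline.
                handler.accept(message);
                trace.add("handler called locally");
            }
        }
        return trace;
    }
}
```

With the window disabled, the callback only says that the local send call returned, which is why the original code logs a warning in that case.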

doSend

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java

public class ClientProducerImpl implements ClientProducerInternal {

    //......

    private void doSend(SimpleString sendingAddress,
                        final Message msgToSend,
                        final SendAcknowledgementHandler handler) throws ActiveMQException {
        if (sendingAddress == null) {
            sendingAddress = this.address;
        }
        session.startCall();

        try {
            // In case we received message from another protocol, we first need to convert it to core as the ClientProducer only understands core
            ICoreMessage msg = msgToSend.toCore();

            ClientProducerCredits theCredits;

            boolean isLarge;
            // a note about the second check on the writerIndexSize,
            // If it's a server's message, it means this is being done through the bridge or some special consumer on the
            // server's on which case we can't convert the message into large at the servers
            if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msg) != null || msg.isLargeMessage() ||
                msg.getBodyBuffer().writerIndex() > minLargeMessageSize)) {
                isLarge = true;
            } else {
                isLarge = false;
            }

            if (!isLarge) {
                session.setAddress(msg, sendingAddress);
            } else {
                msg.setAddress(sendingAddress);
            }

            // Anonymous
            theCredits = session.getCredits(sendingAddress, true);

            if (rateLimiter != null) {
                // Rate flow control
                rateLimiter.limit();
            }

            if (groupID != null) {
                msg.putStringProperty(Message.HDR_GROUP_ID, groupID);
            }

            final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
            // if Handler != null, we will send non blocking
            final boolean sendBlocking = sendBlockingConfig && handler == null;

            session.workDone();

            if (isLarge) {
                largeMessageSend(sendBlocking, msg, theCredits, handler);
            } else {
                sendRegularMessage(sendingAddress, msg, sendBlocking, theCredits, handler);
            }
        } finally {
            session.endCall();
        }
    }

    private void sendRegularMessage(final SimpleString sendingAddress,
                                    final ICoreMessage msgI,
                                    final boolean sendBlocking,
                                    final ClientProducerCredits theCredits,
                                    final SendAcknowledgementHandler handler) throws ActiveMQException {
        // This will block if credits are not available

        // Note, that for a large message, the encode size only includes the properties + headers
        // Not the continuations, but this is ok since we are only interested in limiting the amount of
        // data in *memory* and continuations go straight to the disk

        logger.tracef("sendRegularMessage::%s, Blocking=%s", msgI, sendBlocking);

        int creditSize = sessionContext.getCreditsOnSendingFull(msgI);

        theCredits.acquireCredits(creditSize);

        sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
    }

    //......
}

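  • doSend ends by calling either largeMessageSend or sendRegularMessage, and sendRegularMessage in turn ends by calling sessionContext.sendFullMessage(msgI, sendBlocking, handler, address). Note that sendBlocking is true only when the blocking setting for the message's durability is on and handler is null.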
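The blocking decision inside doSend is easy to misread, so here it is isolated as a pure function. The class and parameter names are hypothetical; only the boolean expression is taken from the listing above.

```java
// Hypothetical helper that isolates the sendBlocking expression from doSend.
class SendBlockingSketch {

    static boolean sendBlocking(boolean durable,
                                boolean blockOnDurableSend,
                                boolean blockOnNonDurableSend,
                                boolean hasHandler) {
        // Pick the setting that matches the message's durability ...
        boolean sendBlockingConfig = durable ? blockOnDurableSend : blockOnNonDurableSend;
        // ... and never block when a handler was supplied.
        return sendBlockingConfig && !hasHandler;
    }
}
```

In other words, passing a handler always makes the send non-blocking, whatever the blockOnDurableSend and blockOnNonDurableSend settings say.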

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

    //......

    public void sendFullMessage(ICoreMessage msgI,
                                boolean sendBlocking,
                                SendAcknowledgementHandler handler,
                                SimpleString defaultAddress) throws ActiveMQException {
        final SessionSendMessage packet;
        if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
            packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
        } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
            packet = new SessionSendMessage(msgI, sendBlocking, handler);
        } else {
            boolean responseRequired = confirmationWindow != -1 || sendBlocking;
            packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
        }
        if (sendBlocking) {
            sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
        } else {
            sessionChannel.sendBatched(packet);
        }
    }

    //......
}

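  • sendFullMessage creates a SessionSendMessage (SessionSendMessage_1X or SessionSendMessage_V2 for other connection versions) and passes the SendAcknowledgementHandler into the packet through its constructor; the packet is then sent with sendBlocking or sendBatched.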
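As a rough illustration of the two decisions made in sendFullMessage, the sketch below reduces them to pure functions. SendFullMessageSketch and its method names are made up for this example; the packet class names are returned as plain strings rather than real objects.

```java
// Hypothetical model of the decisions in sendFullMessage; not Artemis code.
class SendFullMessageSketch {

    /** Which packet class the listing would instantiate for a given connection version. */
    static String packetType(boolean beforeAddressChange, boolean beforeAsyncResponseChange) {
        if (beforeAddressChange) {
            return "SessionSendMessage_1X";
        }
        if (beforeAsyncResponseChange) {
            return "SessionSendMessage";
        }
        return "SessionSendMessage_V2";
    }

    /** Only the V2 branch computes this; the older branches pass sendBlocking directly. */
    static boolean responseRequired(int confirmationWindow, boolean sendBlocking) {
        return confirmationWindow != -1 || sendBlocking;
    }
}
```

So on the newest protocol branch a non-blocking send still asks for a response whenever confirmationWindow is anything other than -1.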

SessionSendMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java

public class SessionSendMessage extends MessagePacket {

    protected boolean requiresResponse;

    private final transient SendAcknowledgementHandler handler;

    /** This will be using the CoreMessage because it is meant for the core-protocol */
    protected SessionSendMessage(final byte id,
                                 final ICoreMessage message,
                                 final boolean requiresResponse,
                                 final SendAcknowledgementHandler handler) {
        super(id, message);
        this.handler = handler;
        this.requiresResponse = requiresResponse;
    }

    protected SessionSendMessage(final byte id,
                                 final CoreMessage message) {
        super(id, message);
        this.handler = null;
    }

    /** This will be using the CoreMessage because it is meant for the core-protocol */
    public SessionSendMessage(final ICoreMessage message,
                              final boolean requiresResponse,
                              final SendAcknowledgementHandler handler) {
        super(SESS_SEND, message);
        this.handler = handler;
        this.requiresResponse = requiresResponse;
    }

    public SessionSendMessage(final CoreMessage message) {
        super(SESS_SEND, message);
        this.handler = null;
    }

    // Public --------------------------------------------------------

    @Override
    public boolean isRequiresResponse() {
        return requiresResponse;
    }

    public SendAcknowledgementHandler getHandler() {
        return handler;
    }

    //......
}

  • SessionSendMessage extends MessagePacket; its getHandler method returns the SendAcknowledgementHandler that was passed to the constructor.

scheduleConfirmation

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

    //......

    public void scheduleConfirmation(final SendAcknowledgementHandler handler, final Message message) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                handler.sendAcknowledged(message);
            }
        });
    }

    //......
}

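  • scheduleConfirmation submits a Runnable to the executor, and its run method calls handler.sendAcknowledged(message). This acknowledgement is produced on the client after the send, not by a confirmation from the broker.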
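The same pattern can be reproduced with a plain JDK executor. This is a sketch under assumptions: ScheduleConfirmationSketch and acknowledgeOnExecutor are hypothetical names, a single-thread executor stands in for the session's executor, and the "handler" simply records the thread it ran on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for scheduleConfirmation using only JDK classes.
class ScheduleConfirmationSketch {

    /** Runs the acknowledgement on an executor thread and returns "threadName|message". */
    static String acknowledgeOnExecutor(String message) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> seen = new CompletableFuture<>();
            // Mirrors executor.execute(() -> handler.sendAcknowledged(message)).
            executor.execute(() -> seen.complete(Thread.currentThread().getName() + "|" + message));
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("callback did not run", e);
        } finally {
            executor.shutdown();
        }
    }
}
```

The point of the sketch is that the callback runs asynchronously on a different thread from the caller of send, so a handler should not assume it is invoked on the sending thread.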

SendAcknowledgementHandler

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java

public interface SendAcknowledgementHandler {

    /**
     * Notifies the client that a message sent asynchronously has been received by the server.
     *
     * @param message message sent asynchronously
     */
    void sendAcknowledged(Message message);

    default void sendFailed(Message message, Exception e) {
        /**
         * By default ignore failures to preserve compatibility with existing implementations.
         * If the message makes it to the broker and a failure occurs sendAcknowledge() will
         * still be invoked just like it always was.
         */
    }
}

  • The SendAcknowledgementHandler interface defines the sendAcknowledged and sendFailed methods; sendFailed is a default method that does nothing.
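To show what implementing such a handler looks like, here is a minimal sketch. AckHandlerSketch is a local stand-in that only mirrors the shape of SendAcknowledgementHandler (it is not the Artemis type), and CountingAckHandler is a hypothetical implementation that overrides both callbacks.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in that mirrors the shape of SendAcknowledgementHandler; not the Artemis type.
interface AckHandlerSketch<M> {

    void sendAcknowledged(M message);

    default void sendFailed(M message, Exception e) {
        // Ignored by default, like the original interface.
    }
}

// Hypothetical handler that counts both outcomes; counters are atomic because
// callbacks may arrive on a thread other than the sender's.
class CountingAckHandler implements AckHandlerSketch<String> {

    final AtomicInteger acked = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    @Override
    public void sendAcknowledged(String message) {
        acked.incrementAndGet();
    }

    @Override
    public void sendFailed(String message, Exception e) {
        failed.incrementAndGet();
    }
}
```

Because sendFailed has a default body, an implementation written as a lambda only sees successes; overriding it, as above, is the way to notice failed sends.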

ServerSessionPacketHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java

public class ServerSessionPacketHandler implements ChannelHandler {

    //......

    private void sendResponse(final Packet confirmPacket,
                              final Packet response,
                              final boolean flush,
                              final boolean closeChannel) {
        if (logger.isTraceEnabled()) {
            logger.trace("ServerSessionPacketHandler::scheduling response::" + response);
        }

        storageManager.afterCompleteOperations(new IOCallback() {
            @Override
            public void onError(final int errorCode, final String errorMessage) {
                ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);

                Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage));
                doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel);

                if (logger.isTraceEnabled()) {
                    logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket);
                }
            }

            @Override
            public void done() {
                if (logger.isTraceEnabled()) {
                    logger.trace("ServerSessionPacketHandler::regular response sent::" + response);
                }

                doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
            }
        });
    }

    private void doConfirmAndResponse(final Packet confirmPacket,
                                      final Packet response,
                                      final boolean flush,
                                      final boolean closeChannel) {
        // don't confirm if the response is an exception
        if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) {
            channel.confirm(confirmPacket);

            if (flush) {
                channel.flushConfirmations();
            }
        }

        if (response != null) {
            channel.send(response);
        }

        if (closeChannel) {
            channel.close();
        }
    }

    //......
}

  • The sendResponse method of ServerSessionPacketHandler registers an IOCallback through storageManager.afterCompleteOperations, and both its onError and done methods call doConfirmAndResponse. doConfirmAndResponse calls channel.confirm(confirmPacket) only when confirmPacket is not null and the response is not an exception.
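The conditions in doConfirmAndResponse can be checked in isolation with a small model. Everything here is hypothetical scaffolding: ConfirmAndResponseSketch, the Kind enum, and the action strings exist only for this example, and a null Kind stands for "no response packet".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of doConfirmAndResponse that lists the channel calls it would make.
class ConfirmAndResponseSketch {

    enum Kind { NORMAL, EXCEPTION }

    static List<String> actions(boolean hasConfirmPacket,
                                Kind response,
                                boolean flush,
                                boolean closeChannel) {
        List<String> out = new ArrayList<>();
        // Don't confirm if the response is an exception.
        if (hasConfirmPacket && (response == null || response != Kind.EXCEPTION)) {
            out.add("confirm");
            if (flush) {
                out.add("flushConfirmations");
            }
        }
        if (response != null) {
            out.add("send");
        }
        if (closeChannel) {
            out.add("close");
        }
        return out;
    }
}
```

Note that an exception response is still sent back to the client; it is only the confirmation of the original packet that is skipped.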

confirm

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java

public final class ChannelImpl implements Channel {

    //......

    public void confirm(final Packet packet) {
        if (resendCache != null && packet.isRequiresConfirmations()) {
            lastConfirmedCommandID.incrementAndGet();

            if (logger.isTraceEnabled()) {
                logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
            }

            receivedBytes += packet.getPacketSize();

            if (receivedBytes >= confWindowSize) {
                receivedBytes = 0;

                final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID.get());

                confirmed.setChannelID(id);

                doWrite(confirmed);
            }
        }
    }

    //......
}

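  • The confirm method of ChannelImpl increments lastConfirmedCommandID and adds the packet size to receivedBytes; once receivedBytes reaches confWindowSize, it resets the counter and writes a PacketsConfirmedMessage carrying the last confirmed command ID.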
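The windowing behaviour of confirm is easiest to see with numbers. The sketch below keeps only the counters: ConfirmWindowSketch is a hypothetical class, the written list stands in for doWrite, and starting lastConfirmedCommandID at -1 is an assumption made so that the first packet gets ID 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the byte-window bookkeeping in ChannelImpl.confirm.
class ConfirmWindowSketch {

    private final int confWindowSize;
    private int receivedBytes;
    private int lastConfirmedCommandID = -1; // assumption: first confirmed packet has ID 0

    /** Command IDs for which a confirmation message would have been written. */
    final List<Integer> written = new ArrayList<>();

    ConfirmWindowSketch(int confWindowSize) {
        this.confWindowSize = confWindowSize;
    }

    void confirm(int packetSize) {
        lastConfirmedCommandID++;
        receivedBytes += packetSize;
        if (receivedBytes >= confWindowSize) {
            // The window is full: reset the counter and confirm everything up to this ID.
            receivedBytes = 0;
            written.add(lastConfirmedCommandID);
        }
    }
}
```

For example, with a window of 100 bytes and seven packets of 40 bytes each, confirmations go out after the third and sixth packets (command IDs 2 and 5), and the seventh packet stays unconfirmed until more bytes arrive or the confirmations are flushed.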

PacketsConfirmedMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/PacketsConfirmedMessage.java

public class PacketsConfirmedMessage extends PacketImpl {

    private int commandID;

    public PacketsConfirmedMessage(final int commandID) {
        super(PACKETS_CONFIRMED);

        this.commandID = commandID;
    }

    public PacketsConfirmedMessage() {
        super(PACKETS_CONFIRMED);
    }

    //......
}

  • PacketsConfirmedMessage extends PacketImpl; its type is PACKETS_CONFIRMED.

handlePacket

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java

public final class ChannelImpl implements Channel {

    //......

    public void handlePacket(final Packet packet) {
        if (packet.getType() == PacketImpl.PACKETS_CONFIRMED) {
            if (resendCache != null) {
                final PacketsConfirmedMessage msg = (PacketsConfirmedMessage) packet;

                clearUpTo(msg.getCommandID());
            }

            if (!connection.isClient() && handler != null) {
                handler.handlePacket(packet);
            }

            return;
        } else {
            if (packet.isResponse()) {
                confirm(packet);

                handleAsyncResponse(packet);
                lock.lock();

                try {
                    response = packet;
                    sendCondition.signal();
                } finally {
                    lock.unlock();
                }
            } else if (handler != null) {
                handler.handlePacket(packet);
            }
        }
    }

    private void clearUpTo(final int lastReceivedCommandID) {
        final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;

        if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID + " first commandID=" + firstStoredCommandID + " number to clear " + numberToClear);
        }

        for (int i = 0; i < numberToClear; i++) {
            final Packet packet = resendCache.poll();

            if (packet == null) {
                ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
                firstStoredCommandID = lastReceivedCommandID + 1;
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace("RemotingConnectionID=" + connection.getID() + " ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
            }
            if (commandConfirmationHandler != null) {
                commandConfirmationHandler.commandConfirmed(packet);
            }
            if (responseAsyncCache != null) {
                responseAsyncCache.handleResponse(packet);
            }
        }

        firstStoredCommandID += numberToClear;
    }

    //......
}

  • The handlePacket method of ChannelImpl calls clearUpTo when the packet type is PacketImpl.PACKETS_CONFIRMED and resendCache is not null. clearUpTo polls packets from resendCache and, for each one, calls commandConfirmationHandler.commandConfirmed(packet) if commandConfirmationHandler is not null and responseAsyncCache.handleResponse(packet) if responseAsyncCache is not null; both end up calling responseHandler.handleResponse(packet, response).
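The arithmetic in clearUpTo (how many cached packets one confirmation releases) can be sketched with a plain queue. ResendCacheSketch is a hypothetical class: packets are represented as strings, the confirmed list stands in for the commandConfirmed callback, and firstStoredCommandID is assumed to start at 0.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the resend cache bookkeeping in ChannelImpl.clearUpTo.
class ResendCacheSketch {

    private final Queue<String> resendCache = new ArrayDeque<>();
    private int firstStoredCommandID = 0; // assumption: IDs start at 0

    /** Packets released so far, in the order their confirmation callback would fire. */
    final List<String> confirmed = new ArrayList<>();

    void cache(String packet) {
        resendCache.add(packet);
    }

    void clearUpTo(int lastReceivedCommandID) {
        int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
        for (int i = 0; i < numberToClear; i++) {
            String packet = resendCache.poll();
            if (packet == null) {
                // Cache ran dry: resynchronise the first stored ID and stop.
                firstStoredCommandID = lastReceivedCommandID + 1;
                return;
            }
            confirmed.add(packet); // stands in for commandConfirmed(packet)
        }
        firstStoredCommandID += numberToClear;
    }

    int firstStoredCommandID() {
        return firstStoredCommandID;
    }
}
```

One PacketsConfirmedMessage therefore acknowledges a whole batch: with five cached packets, clearUpTo(2) releases the first three and a later clearUpTo(4) releases the remaining two.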

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

    //......

    private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
        @Override
        public void commandConfirmed(Packet packet) {
            responseHandler.handleResponse(packet, null);
        }
    };

    private final ResponseHandler responseHandler = new ResponseHandler() {
        @Override
        public void handleResponse(Packet packet, Packet response) {
            final ActiveMQException activeMQException;
            if (response != null && response.getType() == PacketImpl.EXCEPTION) {
                ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
                activeMQException = exceptionResponseMessage.getException();
            } else {
                activeMQException = null;
            }

            if (packet.getType() == PacketImpl.SESS_SEND) {
                SessionSendMessage ssm = (SessionSendMessage) packet;
                callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException);
            } else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
                SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
                if (!scm.isContinues()) {
                    callSendAck(scm.getHandler(), scm.getMessage(), activeMQException);
                }
            }
        }

        private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) {
            if (handler != null) {
                if (exception == null) {
                    handler.sendAcknowledged(message);
                } else {
                    handler.sendFailed(message, exception);
                }
            } else if (sendAckHandler != null) {
                if (exception == null) {
                    sendAckHandler.sendAcknowledged(message);
                } else {
                    sendAckHandler.sendFailed(message, exception);
                }
            }
        }
    };

    //......
}

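  • The commandConfirmed method of CommandConfirmationHandler calls the handleResponse method of ResponseHandler, which in turn calls callSendAck. callSendAck invokes sendAcknowledged or sendFailed on the packet's SendAcknowledgementHandler (ssm.getHandler()) and falls back to the session-level sendAckHandler when the packet carries no handler.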
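The fallback order in callSendAck can be illustrated as follows. This is a sketch with made-up names: CallSendAckSketch, its nested Handler interface, and the recording helper are all hypothetical, and messages are plain strings.

```java
import java.util.List;

// Hypothetical model of the handler selection in callSendAck; not Artemis code.
class CallSendAckSketch {

    interface Handler {
        void sendAcknowledged(String message);
        void sendFailed(String message, Exception e);
    }

    /** Builds a handler that appends "name:ack:msg" or "name:fail:msg" to the log. */
    static Handler recording(String name, List<String> log) {
        return new Handler() {
            @Override
            public void sendAcknowledged(String message) {
                log.add(name + ":ack:" + message);
            }

            @Override
            public void sendFailed(String message, Exception e) {
                log.add(name + ":fail:" + message);
            }
        };
    }

    static void callSendAck(Handler perSend, Handler sessionLevel, String message, Exception exception) {
        // The handler attached to the packet wins; the session-level one is the fallback.
        Handler target = perSend != null ? perSend : sessionLevel;
        if (target == null) {
            return;
        }
        if (exception == null) {
            target.sendAcknowledged(message);
        } else {
            target.sendFailed(message, exception);
        }
    }
}
```

Only one of the two handlers is ever called for a given packet, so a per-send handler effectively hides the session-level one for that message.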

Summary

  • When confirmationWindowEnabled is true, the send method of ClientProducerImpl calls doSend(address1, message, handler); otherwise it calls doSend(address1, message, null) and, if handler is not null, also calls session.scheduleConfirmation(handler, message).
  • doSend ends by calling either largeMessageSend or sendRegularMessage, and sendRegularMessage ends by calling sessionContext.sendFullMessage(msgI, sendBlocking, handler, address). sendFullMessage creates a SessionSendMessage (or its _1X or _V2 variant) and passes the SendAcknowledgementHandler into it through the constructor.
  • The sendResponse method of ServerSessionPacketHandler registers an IOCallback through storageManager.afterCompleteOperations, and both onError and done call doConfirmAndResponse. doConfirmAndResponse calls channel.confirm(confirmPacket) when the response is not an exception; the confirm method of ChannelImpl writes a PacketsConfirmedMessage once the confirmation window is full. PacketsConfirmedMessage extends PacketImpl and has type PACKETS_CONFIRMED.
  • The handlePacket method of ChannelImpl calls clearUpTo when the packet type is PacketImpl.PACKETS_CONFIRMED and resendCache is not null. Each cleared packet ends up in responseHandler.handleResponse(packet, response), which calls callSendAck, and callSendAck invokes sendAcknowledged or sendFailed on the SendAcknowledgementHandler (ssm.getHandler()).

doc

  • ClientProducerImpl
  • SessionSendMessage
  • PacketsConfirmedMessage

Source: utcz.com/z/512991.html
