聊聊artemis的individualAcknowledge

编程

acknowledge

activemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java

public class ActiveMQMessage implements javax.jms.Message {

   //......

   /**
    * Acknowledges this message through the session it is attached to.
    * <p>
    * Nothing happens when no session is attached. Otherwise the session must still be open;
    * if it is closed, the error built by {@code ActiveMQClientMessageBundle.BUNDLE.sessionClosed()}
    * is raised. In individual-ack mode the wrapped core message is acknowledged on its own first,
    * and in either client-ack or individual-ack mode the session is then committed, using the
    * session's block-on-acknowledge setting.
    *
    * @throws JMSException the converted form of any {@code ActiveMQException} raised while
    *                      acknowledging or committing
    */
   public void acknowledge() throws JMSException {
      // A message without a session has nothing to acknowledge against.
      if (session == null) {
         return;
      }

      try {
         if (session.isClosed()) {
            throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
         }

         // Individual mode acknowledges just this message before the commit below.
         if (individualAck) {
            message.individualAcknowledge();
         }

         final boolean commitNeeded = clientAck || individualAck;
         if (commitNeeded) {
            session.commit(session.isBlockOnAcknowledge());
         }
      } catch (ActiveMQException e) {
         throw JMSExceptionHelper.convertFromActiveMQException(e);
      }
   }

   //......

}

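  • ActiveMQMessage的acknowledge方法在session不为null且未关闭的前提下,若individualAck为true,会先单独执行message.individualAcknowledge();随后只要clientAck或individualAck为true,就会执行session.commit(session.isBlockOnAcknowledge())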

message.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java

public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {

   //......

   /**
    * Acknowledges only this message by delegating to the consumer it is associated with.
    * When no consumer is set the call is a no-op.
    *
    * @return this message, to allow call chaining
    * @throws ActiveMQException if the consumer's individual acknowledgement fails
    */
   public ClientMessageImpl individualAcknowledge() throws ActiveMQException {
      if (consumer == null) {
         return this;
      }

      consumer.individualAcknowledge(this);
      return this;
   }

   //......

}

  • ClientMessageImpl的individualAcknowledge方法会执行consumer.individualAcknowledge(this)

consumer.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   /**
    * Acknowledges a single message through the session.
    * <p>
    * If an earlier acknowledgement is still pending (recorded in {@code lastAckedMessage}),
    * it is flushed first so that it is sent before the individual acknowledgement.
    *
    * @param message the message to acknowledge individually
    * @throws ActiveMQException if flushing or the session call fails
    */
   public void individualAcknowledge(ClientMessage message) throws ActiveMQException {
      final boolean hasPendingAck = lastAckedMessage != null;
      if (hasPendingAck) {
         flushAcks();
      }

      session.individualAcknowledge(this, message);
   }

   /**
    * Sends the pending acknowledgement, if there is one; otherwise does nothing.
    *
    * @throws ActiveMQException if acknowledging the pending message fails
    */
   public void flushAcks() throws ActiveMQException {
      if (lastAckedMessage == null) {
         return;
      }

      if (logger.isTraceEnabled()) {
         logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage);
      }

      doAck(lastAckedMessage);
   }

   /**
    * Resets the pending-ack bookkeeping ({@code ackBytes} and {@code lastAckedMessage}) and then
    * acknowledges the given message through the session.
    *
    * @param message the message to acknowledge
    * @throws ActiveMQException if the session acknowledgement fails
    */
   private void doAck(final ClientMessageInternal message) throws ActiveMQException {
      // Bookkeeping is cleared before the session call, so it stays cleared even if that call throws.
      ackBytes = 0;
      lastAckedMessage = null;

      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Acking message " + message);
      }

      session.acknowledge(this, message);
   }

   //......

}

  • ClientConsumerImpl的individualAcknowledge,对于lastAckedMessage不为null的先执行flushAcks,最后执行session.individualAcknowledge

session.individualAcknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

   //......

   /**
    * Sends an individual acknowledgement for one message on behalf of a consumer.
    * <p>
    * When the session is pre-acknowledging, nothing is sent. Otherwise the ack is sent through
    * the session context with the {@code individual} flag set to {@code true} and the session's
    * {@code blockOnAcknowledge} setting.
    *
    * @param consumer the consumer the message was delivered to
    * @param message  the message to acknowledge
    * @throws ActiveMQException if the closed-check or the send fails
    */
   public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {
      // if we're pre-acknowledging then we don't need to do anything
      if (!preAcknowledge) {
         checkClosed();

         startCall();
         try {
            sessionContext.sendACK(true, blockOnAcknowledge, consumer, message);
         } finally {
            // Always paired with startCall(), even when the send throws.
            endCall();
         }
      }
   }

   //......

}

  • ClientSessionImpl的individualAcknowledge方法通过sessionContext.sendACK来发送ack

sessionContext.sendACK

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

   //......

   /**
    * Builds and sends an acknowledgement packet for the given message.
    *
    * @param individual {@code true} to send a {@code SessionIndividualAcknowledgeMessage},
    *                   {@code false} to send a {@code SessionAcknowledgeMessage}
    * @param block      {@code true} to send with {@code sendBlocking}, expecting
    *                   {@code PacketImpl.NULL_RESPONSE}; {@code false} to send with {@code sendBatched}
    * @param consumer   the consumer whose ID is placed in the packet
    * @param message    the message whose ID is placed in the packet
    * @throws ActiveMQException if sending the packet fails
    */
   public void sendACK(boolean individual,
                       boolean block,
                       final ClientConsumer consumer,
                       final Message message) throws ActiveMQException {
      // Both packet types carry the same three fields; only the packet class differs.
      final PacketImpl messagePacket = individual
         ? new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block)
         : new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block);

      if (!block) {
         sessionChannel.sendBatched(messagePacket);
      } else {
         sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE);
      }
   }

   //......

}

  • ActiveMQSessionContext的sendACK方法对于individual为true的创建的是SessionIndividualAcknowledgeMessage,最后通过sessionChannel.sendBlocking或者sessionChannel.sendBatched方法发送消息

ServerConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

//......

public synchronized void acknowledge(Transaction tx, final long messageID) throws Exception {

if (browseOnly) {

return;

}

// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly

// acknowledged

// We use a transaction here as if the message is not found, we should rollback anything done

// This could eventually happen on retries during transactions, and we need to make sure we don"t ACK things we are not supposed to acknowledge

boolean startedTransaction = false;

if (tx == null) {

startedTransaction = true;

tx = new TransactionImpl(storageManager);

}

try {

MessageReference ref;

do {

synchronized (lock) {

ref = deliveringRefs.poll();

}

if (logger.isTraceEnabled()) {

logger.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);

}

if (ref == null) {

ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());

tx.markAsRollbackOnly(ils);

throw ils;

}

ref.acknowledge(tx, this);

acks++;

}

while (ref.getMessageID() != messageID);

if (startedTransaction) {

tx.commit();

}

} catch (ActiveMQException e) {

if (startedTransaction) {

tx.rollback();

} else {

tx.markAsRollbackOnly(e);

}

throw e;

} catch (Throwable e) {

ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);

ActiveMQException activeMQIllegalStateException = new ActiveMQIllegalStateException(e.getMessage());

if (startedTransaction) {

tx.rollback();

} else {

tx.markAsRollbackOnly(activeMQIllegalStateException);

}

throw activeMQIllegalStateException;

}

}

public synchronized void individualAcknowledge(Transaction tx, final long messageID) throws Exception {

if (browseOnly) {

return;

}

boolean startedTransaction = false;

if (logger.isTraceEnabled()) {

logger.trace("individualACK messageID=" + messageID);

}

if (tx == null) {

if (logger.isTraceEnabled()) {

logger.trace("individualACK starting new TX");

}

startedTransaction = true;

tx = new TransactionImpl(storageManager);

}

try {

MessageReference ref;

ref = removeReferenceByID(messageID);

if (logger.isTraceEnabled()) {

logger.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);

}

if (ref == null) {

ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);

tx.markAsRollbackOnly(ils);

throw ils;

}

ref.acknowledge(tx, this);

acks++;

if (startedTransaction) {

tx.commit();

}

} catch (ActiveMQException e) {

if (startedTransaction) {

tx.rollback();

} else if (tx != null) {

tx.markAsRollbackOnly(e);

}

throw e;

} catch (Throwable e) {

ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);

ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());

if (startedTransaction) {

tx.rollback();

} else if (tx != null) {

tx.markAsRollbackOnly(hqex);

}

throw hqex;

}

}

//......

}

  • ServerConsumerImpl的individualAcknowledge方法先根据messageID将该消息从deliveringRefs中移除,然后执行ref.acknowledge(tx, this);而普通的acknowledge方法则是不断循环执行deliveringRefs.poll(),然后执行ref.acknowledge(tx, this),直到取到指定messageID的ref才跳出循环

小结

客户端:ActiveMQMessage的acknowledge方法对于individualAck为true的会单独执行message.individualAcknowledge();ClientMessageImpl的individualAcknowledge方法会执行consumer.individualAcknowledge(this);ClientConsumerImpl的individualAcknowledge,对于lastAckedMessage不为null的先执行flushAcks,最后执行session.individualAcknowledge;ClientSessionImpl的individualAcknowledge方法通过sessionContext.sendACK来发送ack;ActiveMQSessionContext的sendACK方法对于individual为true的创建的是SessionIndividualAcknowledgeMessage,最后通过sessionChannel.sendBlocking或者sessionChannel.sendBatched方法发送消息。服务端:ServerConsumerImpl的individualAcknowledge方法只根据messageID从deliveringRefs中移除并确认这一条消息;而普通的acknowledge方法会依次poll并确认deliveringRefs中的消息,直到(并包含)指定messageID的那一条为止

doc

  • ClientMessageImpl

以上是 聊聊artemis的individualAcknowledge 的全部内容, 来源链接: utcz.com/z/513138.html

回到顶部