聊聊artemis消息的推拉模式

编程

拉模式

receive

activemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java

public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscriber {

//......

@Override

public Message receive() throws JMSException {

return getMessage(0, false);

}

@Override

public Message receive(final long timeout) throws JMSException {

return getMessage(timeout, false);

}

@Override

public Message receiveNoWait() throws JMSException {

return getMessage(0, true);

}

private ActiveMQMessage getMessage(final long timeout, final boolean noWait) throws JMSException {

try {

ClientMessage coreMessage;

if (noWait) {

coreMessage = consumer.receiveImmediate();

} else {

coreMessage = consumer.receive(timeout);

}

ActiveMQMessage jmsMsg = null;

if (coreMessage != null) {

ClientSession coreSession = session.getCoreSession();

boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE ||

ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||

coreMessage.getType() == ActiveMQObjectMessage.TYPE;

if (coreMessage.getRoutingType() == null) {

coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST);

}

if (session.isEnable1xPrefixes()) {

jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, needSession ? coreSession : null, options);

} else {

jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? coreSession : null, options);

}

try {

jmsMsg.doBeforeReceive();

} catch (IndexOutOfBoundsException ioob) {

((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();

// In case this exception happen you will need to know where it happened.

// it has been a bug here in the past, and this was used to debug it.

// nothing better than keep it for future investigations in case it happened again

IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage());

newIOOB.initCause(ioob);

ActiveMQClientLogger.LOGGER.unableToGetMessage(newIOOB);

throw ioob;

}

// We Do the ack after doBeforeReceive, as in the case of large messages, this may fail so we don"t want messages redelivered

// https://issues.jboss.org/browse/JBPAPP-6110

if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {

jmsMsg.setIndividualAcknowledge();

} else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {

jmsMsg.setClientAcknowledge();

coreMessage.acknowledge();

} else {

coreMessage.acknowledge();

}

}

return jmsMsg;

} catch (ActiveMQException e) {

((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();

throw JMSExceptionHelper.convertFromActiveMQException(e);

} catch (ActiveMQInterruptedException e) {

((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();

throw JMSExceptionHelper.convertFromActiveMQException(e);

}

}

//......

}

  • ActiveMQMessageConsumer的receive方法最后调用的是getMessage方法,它对于session.getAcknowledgeMode()为ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE执行jmsMsg.setIndividualAcknowledge(),其余的都执行coreMessage.acknowledge()

acknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java

public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {

//......

public ClientMessageImpl acknowledge() throws ActiveMQException {

if (consumer != null) {

consumer.acknowledge(this);

}

return this;

}

//......

}

  • ClientMessageImpl的acknowledge方法执行的是consumer.acknowledge(this)

推模式

handleMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

//......

public synchronized void handleMessage(final ClientMessageInternal message) throws Exception {

if (closing) {

// This is ok - we just ignore the message

return;

}

if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {

handleCompressedMessage(message);

} else {

handleRegularMessage(message);

}

}

private void handleRegularMessage(ClientMessageInternal message) {

if (message.getAddress() == null) {

message.setAddress(queueInfo.getAddress());

}

message.onReceipt(this);

if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {

// We have messages of different priorities so we need to ack them individually since the order

// of them in the ServerConsumerImpl delivery list might not be the same as the order they are

// consumed in, which means that acking all up to won"t work

ackIndividually = true;

}

// Add it to the buffer

buffer.addTail(message, message.getPriority());

if (handler != null) {

// Execute using executor

if (!stopped) {

queueExecutor();

}

} else {

notify();

}

}

private void queueExecutor() {

if (logger.isTraceEnabled()) {

logger.trace(this + "::Adding Runner on Executor for delivery");

}

sessionExecutor.execute(runner);

}

//......

}

  • ClientConsumerImpl的handleRegularMessage会执行buffer.addTail(message, message.getPriority())以及queueExecutor(),queueExecutor通过sessionExecutor执行Runner

callOnMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

//......

private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);

//......

private class Runner implements Runnable {

@Override

public void run() {

try {

callOnMessage();

} catch (Exception e) {

ActiveMQClientLogger.LOGGER.onMessageError(e);

lastException = e;

}

}

}

private void callOnMessage() throws Exception {

if (closing || stopped) {

return;

}

session.workDone();

// We pull the message from the buffer from inside the Runnable so we can ensure priority

// ordering. If we just added a Runnable with the message to the executor immediately as we get it

// we could not do that

ClientMessageInternal message;

// Must store handler in local variable since might get set to null

// otherwise while this is executing and give NPE when calling onMessage

MessageHandler theHandler = handler;

if (theHandler != null) {

if (rateLimiter != null) {

rateLimiter.limit();

}

failedOver = false;

synchronized (this) {

message = buffer.poll();

}

if (message != null) {

if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {

//Ignore, this could be a relic from a previous receiveImmediate();

return;

}

boolean expired = message.isExpired();

flowControlBeforeConsumption(message);

if (!expired) {

if (logger.isTraceEnabled()) {

logger.trace(this + "::Calling handler.onMessage");

}

final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {

@Override

public ClassLoader run() {

ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

Thread.currentThread().setContextClassLoader(contextClassLoader);

return originalLoader;

}

});

onMessageThread = Thread.currentThread();

try {

theHandler.onMessage(message);

} finally {

try {

AccessController.doPrivileged(new PrivilegedAction<Object>() {

@Override

public Object run() {

Thread.currentThread().setContextClassLoader(originalLoader);

return null;

}

});

} catch (Exception e) {

ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);

}

onMessageThread = null;

}

if (logger.isTraceEnabled()) {

logger.trace(this + "::Handler.onMessage done");

}

if (message.isLargeMessage()) {

message.discardBody();

}

} else {

session.expire(this, message);

}

// If slow consumer, we need to send 1 credit to make sure we get another message

if (clientWindowSize == 0) {

startSlowConsumer();

}

}

}

}

//......

}

  • Runner的run方法会执行callOnMessage方法,它会从buffer.poll()消息,如果不为null且非expired则执行theHandler.onMessage(message)

onMessage

activemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java

public class JMSMessageListenerWrapper implements MessageHandler {

private final ConnectionFactoryOptions options;

private final ActiveMQConnection connection;

private final ActiveMQSession session;

private final MessageListener listener;

private final ClientConsumer consumer;

private final boolean transactedOrClientAck;

private final boolean individualACK;

private final boolean clientACK;

protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options,

final ActiveMQConnection connection,

final ActiveMQSession session,

final ClientConsumer consumer,

final MessageListener listener,

final int ackMode) {

this.options = options;

this.connection = connection;

this.session = session;

this.consumer = consumer;

this.listener = listener;

transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();

individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);

clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE);

}

/**

* In this method we apply the JMS acknowledgement and redelivery semantics

* as per JMS spec

*/

@Override

public void onMessage(final ClientMessage message) {

ActiveMQMessage msg;

if (session.isEnable1xPrefixes()) {

msg = ActiveMQCompatibleMessage.createMessage(message, session.getCoreSession(), options);

} else {

msg = ActiveMQMessage.createMessage(message, session.getCoreSession(), options);

}

if (individualACK) {

msg.setIndividualAcknowledge();

}

if (clientACK) {

msg.setClientAcknowledge();

}

try {

msg.doBeforeReceive();

} catch (Exception e) {

ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e);

return;

}

if (transactedOrClientAck) {

try {

message.acknowledge();

} catch (ActiveMQException e) {

((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();

ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);

}

}

try {

connection.getThreadAwareContext().setCurrentThread(false);

listener.onMessage(msg);

} catch (RuntimeException e) {

// See JMS 1.1 spec, section 4.5.2

ActiveMQJMSClientLogger.LOGGER.onMessageError(e);

if (!transactedOrClientAck) {

try {

if (individualACK) {

message.individualAcknowledge();

}

session.getCoreSession().rollback(true);

session.setRecoverCalled(true);

} catch (Exception e2) {

ActiveMQJMSClientLogger.LOGGER.errorRecoveringSession(e2);

}

}

} finally {

connection.getThreadAwareContext().clearCurrentThread(false);

}

if (!session.isRecoverCalled() && !individualACK) {

try {

// We don"t want to call this if the consumer was closed from inside onMessage

if (!consumer.isClosed() && !transactedOrClientAck) {

message.acknowledge();

}

} catch (ActiveMQException e) {

((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();

ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);

}

}

session.setRecoverCalled(false);

}

}

  • onMessage方法在transactedOrClientAck为true时会执行message.acknowledge();在触发listener.onMessage(msg)之后会在非session.isRecoverCalled()且非individualACK且非consumer.isClosed()且非transactedOrClientAck时执行message.acknowledge()

acknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java

public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {

//......

public ClientMessageImpl acknowledge() throws ActiveMQException {

if (consumer != null) {

consumer.acknowledge(this);

}

return this;

}

//......

}

  • acknowledge方法执行的是consumer.acknowledge(this)方法

ClientConsumerImpl.acknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

//......

public void acknowledge(final ClientMessage message) throws ActiveMQException {

ClientMessageInternal cmi = (ClientMessageInternal) message;

if (ackIndividually) {

individualAcknowledge(message);

} else {

ackBytes += message.getEncodeSize();

if (logger.isTraceEnabled()) {

logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + message.getEncodeSize());

}

if (ackBytes >= ackBatchSize) {

if (logger.isTraceEnabled()) {

logger.trace(this + ":: acknowledge acking " + cmi);

}

doAck(cmi);

} else {

if (logger.isTraceEnabled()) {

logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);

}

lastAckedMessage = cmi;

}

}

}

private void doAck(final ClientMessageInternal message) throws ActiveMQException {

ackBytes = 0;

lastAckedMessage = null;

if (logger.isTraceEnabled()) {

logger.trace(this + "::Acking message " + message);

}

session.acknowledge(this, message);

}

//......

}

  • ClientConsumerImpl的acknowledge方法执行的是doAck方法,而doAck方法执行的是session.acknowledge(this, message)

ClientSessionImpl.acknowledge

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

//......

public void acknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {

// if we"re pre-acknowledging then we don"t need to do anything

if (preAcknowledge) {

return;

}

checkClosed();

if (logger.isDebugEnabled()) {

logger.debug("client ack messageID = " + message.getMessageID());

}

startCall();

try {

sessionContext.sendACK(false, blockOnAcknowledge, consumer, message);

} finally {

endCall();

}

}

//......

}

  • ClientSessionImpl的acknowledge方法通过sessionContext.sendACK来发送ack

小结

  • ActiveMQMessageConsumer的receive采用的是拉模式,它对于session.getAcknowledgeMode()为ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE执行jmsMsg.setIndividualAcknowledge(),其余的都执行coreMessage.acknowledge();ClientMessageImpl的acknowledge方法执行的是consumer.acknowledge(this)
  • ClientConsumerImpl的handleMessage采用的是推模式,它会执行buffer.addTail(message, message.getPriority())以及queueExecutor(),queueExecutor通过sessionExecutor执行Runner;Runner的run方法会执行callOnMessage方法,它会从buffer.poll()消息,如果不为null且非expired则执行theHandler.onMessage(message);最后触发的是执行的是consumer.acknowledge(this)方法
  • ClientConsumerImpl的acknowledge方法执行的是doAck方法,而doAck方法执行的是session.acknowledge(this, message);ClientSessionImpl的acknowledge方法通过sessionContext.sendACK来发送ack

doc

  • ActiveMQMessageConsumer
  • ClientConsumerImpl
  • ClientSessionImpl

以上是 聊聊artemis消息的推拉模式 的全部内容, 来源链接: utcz.com/z/512969.html

回到顶部