聊聊artemisClientConsumer的handleRegularMessage

编程

handleRegularMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

//......

// Received messages waiting to be consumed; handleRegularMessage() adds to the tail using the
// message priority and callOnMessage() polls from it.
private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);

// Single reusable task that queueExecutor() submits to the session executor.
private final Runner runner = new Runner();

// Volatile because it may be set to null while callOnMessage() is executing (see the comment there).
private volatile MessageHandler handler;

//......

/**
 * Buffers an incoming message and then triggers its consumption.
 *
 * <p>When a {@code handler} is registered, delivery is scheduled through {@link #queueExecutor()}
 * unless the consumer is stopped. Without a handler, {@code notify()} is called on this consumer
 * (presumably to wake a blocked receive call; the waiting side is not visible in this excerpt).
 *
 * @param message the message just received for this consumer
 */
private void handleRegularMessage(ClientMessageInternal message) {
   // Fall back to the queue's address when the message does not carry one.
   if (message.getAddress() == null) {
      message.setAddress(queueInfo.getAddress());
   }

   message.onReceipt(this);

   if (!ackIndividually) {
      // Messages of different priorities may be consumed in a different order than the one in the
      // ServerConsumerImpl delivery list, so "ack everything up to this one" won't work and each
      // message must be acknowledged individually from now on.
      // NOTE(review): 4 looks like the default message priority; confirm.
      final boolean nonStandardPriority = message.getPriority() != 4;
      if (nonStandardPriority && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
         ackIndividually = true;
      }
   }

   // Queue the message according to its priority.
   buffer.addTail(message, message.getPriority());

   if (handler == null) {
      // No asynchronous handler registered: signal a thread waiting on this consumer instead.
      notify();
      return;
   }

   // Asynchronous delivery goes through the executor, but only while the consumer is running.
   if (!stopped) {
      queueExecutor();
   }
}

/**
 * Submits the shared {@code runner} to the session executor so that one buffered message gets
 * delivered to the handler.
 */
private void queueExecutor() {
   final boolean traceOn = logger.isTraceEnabled();
   if (traceOn) {
      logger.trace(this + "::Adding Runner on Executor for delivery");
   }

   sessionExecutor.execute(runner);
}

//......

}

  • ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority())将消息按优先级放入buffer;之后若handler不为null且consumer未处于stopped状态,则执行queueExecutor(),若handler为null则执行notify();queueExecutor方法通过sessionExecutor执行runner

Runner

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

//......

/**
 * Executor task that delivers buffered messages by delegating to {@code callOnMessage()}.
 *
 * <p>Any exception thrown during delivery is logged and remembered in {@code lastException}
 * rather than propagated to the executor.
 */
private class Runner implements Runnable {

   @Override
   public void run() {
      try {
         callOnMessage();
      } catch (Exception deliveryFailure) {
         // Log first, then keep the failure on the consumer.
         ActiveMQClientLogger.LOGGER.onMessageError(deliveryFailure);
         lastException = deliveryFailure;
      }
   }

}

/**
 * Takes one message from the buffer and hands it to the registered handler.
 *
 * <p>Invoked from {@code Runner.run()}. Does nothing when the consumer is closing or stopped, or
 * when no handler is set. Otherwise it applies the optional rate limiter, polls one message,
 * runs flow control for it, and then either calls {@code onMessage} (with the thread context
 * class loader switched to {@code contextClassLoader} for the duration of the call) or, if the
 * message has expired, reports it through {@code session.expire}.
 *
 * @throws Exception whatever the handler, flow control or session calls throw
 */
private void callOnMessage() throws Exception {

if (closing || stopped) {

return;

}

session.workDone();

// We pull the message from the buffer from inside the Runnable so we can ensure priority

// ordering. If we just added a Runnable with the message to the executor immediately as we get it

// we could not do that

ClientMessageInternal message;

// Must store handler in local variable since might get set to null

// otherwise while this is executing and give NPE when calling onMessage

MessageHandler theHandler = handler;

if (theHandler != null) {

if (rateLimiter != null) {

rateLimiter.limit();

}

failedOver = false;

// The buffer is polled while holding this consumer's monitor.
synchronized (this) {

message = buffer.poll();

}

if (message != null) {

if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {

//Ignore, this could be a relic from a previous receiveImmediate();

return;

}

// Expiry is sampled before flow control runs; flow control is applied to expired messages too.
boolean expired = message.isExpired();

flowControlBeforeConsumption(message);

if (!expired) {

if (logger.isTraceEnabled()) {

logger.trace(this + "::Calling handler.onMessage");

}

// Install contextClassLoader as the thread context class loader and remember the previous
// one so it can be restored after the handler returns.
final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {

@Override

public ClassLoader run() {

ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

Thread.currentThread().setContextClassLoader(contextClassLoader);

return originalLoader;

}

});

// Record which thread is inside onMessage; cleared again in the finally block below.
onMessageThread = Thread.currentThread();

try {

theHandler.onMessage(message);

} finally {

// Restore the previous context class loader even if the handler threw; a failure while
// restoring is only logged.
try {

AccessController.doPrivileged(new PrivilegedAction<Object>() {

@Override

public Object run() {

Thread.currentThread().setContextClassLoader(originalLoader);

return null;

}

});

} catch (Exception e) {

ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);

}

onMessageThread = null;

}

if (logger.isTraceEnabled()) {

logger.trace(this + "::Handler.onMessage done");

}

// The body of a large message is discarded once the handler has returned.
if (message.isLargeMessage()) {

message.discardBody();

}

} else {

// Expired messages never reach the handler; they are reported to the session instead.
session.expire(this, message);

}

// If slow consumer, we need to send 1 credit to make sure we get another message

if (clientWindowSize == 0) {

startSlowConsumer();

}

}

}

}

/**
 * Runs flow control for a message that is about to be consumed.
 *
 * <p>Messages reporting a flow-control size of zero are skipped (per the original comment, chunked
 * messages perform flow control while their chunks are received).
 *
 * @param message the message about to be handed to the consumer
 * @throws ActiveMQException if {@code flowControl} fails
 */
private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
   if (message.getFlowControlSize() == 0) {
      // Nothing to account for here.
      return;
   }

   // For large messages the slow-consumer discount is not applied on the first packets, because
   // continuity is needed until the last packet.
   flowControl(message.getFlowControlSize(), !message.isLargeMessage());
}

/**
 * Accumulates consumed bytes as pending credits and sends them once they reach the window size.
 *
 * <p>Nothing happens when {@code clientWindowSize} is negative. Otherwise {@code messageBytes} is
 * added to {@code creditsToSend}; once that total reaches {@code clientWindowSize} the counter is
 * reset and the credits are sent, provided the amount is positive. With a window size of zero and
 * {@code discountSlowConsumer} set, one credit fewer is sent.
 *
 * @param messageBytes number of bytes to add to the pending credits
 * @param discountSlowConsumer whether to subtract one credit when the window size is zero
 * @throws ActiveMQException if sending the credits fails
 */
public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException {
   if (clientWindowSize < 0) {
      return;
   }

   creditsToSend += messageBytes;

   if (creditsToSend < clientWindowSize) {
      // Threshold not reached yet; keep accumulating.
      return;
   }

   final int credits;
   if (clientWindowSize == 0 && discountSlowConsumer) {
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
      }

      // One credit is held back: it was already sent initially to fire the slow consumer,
      // otherwise the slow consumer would always buffer one message after receiving the first.
      credits = creditsToSend - 1;
   } else {
      if (logger.isDebugEnabled()) {
         logger.debug("Sending " + messageBytes + " from flow-control");
      }

      credits = creditsToSend;
   }

   // Reset the counter before sending, as both original branches did.
   creditsToSend = 0;

   if (credits > 0) {
      sendCredits(credits);
   }
}

//......

}

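  • Runner实现了Runnable接口,其run方法执行callOnMessage();该方法在closing或stopped时直接返回;在handler不为null时,若rateLimiter不为null则先执行rateLimiter.limit(),之后执行buffer.poll()获取ClientMessageInternal;若消息不为null且不含FORCED_DELIVERY_MESSAGE属性,则执行flowControlBeforeConsumption(message),对于非expired的消息执行theHandler.onMessage(message)方法,对于expired的消息执行session.expire(this, message);最后对于clientWindowSize为0的执行startSlowConsumer();flowControlBeforeConsumption方法在message.getFlowControlSize()不为0时执行flowControl方法,该方法累加creditsToSend,当其达到clientWindowSize且计算出的credits大于0时执行sendCredits(credits)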

小结

ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority()),之后若handler不为null且consumer未处于stopped状态则执行queueExecutor(),若handler为null则执行notify();queueExecutor方法是通过sessionExecutor执行runner;Runner实现了Runnable接口,其run方法执行callOnMessage();该方法对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal进行处理

doc

  • ClientConsumerImpl

以上是 聊聊artemisClientConsumer的handleRegularMessage 的全部内容, 来源链接: utcz.com/z/512851.html

回到顶部