A look at handleRegularMessage in the Artemis ClientConsumer
handleRegularMessage
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
public final class ClientConsumerImpl implements ClientConsumerInternal {
    //......
    private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);
    private final Runner runner = new Runner();
    private volatile MessageHandler handler;
    //......
    private void handleRegularMessage(ClientMessageInternal message) {
        if (message.getAddress() == null) {
            message.setAddress(queueInfo.getAddress());
        }
        message.onReceipt(this);
        if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
            // We have messages of different priorities so we need to ack them individually since the order
            // of them in the ServerConsumerImpl delivery list might not be the same as the order they are
            // consumed in, which means that acking all up to won't work
            ackIndividually = true;
        }
        // Add it to the buffer
        buffer.addTail(message, message.getPriority());
        if (handler != null) {
            // Execute using executor
            if (!stopped) {
                queueExecutor();
            }
        } else {
            notify();
        }
    }

    private void queueExecutor() {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::Adding Runner on Executor for delivery");
        }
        sessionExecutor.execute(runner);
    }
    //......
}
- ClientConsumerImpl's handleRegularMessage method first calls buffer.addTail(message, message.getPriority()). If handler is not null (and the consumer is not stopped) it then calls queueExecutor(); otherwise it calls notify(). queueExecutor submits runner to sessionExecutor.
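The buffer-then-dispatch pattern described above can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: BufferingConsumerSketch, its per-priority deques (an assumed simplification of PriorityLinkedListImpl), and the String message type are stand-ins, not Artemis classes. The method is marked synchronized so that notify() is legal, on the assumption that the real caller also holds the consumer's monitor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for the buffer-then-dispatch logic; not the Artemis class.
class BufferingConsumerSketch {
    private static final int NUM_PRIORITIES = 10;

    // One FIFO queue per priority level (assumed simplification of the real buffer).
    private final List<Deque<String>> buffer = new ArrayList<>();
    private final Executor executor;
    private volatile Consumer<String> handler; // null means a receiver thread waits instead

    BufferingConsumerSketch(Executor executor) {
        this.executor = executor;
        for (int i = 0; i < NUM_PRIORITIES; i++) {
            buffer.add(new ArrayDeque<>());
        }
    }

    void setHandler(Consumer<String> handler) {
        this.handler = handler;
    }

    // Buffer first, then either schedule a delivery task or wake a waiting receiver.
    synchronized void handleMessage(String body, int priority) {
        buffer.get(priority).addLast(body);
        if (handler != null) {
            executor.execute(this::deliverOne);
        } else {
            notify();
        }
    }

    // Highest priority first, FIFO within one priority level.
    private synchronized String poll() {
        for (int p = NUM_PRIORITIES - 1; p >= 0; p--) {
            String message = buffer.get(p).pollFirst();
            if (message != null) {
                return message;
            }
        }
        return null;
    }

    // The task pulls from the buffer only when it runs, so ordering is decided at delivery time.
    private void deliverOne() {
        Consumer<String> current = handler; // local copy: the field may be cleared concurrently
        if (current == null) {
            return;
        }
        String message = poll();
        if (message != null) {
            current.accept(message);
        }
    }

    public static void main(String[] args) {
        List<Runnable> pending = new ArrayList<>(); // deferred executor: tasks run later
        List<String> delivered = new ArrayList<>();
        BufferingConsumerSketch consumer = new BufferingConsumerSketch(pending::add);
        consumer.setHandler(delivered::add);
        consumer.handleMessage("low", 1);
        consumer.handleMessage("high", 9);
        pending.forEach(Runnable::run);
        System.out.println(delivered); // [high, low]
    }
}
```

Because each scheduled task polls the buffer when it runs rather than carrying a specific message, the message that arrived second but has the higher priority is delivered first.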
Runner
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
public final class ClientConsumerImpl implements ClientConsumerInternal {
    //......
    private class Runner implements Runnable {
        @Override
        public void run() {
            try {
                callOnMessage();
            } catch (Exception e) {
                ActiveMQClientLogger.LOGGER.onMessageError(e);
                lastException = e;
            }
        }
    }

    private void callOnMessage() throws Exception {
        if (closing || stopped) {
            return;
        }
        session.workDone();
        // We pull the message from the buffer from inside the Runnable so we can ensure priority
        // ordering. If we just added a Runnable with the message to the executor immediately as we get it
        // we could not do that
        ClientMessageInternal message;
        // Must store handler in local variable since might get set to null
        // otherwise while this is executing and give NPE when calling onMessage
        MessageHandler theHandler = handler;
        if (theHandler != null) {
            if (rateLimiter != null) {
                rateLimiter.limit();
            }
            failedOver = false;
            synchronized (this) {
                message = buffer.poll();
            }
            if (message != null) {
                if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
                    //Ignore, this could be a relic from a previous receiveImmediate();
                    return;
                }
                boolean expired = message.isExpired();
                flowControlBeforeConsumption(message);
                if (!expired) {
                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::Calling handler.onMessage");
                    }
                    final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                        @Override
                        public ClassLoader run() {
                            ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
                            Thread.currentThread().setContextClassLoader(contextClassLoader);
                            return originalLoader;
                        }
                    });
                    onMessageThread = Thread.currentThread();
                    try {
                        theHandler.onMessage(message);
                    } finally {
                        try {
                            AccessController.doPrivileged(new PrivilegedAction<Object>() {
                                @Override
                                public Object run() {
                                    Thread.currentThread().setContextClassLoader(originalLoader);
                                    return null;
                                }
                            });
                        } catch (Exception e) {
                            ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                        }
                        onMessageThread = null;
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::Handler.onMessage done");
                    }
                    if (message.isLargeMessage()) {
                        message.discardBody();
                    }
                } else {
                    session.expire(this, message);
                }
                // If slow consumer, we need to send 1 credit to make sure we get another message
                if (clientWindowSize == 0) {
                    startSlowConsumer();
                }
            }
        }
    }

    private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
        // Chunk messages will execute the flow control while receiving the chunks
        if (message.getFlowControlSize() != 0) {
            // on large messages we should discount 1 on the first packets as we need continuity until the last packet
            flowControl(message.getFlowControlSize(), !message.isLargeMessage());
        }
    }

    public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException {
        if (clientWindowSize >= 0) {
            creditsToSend += messageBytes;
            if (creditsToSend >= clientWindowSize) {
                if (clientWindowSize == 0 && discountSlowConsumer) {
                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
                    }
                    // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
                    // always buffering one after received the first message
                    final int credits = creditsToSend - 1;
                    creditsToSend = 0;
                    if (credits > 0) {
                        sendCredits(credits);
                    }
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Sending " + messageBytes + " from flow-control");
                    }
                    final int credits = creditsToSend;
                    creditsToSend = 0;
                    if (credits > 0) {
                        sendCredits(credits);
                    }
                }
            }
        }
    }
    //......
}
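- Runner implements Runnable, and its run method calls callOnMessage(). That method returns immediately if the consumer is closing or stopped. Otherwise it calls rateLimiter.limit() when rateLimiter is not null, then takes a ClientMessageInternal from the buffer with buffer.poll(). If the message is not null, it calls flowControlBeforeConsumption(message); a message that has not expired is passed to theHandler.onMessage(message), while an expired one is passed to session.expire(this, message). When clientWindowSize is 0, it then calls startSlowConsumer(). flowControlBeforeConsumption delegates to flowControl, which adds the message size to creditsToSend and, once the total reaches clientWindowSize, calls sendCredits(credits).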
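The credit accounting in flowControl can be modelled in isolation. The following is a minimal sketch under stated assumptions: CreditWindowSketch is a hypothetical class, and it records each batch in a list instead of calling the real sendCredits. The branching follows the source shown above: nothing is accumulated for a negative window, credits are flushed once the accumulated total reaches the window size, and a zero-size window with discountSlowConsumer set sends one credit less.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the credit accounting in flowControl(); not the Artemis class.
class CreditWindowSketch {
    private final int clientWindowSize;
    private int creditsToSend;
    // Records every batch that the real code would pass to sendCredits(...).
    private final List<Integer> sent = new ArrayList<>();

    CreditWindowSketch(int clientWindowSize) {
        this.clientWindowSize = clientWindowSize;
    }

    void flowControl(int messageBytes, boolean discountSlowConsumer) {
        if (clientWindowSize < 0) {
            return; // negative window: the source skips credit accounting entirely
        }
        creditsToSend += messageBytes;
        if (creditsToSend < clientWindowSize) {
            return; // keep accumulating until the window size is reached
        }
        int credits = creditsToSend;
        if (clientWindowSize == 0 && discountSlowConsumer) {
            credits -= 1; // slow-consumer case: hold back one credit
        }
        creditsToSend = 0;
        if (credits > 0) {
            sent.add(credits);
        }
    }

    List<Integer> sent() {
        return sent;
    }

    public static void main(String[] args) {
        CreditWindowSketch windowed = new CreditWindowSketch(100);
        windowed.flowControl(60, true); // total 60, below 100: nothing sent
        windowed.flowControl(60, true); // total 120, at or above 100: send 120
        System.out.println(windowed.sent()); // [120]

        CreditWindowSketch slow = new CreditWindowSketch(0);
        slow.flowControl(50, true); // window 0 with discount: send 49
        System.out.println(slow.sent()); // [49]
    }
}
```

With a window of 0, every consumed message immediately crosses the threshold, which is why callOnMessage follows up with startSlowConsumer() to request the next message.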
Summary
ClientConsumerImpl's handleRegularMessage method first calls buffer.addTail(message, message.getPriority()), then calls queueExecutor() if handler is not null and notify() otherwise. queueExecutor submits runner to sessionExecutor. Runner implements Runnable, and its run method calls callOnMessage(), which calls rateLimiter.limit() when rateLimiter is not null and then takes a ClientMessageInternal from the buffer with buffer.poll() for processing.
doc
- ClientConsumerImpl