聊聊rocketmq的ConsumeMode.ORDERLY

编程

本文主要研究一下rocketmq的ConsumeMode.ORDERLY

ConsumeMode.ORDERLY

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/annotation/ConsumeMode.java

public enum ConsumeMode {

/**

* Receive asynchronously delivered messages concurrently

*/

CONCURRENTLY,

/**

* Receive asynchronously delivered messages orderly. one queue, one thread

*/

ORDERLY

}

  • ConsumeMode定义了CONCURRENTLY、ORDERLY两个枚举值

DefaultRocketMQListenerContainer

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,

RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {

//......

private void initRocketMQPushConsumer() throws MQClientException {

Assert.notNull(rocketMQListener, "Property "rocketMQListener" is required");

Assert.notNull(consumerGroup, "Property "consumerGroup" is required");

Assert.notNull(nameServer, "Property "nameServer" is required");

Assert.notNull(topic, "Property "topic" is required");

RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),

this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());

boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();

if (Objects.nonNull(rpcHook)) {

consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),

enableMsgTrace, this.applicationContext.getEnvironment().

resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));

consumer.setVipChannelEnabled(false);

consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));

} else {

log.debug("Access-key or secret-key not configure in " + this + ".");

consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,

this.applicationContext.getEnvironment().

resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));

}

String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());

if (customizedNameServer != null) {

consumer.setNamesrvAddr(customizedNameServer);

} else {

consumer.setNamesrvAddr(nameServer);

}

if (accessChannel != null) {

consumer.setAccessChannel(accessChannel);

}

consumer.setConsumeThreadMax(consumeThreadMax);

if (consumeThreadMax < consumer.getConsumeThreadMin()) {

consumer.setConsumeThreadMin(consumeThreadMax);

}

consumer.setConsumeTimeout(consumeTimeout);

switch (messageModel) {

case BROADCASTING:

consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);

break;

case CLUSTERING:

consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);

break;

default:

throw new IllegalArgumentException("Property "messageModel" was wrong.");

}

switch (selectorType) {

case TAG:

consumer.subscribe(topic, selectorExpression);

break;

case SQL92:

consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));

break;

default:

throw new IllegalArgumentException("Property "selectorType" was wrong.");

}

switch (consumeMode) {

case ORDERLY:

consumer.setMessageListener(new DefaultMessageListenerOrderly());

break;

case CONCURRENTLY:

consumer.setMessageListener(new DefaultMessageListenerConcurrently());

break;

default:

throw new IllegalArgumentException("Property "consumeMode" was wrong.");

}

if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {

((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);

}

}

}

  • initRocketMQPushConsumer方法对于consumeMode为ORDERLY的设置的messageListener为DefaultMessageListenerOrderly

DefaultMessageListenerOrderly

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

@SuppressWarnings("unchecked")

@Override

public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

for (MessageExt messageExt : msgs) {

log.debug("received msg: {}", messageExt);

try {

long now = System.currentTimeMillis();

rocketMQListener.onMessage(doConvertMessage(messageExt));

long costTime = System.currentTimeMillis() - now;

log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);

} catch (Exception e) {

log.warn("consume message failed. messageExt:{}", messageExt, e);

context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);

return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

}

}

return ConsumeOrderlyStatus.SUCCESS;

}

}

  • DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法遍历msgs,然后挨个调用rocketMQListener.onMessage(doConvertMessage(messageExt)),若循环中出现异常被捕获住了会执行context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

ConsumeMessageOrderlyService

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

    class ConsumeRequest implements Runnable {

private final ProcessQueue processQueue;

private final MessageQueue messageQueue;

public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {

this.processQueue = processQueue;

this.messageQueue = messageQueue;

}

public ProcessQueue getProcessQueue() {

return processQueue;

}

public MessageQueue getMessageQueue() {

return messageQueue;

}

@Override

public void run() {

if (this.processQueue.isDropped()) {

log.warn("run, the message queue not be able to consume, because it"s dropped. {}", this.messageQueue);

return;

}

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);

synchronized (objLock) {

if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {

final long beginTime = System.currentTimeMillis();

for (boolean continueConsume = true; continueConsume; ) {

if (this.processQueue.isDropped()) {

log.warn("the message queue not be able to consume, because it"s dropped. {}", this.messageQueue);

break;

}

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

&& !this.processQueue.isLocked()) {

log.warn("the message queue not locked, so consume later, {}", this.messageQueue);

ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);

break;

}

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())

&& this.processQueue.isLockExpired()) {

log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);

ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);

break;

}

long interval = System.currentTimeMillis() - beginTime;

if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {

ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);

break;

}

final int consumeBatchSize =

ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);

defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

if (!msgs.isEmpty()) {

final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

ConsumeOrderlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;

if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext = new ConsumeMessageContext();

consumeMessageContext

.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());

consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());

consumeMessageContext.setMq(messageQueue);

consumeMessageContext.setMsgList(msgs);

consumeMessageContext.setSuccess(false);

// init the consume context type

consumeMessageContext.setProps(new HashMap<String, String>());

ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);

}

long beginTimestamp = System.currentTimeMillis();

ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

boolean hasException = false;

try {

this.processQueue.getLockConsume().lock();

if (this.processQueue.isDropped()) {

log.warn("consumeMessage, the message queue not be able to consume, because it"s dropped. {}",

this.messageQueue);

break;

}

status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);

} catch (Throwable e) {

log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",

RemotingHelper.exceptionSimpleDesc(e),

ConsumeMessageOrderlyService.this.consumerGroup,

msgs,

messageQueue);

hasException = true;

} finally {

this.processQueue.getLockConsume().unlock();

}

if (null == status

|| ConsumeOrderlyStatus.ROLLBACK == status

|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {

log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",

ConsumeMessageOrderlyService.this.consumerGroup,

msgs,

messageQueue);

}

long consumeRT = System.currentTimeMillis() - beginTimestamp;

if (null == status) {

if (hasException) {

returnType = ConsumeReturnType.EXCEPTION;

} else {

returnType = ConsumeReturnType.RETURNNULL;

}

} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {

returnType = ConsumeReturnType.TIME_OUT;

} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {

returnType = ConsumeReturnType.FAILED;

} else if (ConsumeOrderlyStatus.SUCCESS == status) {

returnType = ConsumeReturnType.SUCCESS;

}

if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());

}

if (null == status) {

status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

}

if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext.setStatus(status.toString());

consumeMessageContext

.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);

ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);

}

ConsumeMessageOrderlyService.this.getConsumerStatsManager()

.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);

} else {

continueConsume = false;

}

}

} else {

if (this.processQueue.isDropped()) {

log.warn("the message queue not be able to consume, because it"s dropped. {}", this.messageQueue);

return;

}

ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);

}

}

}

}

  • ConsumeRequest实现了Runnable接口,它的构造器要求传入processQueue、messageQueue参数;其run

    方法首先通过messageQueueLock.fetchLockObject(this.messageQueue)获取objLock,之后synchronized该objLock进行后续操作

  • 对于messageModel非MessageModel.BROADCASTING的且(this.processQueue.isLocked() && !this.processQueue.isLockExpired())不成立的则执行tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100)
  • 之后通过processQueue.takeMessags(consumeBatchSize),然后执行processQueue.getLockConsume().lock(),再执行messageListener.consumeMessage(Collections.unmodifiableList(msgs), context),最后在finally执行processQueue.getLockConsume().unlock(),之后通过ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this)处理ConsumeOrderlyStatus

ProcessQueue

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

public class ProcessQueue {

//......

public List<MessageExt> takeMessags(final int batchSize) {

List<MessageExt> result = new ArrayList<MessageExt>(batchSize);

final long now = System.currentTimeMillis();

try {

this.lockTreeMap.writeLock().lockInterruptibly();

this.lastConsumeTimestamp = now;

try {

if (!this.msgTreeMap.isEmpty()) {

for (int i = 0; i < batchSize; i++) {

Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();

if (entry != null) {

result.add(entry.getValue());

consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());

} else {

break;

}

}

}

if (result.isEmpty()) {

consuming = false;

}

} finally {

this.lockTreeMap.writeLock().unlock();

}

} catch (InterruptedException e) {

log.error("take Messages exception", e);

}

return result;

}

//......

}

  • takeMessags方法先执行lockTreeMap.writeLock().lockInterruptibly(),然后执行msgTreeMap.pollFirstEntry()及consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());最后finally执行lockTreeMap.writeLock().unlock()

小结

DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法遍历msgs,然后挨个调用rocketMQListener.onMessage(doConvertMessage(messageExt)),若循环中出现异常被捕获住了会执行context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

doc

  • ConsumeMessageOrderlyService

以上是 聊聊rocketmq的ConsumeMode.ORDERLY 的全部内容, 来源链接: utcz.com/z/511891.html

回到顶部