聊聊rocketmq的consumeThread

编程

本文主要研究一下rocketmq的consumeThread

DefaultMQPushConsumer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

private final InternalLogger log = ClientLogger.getLog();

//......

/**

* Minimum consumer thread number

*/

private int consumeThreadMin = 20;

/**

* Max consumer thread number

*/

private int consumeThreadMax = 20;

public int getConsumeThreadMax() {

return consumeThreadMax;

}

public int getConsumeThreadMin() {

return consumeThreadMin;

}

//......

}

  • DefaultMQPushConsumer定义了consumeThreadMin、consumeThreadMax属性,默认均为20

DefaultMQPushConsumerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

private void checkConfig() throws MQClientException {

Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

//......

// consumeThreadMin

if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1

|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {

throw new MQClientException(

"consumeThreadMin Out of range [1, 1000]"

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),

null);

}

// consumeThreadMax

if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {

throw new MQClientException(

"consumeThreadMax Out of range [1, 1000]"

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),

null);

}

// consumeThreadMin can"t be larger than consumeThreadMax

if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {

throw new MQClientException(

"consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "

+ "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",

null);

}

//......

}

//......

}

  • checkConfig方法会对consumeThreadMin、consumeThreadMax参数进行校验,其中他们的范围必须大于1小于1000,且consumeThreadMin不能大于consumeThreadMax

ConsumeMessageConcurrentlyService

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

private static final InternalLogger log = ClientLogger.getLog();

private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

private final DefaultMQPushConsumer defaultMQPushConsumer;

private final MessageListenerConcurrently messageListener;

private final BlockingQueue<Runnable> consumeRequestQueue;

private final ThreadPoolExecutor consumeExecutor;

private final String consumerGroup;

private final ScheduledExecutorService scheduledExecutorService;

private final ScheduledExecutorService cleanExpireMsgExecutors;

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,

MessageListenerConcurrently messageListener) {

this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;

this.messageListener = messageListener;

this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();

this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

this.consumeExecutor = new ThreadPoolExecutor(

this.defaultMQPushConsumer.getConsumeThreadMin(),

this.defaultMQPushConsumer.getConsumeThreadMax(),

1000 * 60,

TimeUnit.MILLISECONDS,

this.consumeRequestQueue,

new ThreadFactoryImpl("ConsumeMessageThread_"));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));

this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));

}

//......

public void submitConsumeRequest(

final List<MessageExt> msgs,

final ProcessQueue processQueue,

final MessageQueue messageQueue,

final boolean dispatchToConsume) {

final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

if (msgs.size() <= consumeBatchSize) {

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);

try {

this.consumeExecutor.submit(consumeRequest);

} catch (RejectedExecutionException e) {

this.submitConsumeRequestLater(consumeRequest);

}

} else {

for (int total = 0; total < msgs.size(); ) {

List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);

for (int i = 0; i < consumeBatchSize; i++, total++) {

if (total < msgs.size()) {

msgThis.add(msgs.get(total));

} else {

break;

}

}

ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);

try {

this.consumeExecutor.submit(consumeRequest);

} catch (RejectedExecutionException e) {

for (; total < msgs.size(); total++) {

msgThis.add(msgs.get(total));

}

this.submitConsumeRequestLater(consumeRequest);

}

}

}

}

private void submitConsumeRequestLater(final ConsumeRequest consumeRequest

) {

this.scheduledExecutorService.schedule(new Runnable() {

@Override

public void run() {

ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);

}

}, 5000, TimeUnit.MILLISECONDS);

}

//......

}

  • ConsumeMessageConcurrentlyService的构造器创建了consumeExecutor,其corePoolSize为defaultMQPushConsumer.getConsumeThreadMin(),其maximumPoolSize为defaultMQPushConsumer.getConsumeThreadMax(),其workQueue为LinkedBlockingQueue;其submitConsumeRequest及submitConsumeRequestLater方法均会往consumeExecutor提交consumeRequest

ConsumeRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

    class ConsumeRequest implements Runnable {

private final List<MessageExt> msgs;

private final ProcessQueue processQueue;

private final MessageQueue messageQueue;

public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {

this.msgs = msgs;

this.processQueue = processQueue;

this.messageQueue = messageQueue;

}

public List<MessageExt> getMsgs() {

return msgs;

}

public ProcessQueue getProcessQueue() {

return processQueue;

}

@Override

public void run() {

if (this.processQueue.isDropped()) {

log.info("the message queue not be able to consume, because it"s dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);

return;

}

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;

ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);

ConsumeConcurrentlyStatus status = null;

defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

ConsumeMessageContext consumeMessageContext = null;

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext = new ConsumeMessageContext();

consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());

consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());

consumeMessageContext.setProps(new HashMap<String, String>());

consumeMessageContext.setMq(messageQueue);

consumeMessageContext.setMsgList(msgs);

consumeMessageContext.setSuccess(false);

ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);

}

long beginTimestamp = System.currentTimeMillis();

boolean hasException = false;

ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

try {

if (msgs != null && !msgs.isEmpty()) {

for (MessageExt msg : msgs) {

MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));

}

}

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

} catch (Throwable e) {

log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",

RemotingHelper.exceptionSimpleDesc(e),

ConsumeMessageConcurrentlyService.this.consumerGroup,

msgs,

messageQueue);

hasException = true;

}

long consumeRT = System.currentTimeMillis() - beginTimestamp;

if (null == status) {

if (hasException) {

returnType = ConsumeReturnType.EXCEPTION;

} else {

returnType = ConsumeReturnType.RETURNNULL;

}

} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {

returnType = ConsumeReturnType.TIME_OUT;

} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {

returnType = ConsumeReturnType.FAILED;

} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {

returnType = ConsumeReturnType.SUCCESS;

}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());

}

if (null == status) {

log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",

ConsumeMessageConcurrentlyService.this.consumerGroup,

msgs,

messageQueue);

status = ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext.setStatus(status.toString());

consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);

ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);

}

ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()

.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

if (!processQueue.isDropped()) {

ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

} else {

log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);

}

}

public MessageQueue getMessageQueue() {

return messageQueue;

}

}

  • ConsumeRequest实现了Runnable方法,其run方法先执行defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext),然后执行listener.consumeMessage(Collections.unmodifiableList(msgs), context),最后执行processConsumeResult(status, context, this)

小结

ConsumeMessageConcurrentlyService的构造器创建了consumeExecutor,其corePoolSize为defaultMQPushConsumer.getConsumeThreadMin(),其maximumPoolSize为defaultMQPushConsumer.getConsumeThreadMax(),其workQueue为LinkedBlockingQueue;其submitConsumeRequest及submitConsumeRequestLater方法均会往consumeExecutor提交consumeRequest

doc

  • DefaultMQPushConsumer

以上是 聊聊rocketmq的consumeThread 的全部内容, 来源链接: utcz.com/z/510847.html

回到顶部