rocketmq消息消费

编程

 

log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

RocketMQ 消息重试是以消费组为单位,而

不是主题,消息重试主题名为%RETRY%+消费组名。消费者在启动的时候会自动订阅该

主题,参与该主题的消息队列负载。

 

这些是如何实现的呢?

一个消息队列同一时间只能有一个消费者

一个消费者可以同时消费多个消息队列

@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //获取一个PullRequest PullRequest pullRequest = this.pullRequestQueue.take(); //通过pullRequest拉取message this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }

 

 

 

private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }

 

 

 

消息拉取分为3 个主要步骤。

1 )消息拉取客户端消息拉取请求封装。

2 ) 消息服务器查找并返回消息。

3 )消息拉取客户端处理返回的消息。

 

 

RequestCode.PULL_MESSAGE

 

 

 

 

 

以上是 rocketmq消息消费 的全部内容, 来源链接: utcz.com/z/515431.html

回到顶部