rocketmq消息消费
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
RocketMQ 消息重试是以消费组为单位,而
不是主题,消息重试主题名为%RETRY%+消费组名。消费者在启动的时候会自动订阅该
主题,参与该主题的消息队列负载。
这些是如何实现的呢?
一个消息队列同一时间只能有一个消费者
一个消费者可以同时消费多个消息队列
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //获取一个PullRequest PullRequest pullRequest = this.pullRequestQueue.take(); //通过pullRequest拉取message this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
消息拉取分为3 个主要步骤。
1 )消息拉取客户端消息拉取请求封装。
2 ) 消息服务器查找并返回消息。
3 )消息拉取客户端处理返回的消息。
RequestCode.PULL_MESSAGE
以上是 rocketmq消息消费 的全部内容, 来源链接: utcz.com/z/515431.html