聊聊carrera的RocketMQProduceOffsetFetcher
序
本文主要研究一下carrera的RocketMQProduceOffsetFetcher
RocketMQProduceOffsetFetcher
DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/lag/offset/RocketMQProduceOffsetFetcher.java
public class RocketMQProduceOffsetFetcher { private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProduceOffsetFetcher.class);
private DefaultMQAdminExt defaultMQAdminExt;
private DefaultMQPullConsumer defaultMQPullConsumer;
private String namesrvAddr;
public RocketMQProduceOffsetFetcher(String namesrvAddr) {
this.defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt.setNamesrvAddr(namesrvAddr);
defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
this.defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
defaultMQPullConsumer.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));
defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
this.namesrvAddr = namesrvAddr;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void start() throws MQClientException {
defaultMQAdminExt.start();
defaultMQPullConsumer.start();
defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);
}
public void shutdown() {
defaultMQAdminExt.shutdown();
defaultMQPullConsumer.shutdown();
}
public ConsumeStats getConsumeStats(String group, String topic) throws Exception {
return defaultMQAdminExt.examineConsumeStats(group, topic);
}
public TopicStatsTable getProduceStats(String topic) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
return defaultMQAdminExt.examineTopicStats(topic);
}
public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {
return defaultMQPullConsumer.pull(mq, "*", offset, 1);
}
}
- RocketMQProduceOffsetFetcher的构造器接收namesrvAddr,然后创建DefaultMQAdminExt及DefaultMQPullConsumer
- 其start方法会执行defaultMQAdminExt.start()、defaultMQPullConsumer.start()及defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);其shutdown执行defaultMQAdminExt.shutdown()及defaultMQPullConsumer.shutdown()
- 其getConsumeStats方法执行的是defaultMQAdminExt.examineConsumeStats(group, topic);其getProduceStats方法执行的是defaultMQAdminExt.examineTopicStats(topic);其queryMsgByOffset方法执行的是defaultMQPullConsumer.pull(mq, "*", offset, 1)
DefaultMQAdminExt
DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private String adminExtGroup = "admin_ext_group";
private String createTopicKey = MixAll.DEFAULT_TOPIC;
private long timeoutMillis = 5000;
//......
@Override
public ConsumeStats examineConsumeStats(String consumerGroup,
String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
}
@Override
public TopicStatsTable examineTopicStats(
String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return defaultMQAdminExtImpl.examineTopicStats(topic);
}
//......
}
- examineConsumeStats及examineTopicStats都委托给了defaultMQAdminExtImpl
DefaultMQAdminExtImpl
DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { private final Logger log = ClientLogger.getLog();
private final DefaultMQAdminExt defaultMQAdminExt;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mqClientInstance;
private RPCHook rpcHook;
private long timeoutMillis = 20000;
private Random random = new Random();
//......
@Override
public ConsumeStats examineConsumeStats(String consumerGroup,
String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
String queryTopic = topic == null ? MixAll.getRetryTopic(consumerGroup) : topic;
TopicRouteData topicRouteData = this.examineTopicRouteInfo(queryTopic);
ConsumeStats result = new ConsumeStats();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
ConsumeStats consumeStats =
this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);
result.getOffsetTable().putAll(consumeStats.getOffsetTable());
double value = result.getConsumeTps() + consumeStats.getConsumeTps();
result.setConsumeTps(value);
}
}
if (result.getOffsetTable().isEmpty()) {
throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
"Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
}
return result;
}
@Override
public TopicStatsTable examineTopicStats(
String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
TopicStatsTable topicStatsTable = new TopicStatsTable();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
}
}
if (topicStatsTable.getOffsetTable().isEmpty()) {
throw new MQClientException("Not found the topic stats info", null);
}
return topicStatsTable;
}
//......
}
- examineConsumeStats方法通过examineTopicRouteInfo(queryTopic)方法获取topicRouteData,然后通过topicRouteData.getBrokerDatas()获取brokerAddr,之后通过mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3)获取consumeStats;examineTopicStats方法也是先通过examineTopicRouteInfo(topic)方法获取topicRouteData,然后通过topicRouteData.getBrokerDatas()获取brokerAddr,之后通过mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis)获取topicStatsTable
小结
- RocketMQProduceOffsetFetcher的构造器接收namesrvAddr,然后创建DefaultMQAdminExt及DefaultMQPullConsumer
- 其start方法会执行defaultMQAdminExt.start()、defaultMQPullConsumer.start()及defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);其shutdown执行defaultMQAdminExt.shutdown()及defaultMQPullConsumer.shutdown()
- 其getConsumeStats方法执行的是defaultMQAdminExt.examineConsumeStats(group, topic);其getProduceStats方法执行的是defaultMQAdminExt.examineTopicStats(topic);其queryMsgByOffset方法执行的是defaultMQPullConsumer.pull(mq, "*", offset, 1)
doc
- RocketMQProduceOffsetFetcher
以上是 聊聊carrera的RocketMQProduceOffsetFetcher 的全部内容, 来源链接: utcz.com/z/512570.html