聊聊carrera的RocketMQProduceOffsetFetcher

编程

本文主要研究一下carrera的RocketMQProduceOffsetFetcher

RocketMQProduceOffsetFetcher

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/lag/offset/RocketMQProduceOffsetFetcher.java

public class RocketMQProduceOffsetFetcher {

private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProduceOffsetFetcher.class);

private DefaultMQAdminExt defaultMQAdminExt;

private DefaultMQPullConsumer defaultMQPullConsumer;

private String namesrvAddr;

public RocketMQProduceOffsetFetcher(String namesrvAddr) {

this.defaultMQAdminExt = new DefaultMQAdminExt();

defaultMQAdminExt.setNamesrvAddr(namesrvAddr);

defaultMQAdminExt.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));

this.defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);

defaultMQPullConsumer.setInstanceName("admin-" + Long.toString(System.currentTimeMillis()));

defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);

this.namesrvAddr = namesrvAddr;

}

public String getNamesrvAddr() {

return namesrvAddr;

}

public void start() throws MQClientException {

defaultMQAdminExt.start();

defaultMQPullConsumer.start();

defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);

}

public void shutdown() {

defaultMQAdminExt.shutdown();

defaultMQPullConsumer.shutdown();

}

public ConsumeStats getConsumeStats(String group, String topic) throws Exception {

return defaultMQAdminExt.examineConsumeStats(group, topic);

}

public TopicStatsTable getProduceStats(String topic) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

return defaultMQAdminExt.examineTopicStats(topic);

}

public PullResult queryMsgByOffset(MessageQueue mq, long offset) throws Exception {

return defaultMQPullConsumer.pull(mq, "*", offset, 1);

}

}

  • RocketMQProduceOffsetFetcher的构造器接收namesrvAddr,然后创建DefaultMQAdminExt及DefaultMQPullConsumer
  • 其start方法会执行defaultMQAdminExt.start()、defaultMQPullConsumer.start()及defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);其shutdown执行defaultMQAdminExt.shutdown()及defaultMQPullConsumer.shutdown()
  • 其getConsumeStats方法执行的是defaultMQAdminExt.examineConsumeStats(group, topic);其getProduceStats方法执行的是defaultMQAdminExt.examineTopicStats(topic);其queryMsgByOffset方法执行的是defaultMQPullConsumer.pull(mq, "*", offset, 1)

DefaultMQAdminExt

DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java

public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {

private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;

private String adminExtGroup = "admin_ext_group";

private String createTopicKey = MixAll.DEFAULT_TOPIC;

private long timeoutMillis = 5000;

//......

@Override

public ConsumeStats examineConsumeStats(String consumerGroup,

String topic) throws RemotingException, MQClientException,

InterruptedException, MQBrokerException {

return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);

}

@Override

public TopicStatsTable examineTopicStats(

String topic) throws RemotingException, MQClientException, InterruptedException,

MQBrokerException {

return defaultMQAdminExtImpl.examineTopicStats(topic);

}

//......

}

  • examineConsumeStats及examineTopicStats都委托给了defaultMQAdminExtImpl

DefaultMQAdminExtImpl

DDMQ/rocketmq/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {

private final Logger log = ClientLogger.getLog();

private final DefaultMQAdminExt defaultMQAdminExt;

private ServiceState serviceState = ServiceState.CREATE_JUST;

private MQClientInstance mqClientInstance;

private RPCHook rpcHook;

private long timeoutMillis = 20000;

private Random random = new Random();

//......

@Override

public ConsumeStats examineConsumeStats(String consumerGroup,

String topic) throws RemotingException, MQClientException,

InterruptedException, MQBrokerException {

String queryTopic = topic == null ? MixAll.getRetryTopic(consumerGroup) : topic;

TopicRouteData topicRouteData = this.examineTopicRouteInfo(queryTopic);

ConsumeStats result = new ConsumeStats();

for (BrokerData bd : topicRouteData.getBrokerDatas()) {

String addr = bd.selectBrokerAddr();

if (addr != null) {

ConsumeStats consumeStats =

this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3);

result.getOffsetTable().putAll(consumeStats.getOffsetTable());

double value = result.getConsumeTps() + consumeStats.getConsumeTps();

result.setConsumeTps(value);

}

}

if (result.getOffsetTable().isEmpty()) {

throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,

"Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");

}

return result;

}

@Override

public TopicStatsTable examineTopicStats(

String topic) throws RemotingException, MQClientException, InterruptedException,

MQBrokerException {

TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);

TopicStatsTable topicStatsTable = new TopicStatsTable();

for (BrokerData bd : topicRouteData.getBrokerDatas()) {

String addr = bd.selectBrokerAddr();

if (addr != null) {

TopicStatsTable tst = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);

topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());

}

}

if (topicStatsTable.getOffsetTable().isEmpty()) {

throw new MQClientException("Not found the topic stats info", null);

}

return topicStatsTable;

}

//......

}

  • examineConsumeStats方法通过examineTopicRouteInfo(queryTopic)方法获取topicRouteData,然后通过topicRouteData.getBrokerDatas()获取brokerAddr,之后通过mqClientInstance.getMQClientAPIImpl().getConsumeStats(addr, consumerGroup, topic, timeoutMillis * 3)获取consumeStats;examineTopicStats方法也是先通过examineTopicRouteInfo(topic)方法获取topicRouteData,然后通过topicRouteData.getBrokerDatas()获取brokerAddr,之后通过mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis)获取topicStatsTable

小结

  • RocketMQProduceOffsetFetcher的构造器接收namesrvAddr,然后创建DefaultMQAdminExt及DefaultMQPullConsumer
  • 其start方法会执行defaultMQAdminExt.start()、defaultMQPullConsumer.start()及defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setConnectBrokerByUser(true);其shutdown执行defaultMQAdminExt.shutdown()及defaultMQPullConsumer.shutdown()
  • 其getConsumeStats方法执行的是defaultMQAdminExt.examineConsumeStats(group, topic);其getProduceStats方法执行的是defaultMQAdminExt.examineTopicStats(topic);其queryMsgByOffset方法执行的是defaultMQPullConsumer.pull(mq, "*", offset, 1)

doc

  • RocketMQProduceOffsetFetcher

以上是 聊聊carrera的RocketMQProduceOffsetFetcher 的全部内容, 来源链接: utcz.com/z/512570.html

回到顶部