聊聊rocketmq的enableMsgTrace

编程

本文主要研究一下rocketmq的enableMsgTrace

enableMsgTrace

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

private final InternalLogger log = ClientLogger.getLog();

//......

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,

boolean enableMsgTrace, final String customizedTraceTopic) {

this.namespace = namespace;

this.producerGroup = producerGroup;

defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);

//if client open the message trace feature

if (enableMsgTrace) {

try {

AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);

dispatcher.setHostProducer(this.getDefaultMQProducerImpl());

traceDispatcher = dispatcher;

this.getDefaultMQProducerImpl().registerSendMessageHook(

new SendMessageTraceHookImpl(traceDispatcher));

} catch (Throwable e) {

log.error("system mqtrace hook init failed ,maybe can"t send msg trace data");

}

}

}

//......

}

  • DefaultMQProducer的构造器在enableMsgTrace为true时会创建AsyncTraceDispatcher,再创建SendMessageTraceHookImpl,然后执行getDefaultMQProducerImpl().registerSendMessageHook

SendMessageHook

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/hook/SendMessageHook.java

public interface SendMessageHook {

String hookName();

void sendMessageBefore(final SendMessageContext context);

void sendMessageAfter(final SendMessageContext context);

}

  • SendMessageHook定义了hookName、sendMessageBefore、sendMessageAfter方法

SendMessageTraceHookImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java

public class SendMessageTraceHookImpl implements SendMessageHook {

private TraceDispatcher localDispatcher;

public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {

this.localDispatcher = localDispatcher;

}

@Override

public String hookName() {

return "SendMessageTraceHook";

}

@Override

public void sendMessageBefore(SendMessageContext context) {

//if it is message trace data,then it doesn"t recorded

if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {

return;

}

//build the context content of TuxeTraceContext

TraceContext tuxeContext = new TraceContext();

tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));

context.setMqTraceContext(tuxeContext);

tuxeContext.setTraceType(TraceType.Pub);

tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));

//build the data bean object of message trace

TraceBean traceBean = new TraceBean();

traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));

traceBean.setTags(context.getMessage().getTags());

traceBean.setKeys(context.getMessage().getKeys());

traceBean.setStoreHost(context.getBrokerAddr());

traceBean.setBodyLength(context.getMessage().getBody().length);

traceBean.setMsgType(context.getMsgType());

tuxeContext.getTraceBeans().add(traceBean);

}

@Override

public void sendMessageAfter(SendMessageContext context) {

//if it is message trace data,then it doesn"t recorded

if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())

|| context.getMqTraceContext() == null) {

return;

}

if (context.getSendResult() == null) {

return;

}

if (context.getSendResult().getRegionId() == null

|| !context.getSendResult().isTraceOn()) {

// if switch is false,skip it

return;

}

TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();

TraceBean traceBean = tuxeContext.getTraceBeans().get(0);

int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());

tuxeContext.setCostTime(costTime);

if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {

tuxeContext.setSuccess(true);

} else {

tuxeContext.setSuccess(false);

}

tuxeContext.setRegionId(context.getSendResult().getRegionId());

traceBean.setMsgId(context.getSendResult().getMsgId());

traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());

traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);

localDispatcher.append(tuxeContext);

}

}

  • SendMessageTraceHookImpl实现了SendMessageHook接口,其构造器接收TraceDispatcher参数;其hookName为SendMessageTraceHook
  • 其sendMessageBefore方法会判断topic名是否是AsyncTraceDispatcher要trace的,不是则返回,是的话则构造TraceContext,添加TraceBean
  • 其sendMessageAfter方法会判断topic名是否是AsyncTraceDispatcher要trace的,不是则返回,是的话则判断context.getSendResult()是否为null,是则返回,不是接着判断context.getSendResult().getRegionId()为null或者context.getSendResult().isTraceOn()为false则返回;最后获取TraceContext,更新TraceBean,然后将TraceContext追加到TraceDispatcher

TraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java

public interface TraceDispatcher {

/**

* Initialize asynchronous transfer data module

*/

void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

/**

* Append the transfering data

* @param ctx data infomation

* @return

*/

boolean append(Object ctx);

/**

* Write flush action

*

* @throws IOException

*/

void flush() throws IOException;

/**

* Close the trace Hook

*/

void shutdown();

}

  • TraceDispatcher接口定义了start、append、flush、shutdown方法

AsyncTraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

public class AsyncTraceDispatcher implements TraceDispatcher {

private final static InternalLogger log = ClientLogger.getLog();

private final int queueSize;

private final int batchSize;

private final int maxMsgSize;

private final DefaultMQProducer traceProducer;

private final ThreadPoolExecutor traceExecutor;

// The last discard number of log

private AtomicLong discardCount;

private Thread worker;

private ArrayBlockingQueue<TraceContext> traceContextQueue;

private ArrayBlockingQueue<Runnable> appenderQueue;

private volatile Thread shutDownHook;

private volatile boolean stopped = false;

private DefaultMQProducerImpl hostProducer;

private DefaultMQPushConsumerImpl hostConsumer;

private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

private String dispatcherId = UUID.randomUUID().toString();

private String traceTopicName;

private AtomicBoolean isStarted = new AtomicBoolean(false);

private AccessChannel accessChannel = AccessChannel.LOCAL;

public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {

// queueSize is greater than or equal to the n power of 2 of value

this.queueSize = 2048;

this.batchSize = 100;

this.maxMsgSize = 128000;

this.discardCount = new AtomicLong(0L);

this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);

this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);

if (!UtilAll.isBlank(traceTopicName)) {

this.traceTopicName = traceTopicName;

} else {

this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;

}

this.traceExecutor = new ThreadPoolExecutor(//

10, //

20, //

1000 * 60, //

TimeUnit.MILLISECONDS, //

this.appenderQueue, //

new ThreadFactoryImpl("MQTraceSendThread_"));

traceProducer = getAndCreateTraceProducer(rpcHook);

}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {

if (isStarted.compareAndSet(false, true)) {

traceProducer.setNamesrvAddr(nameSrvAddr);

traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);

traceProducer.start();

}

this.accessChannel = accessChannel;

this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);

this.worker.setDaemon(true);

this.worker.start();

this.registerShutDownHook();

}

private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {

DefaultMQProducer traceProducerInstance = this.traceProducer;

if (traceProducerInstance == null) {

traceProducerInstance = new DefaultMQProducer(rpcHook);

traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);

traceProducerInstance.setSendMsgTimeout(5000);

traceProducerInstance.setVipChannelEnabled(false);

// The max size of message is 128K

traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);

}

return traceProducerInstance;

}

@Override

public boolean append(final Object ctx) {

boolean result = traceContextQueue.offer((TraceContext) ctx);

if (!result) {

log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);

}

return result;

}

@Override

public void flush() throws IOException {

// The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.

long end = System.currentTimeMillis() + 500;

while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {

try {

Thread.sleep(1);

} catch (InterruptedException e) {

break;

}

}

log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size());

}

@Override

public void shutdown() {

this.stopped = true;

this.traceExecutor.shutdown();

if (isStarted.get()) {

traceProducer.shutdown();

}

this.removeShutdownHook();

}

public void registerShutDownHook() {

if (shutDownHook == null) {

shutDownHook = new Thread(new Runnable() {

private volatile boolean hasShutdown = false;

@Override

public void run() {

synchronized (this) {

if (!this.hasShutdown) {

try {

flush();

} catch (IOException e) {

log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");

}

}

}

}

}, "ShutdownHookMQTrace");

Runtime.getRuntime().addShutdownHook(shutDownHook);

}

}

public void removeShutdownHook() {

if (shutDownHook != null) {

Runtime.getRuntime().removeShutdownHook(shutDownHook);

}

}

//......

}

  • AsyncTraceDispatcher的构造器创建了traceContextQueue及traceExecutor;append方法会往traceContextQueue添加TraceContext,如果添加不进去则递增discardCount,同时打印info日志
  • 其start方法创建并执行AsyncRunnable,同时执行了registerShutDownHook,该shutDownHook会在shutdown时执行flush;flush方法会不断循环等待traceContextQueue及appenderQueue队列大小为0,但整个等待时间不超过500ms
  • 其shutdown方法会执行traceExecutor.shutdown()、traceProducer.shutdown()、removeShutdownHook方法;removeShutdownHook方法会将shutDownHook从Runtime.getRuntime()中移除

AsyncRunnable

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

    class AsyncRunnable implements Runnable {

private boolean stopped;

@Override

public void run() {

while (!stopped) {

List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);

for (int i = 0; i < batchSize; i++) {

TraceContext context = null;

try {

//get trace data element from blocking Queue — traceContextQueue

context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);

} catch (InterruptedException e) {

}

if (context != null) {

contexts.add(context);

} else {

break;

}

}

if (contexts.size() > 0) {

AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);

traceExecutor.submit(request);

} else if (AsyncTraceDispatcher.this.stopped) {

this.stopped = true;

}

}

}

}

  • AsyncRunnable实现了Runnable接口,其run方法按batchSize循环从traceContextQueue拉取元素添加到contexts;接着创建AsyncAppenderRequest提交traceExecutor中

AsyncAppenderRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

    class AsyncAppenderRequest implements Runnable {

List<TraceContext> contextList;

public AsyncAppenderRequest(final List<TraceContext> contextList) {

if (contextList != null) {

this.contextList = contextList;

} else {

this.contextList = new ArrayList<TraceContext>(1);

}

}

@Override

public void run() {

sendTraceData(contextList);

}

public void sendTraceData(List<TraceContext> contextList) {

Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();

for (TraceContext context : contextList) {

if (context.getTraceBeans().isEmpty()) {

continue;

}

// Topic value corresponding to original message entity content

String topic = context.getTraceBeans().get(0).getTopic();

String regionId = context.getRegionId();

// Use original message entity"s topic as key

String key = topic;

if (!StringUtils.isBlank(regionId)) {

key = key + TraceConstants.CONTENT_SPLITOR + regionId;

}

List<TraceTransferBean> transBeanList = transBeanMap.get(key);

if (transBeanList == null) {

transBeanList = new ArrayList<TraceTransferBean>();

transBeanMap.put(key, transBeanList);

}

TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);

transBeanList.add(traceData);

}

for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {

String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));

String dataTopic = entry.getKey();

String regionId = null;

if (key.length > 1) {

dataTopic = key[0];

regionId = key[1];

}

flushData(entry.getValue(), dataTopic, regionId);

}

}

/**

* Batch sending data actually

*/

private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {

if (transBeanList.size() == 0) {

return;

}

// Temporary buffer

StringBuilder buffer = new StringBuilder(1024);

int count = 0;

Set<String> keySet = new HashSet<String>();

for (TraceTransferBean bean : transBeanList) {

// Keyset of message trace includes msgId of or original message

keySet.addAll(bean.getTransKey());

buffer.append(bean.getTransData());

count++;

// Ensure that the size of the package should not exceed the upper limit.

if (buffer.length() >= traceProducer.getMaxMessageSize()) {

sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);

// Clear temporary buffer after finishing

buffer.delete(0, buffer.length());

keySet.clear();

count = 0;

}

}

if (count > 0) {

sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);

}

transBeanList.clear();

}

/**

* Send message trace data

*

* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)

* @param data the message trace data in this batch

*/

private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {

String traceTopic = traceTopicName;

if (AccessChannel.CLOUD == accessChannel) {

traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;

}

final Message message = new Message(traceTopic, data.getBytes());

// Keyset of message trace includes msgId of or original message

message.setKeys(keySet);

try {

Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);

SendCallback callback = new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

}

@Override

public void onException(Throwable e) {

log.info("send trace data ,the traceData is " + data);

}

};

if (traceBrokerSet.isEmpty()) {

// No cross set

traceProducer.send(message, callback, 5000);

} else {

traceProducer.send(message, new MessageQueueSelector() {

@Override

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

Set<String> brokerSet = (Set<String>) arg;

List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();

for (MessageQueue queue : mqs) {

if (brokerSet.contains(queue.getBrokerName())) {

filterMqs.add(queue);

}

}

int index = sendWhichQueue.getAndIncrement();

int pos = Math.abs(index) % filterMqs.size();

if (pos < 0) {

pos = 0;

}

return filterMqs.get(pos);

}

}, traceBrokerSet, callback);

}

} catch (Exception e) {

log.info("send trace data,the traceData is" + data);

}

}

private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {

Set<String> brokerSet = new HashSet<String>();

TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);

if (null == topicPublishInfo || !topicPublishInfo.ok()) {

producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());

producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);

topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);

}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {

for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {

brokerSet.add(queue.getBrokerName());

}

}

return brokerSet;

}

}

  • AsyncAppenderRequest实现了Runnable接口,其run方法执行sendTraceData;该方法遍历contextList,将TraceTransferBean按topic归类到transBeanMap;之后遍历transBeanMap,执行flushData;
  • flushData方法遍历transBeanList,将transData添加到StringBuilder,如果buffer大小大于等于traceProducer.getMaxMessageSize()则执行sendTraceDataByMQ,并重置count,遍历完之后再次判断count是否大于0,是则再次执行sendTraceDataByMQ方法
  • sendTraceDataByMQ方法首先通过tryGetMessageQueueBrokerSet获取traceBrokerSet,如果traceBrokerSet为空则执行traceProducer.send(message, callback, 5000),否则创建MessageQueueSelector再执行send方法

小结

DefaultMQProducer的构造器在enableMsgTrace为true时会创建AsyncTraceDispatcher,再创建SendMessageTraceHookImpl,然后执行getDefaultMQProducerImpl().registerSendMessageHook

doc

  • DefaultMQProducer

以上是 聊聊rocketmq的enableMsgTrace 的全部内容, 来源链接: utcz.com/z/510668.html

回到顶部