聊聊rocketmq的AccessChannel

编程

本文主要研究一下rocketmq的AccessChannel

AccessChannel

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java

public enum AccessChannel {

/**

* Means connect to private IDC cluster.

*/

LOCAL,

/**

* Means connect to Cloud service.

*/

CLOUD,

}

  • AccessChannel定义了两个枚举值,分别是LOCAL及CLOUD

TraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java

public interface TraceDispatcher {

/**

* Initialize asynchronous transfer data module

*/

void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

/**

* Append the transfering data

* @param ctx data infomation

* @return

*/

boolean append(Object ctx);

/**

* Write flush action

*

* @throws IOException

*/

void flush() throws IOException;

/**

* Close the trace Hook

*/

void shutdown();

}

  • TraceDispatcher的start方法会接收AccessChannel类型的参数

AsyncTraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

public class AsyncTraceDispatcher implements TraceDispatcher {

private final static InternalLogger log = ClientLogger.getLog();

private final int queueSize;

private final int batchSize;

private final int maxMsgSize;

private final DefaultMQProducer traceProducer;

private final ThreadPoolExecutor traceExecutor;

// The last discard number of log

private AtomicLong discardCount;

private Thread worker;

private ArrayBlockingQueue<TraceContext> traceContextQueue;

private ArrayBlockingQueue<Runnable> appenderQueue;

private volatile Thread shutDownHook;

private volatile boolean stopped = false;

private DefaultMQProducerImpl hostProducer;

private DefaultMQPushConsumerImpl hostConsumer;

private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

private String dispatcherId = UUID.randomUUID().toString();

private String traceTopicName;

private AtomicBoolean isStarted = new AtomicBoolean(false);

private AccessChannel accessChannel = AccessChannel.LOCAL;

//......

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {

if (isStarted.compareAndSet(false, true)) {

traceProducer.setNamesrvAddr(nameSrvAddr);

traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);

traceProducer.start();

}

this.accessChannel = accessChannel;

this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);

this.worker.setDaemon(true);

this.worker.start();

this.registerShutDownHook();

}

//......

class AsyncAppenderRequest implements Runnable {

List<TraceContext> contextList;

public AsyncAppenderRequest(final List<TraceContext> contextList) {

if (contextList != null) {

this.contextList = contextList;

} else {

this.contextList = new ArrayList<TraceContext>(1);

}

}

private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {

String traceTopic = traceTopicName;

if (AccessChannel.CLOUD == accessChannel) {

traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;

}

final Message message = new Message(traceTopic, data.getBytes());

// Keyset of message trace includes msgId of or original message

message.setKeys(keySet);

//......

}

//......

}

//......

}

  • AsyncTraceDispatcher内部类AsyncAppenderRequest的sendTraceDataByMQ方法,针对accessChannel为AccessChannel.CLOUD类型的,会给TraceConstants.TRACE_TOPIC_PREFIX加上regionId作为traceTopic

小结

AccessChannel定义了两个枚举值,分别是LOCAL及CLOUD;TraceDispatcher的start方法会接收AccessChannel类型的参数;AsyncTraceDispatcher内部类AsyncAppenderRequest的sendTraceDataByMQ方法,针对accessChannel为AccessChannel.CLOUD类型的,会给TraceConstants.TRACE_TOPIC_PREFIX加上regionId作为traceTopic

doc

  • AccessChannel

以上是 聊聊rocketmq的AccessChannel 的全部内容, 来源链接: utcz.com/z/510701.html

回到顶部