聊聊rocketmq的getOrCreateMQClientInstance

编程

本文主要研究一下rocketmq的getOrCreateMQClientInstance

getOrCreateMQClientInstance

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientManager.java

/**
 * Singleton registry that hands out one {@link MQClientInstance} per client id.
 *
 * <p>The client id is computed by {@code ClientConfig.buildMQClientId()}; callers whose
 * configs yield the same id receive the same instance.
 */
public class MQClientManager {

    private final static InternalLogger log = ClientLogger.getLog();

    private static MQClientManager instance = new MQClientManager();

    // Source of the index passed to each newly constructed MQClientInstance.
    private AtomicInteger factoryIndexGenerator = new AtomicInteger();

    private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
        new ConcurrentHashMap<String, MQClientInstance>();

    private MQClientManager() {
    }

    /**
     * @return the single manager held in the static field of this class
     */
    public static MQClientManager getInstance() {
        return MQClientManager.instance;
    }

    /**
     * Same as {@link #getOrCreateMQClientInstance(ClientConfig, RPCHook)} with a {@code null} hook.
     */
    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) {
        return this.getOrCreateMQClientInstance(clientConfig, null);
    }

    /**
     * Looks up the client instance registered under the id built from {@code clientConfig},
     * creating and registering a new one when none is present.
     *
     * @param clientConfig configuration used to build the client id; a clone of it is
     *                     passed to a newly created instance
     * @param rpcHook      hook forwarded to the {@code MQClientInstance} constructor
     * @return the instance stored in the table for this client id
     */
    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        final String clientId = clientConfig.buildMQClientId();

        final MQClientInstance cached = this.factoryTable.get(clientId);
        if (cached != null) {
            return cached;
        }

        final MQClientInstance created = new MQClientInstance(
            clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(),
            clientId,
            rpcHook);

        // putIfAbsent returns the already-mapped value when another caller registered first.
        final MQClientInstance raced = this.factoryTable.putIfAbsent(clientId, created);
        if (raced == null) {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            return created;
        }

        log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        return raced;
    }

    /**
     * Drops the table entry for {@code clientId}, if any.
     */
    public void removeClientFactory(final String clientId) {
        this.factoryTable.remove(clientId);
    }
}

  • MQClientManager提供了getOrCreateMQClientInstance方法用于根据clientConfig及rpcHook来创建MQClientInstance;它使用factoryTable来存储clientId与MQClientInstance的映射关系,只要clientId是一样的,获取的就是相同的MQClientInstance;而clientId则由clientConfig.buildMQClientId()计算出来

ClientConfig

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/ClientConfig.java

/**
 * Client-side configuration shared by producers and consumers (excerpt).
 */
public class ClientConfig {

    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";

    private String namesrvAddr = NameServerAddressUtils.getNameServerAddresses();

    private String clientIP = RemotingUtil.getLocalAddress();

    // Taken from the "rocketmq.client.name" system property; "DEFAULT" when the property is unset.
    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();

    protected String namespace;

    protected AccessChannel accessChannel = AccessChannel.LOCAL;

    /**
     * Interval for pulling topic information from the name server.
     */
    private int pollNameServerInterval = 1000 * 30;

    /**
     * Heartbeat interval with the message broker, in milliseconds
     * (the previous comment said "microseconds"; 1000 * 30 reads as 30 s in ms).
     */
    private int heartbeatBrokerInterval = 1000 * 30;

    /**
     * Interval for persisting consumer offsets.
     */
    private int persistConsumerOffsetInterval = 1000 * 5;

    private long pullTimeDelayMillsWhenException = 1000;

    private boolean unitMode = false;

    private String unitName;

    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));

    private boolean useTLS = TlsSystemConfig.tlsEnable;

    private LanguageCode language = LanguageCode.JAVA;

    /**
     * Builds the client id as {@code clientIP@instanceName}, with {@code @unitName}
     * appended when the unit name is not blank.
     *
     * @return the assembled client id
     */
    public String buildMQClientId() {
        final String base = this.getClientIP() + "@" + this.getInstanceName();
        if (UtilAll.isBlank(this.unitName)) {
            return base;
        }
        return base + "@" + this.unitName;
    }

    public String getClientIP() {
        return clientIP;
    }

    public String getInstanceName() {
        return instanceName;
    }

    /**
     * Replaces the instance name with the process id, but only while the
     * name is still exactly {@code "DEFAULT"}.
     */
    public void changeInstanceNameToPID() {
        if (!this.instanceName.equals("DEFAULT")) {
            return;
        }
        this.instanceName = String.valueOf(UtilAll.getPid());
    }

    //......

}

  • ClientConfig的buildMQClientId会根据clientIP、instanceName、unitName来构建;clientIP默认值为RemotingUtil.getLocalAddress();instanceName默认值为System.getProperty("rocketmq.client.name", "DEFAULT");ClientConfig还提供了一个changeInstanceNameToPID方法,仅当instanceName的值等于字面量"DEFAULT"(即既未设置rocketmq.client.name系统属性,也未显式指定instanceName)的时候,才将其改为UtilAll.getPid();unitName默认为空

DefaultMQProducerImpl.start

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

/**
 * Producer implementation (excerpt: only the start sequence is shown).
 */
public class DefaultMQProducerImpl implements MQProducerInner {

//......

/**
 * Starts the producer according to the current {@code serviceState}.
 *
 * <p>In state CREATE_JUST the producer validates its config, obtains the shared
 * client instance from MQClientManager, registers itself under its producer group
 * and moves to RUNNING. In states RUNNING, START_FAILED and SHUTDOWN_ALREADY an
 * exception is thrown. After the switch a heartbeat is sent to all brokers and a
 * periodic task scanning RequestFutureTable for expired requests is scheduled.
 *
 * @param startFactory when true, {@code mQClientFactory.start()} is invoked as well
 * @throws MQClientException if the producer group is already registered on the client
 *         instance, or if the producer is not in state CREATE_JUST
 */
public void start(final boolean startFactory) throws MQClientException {

switch (this.serviceState) {

case CREATE_JUST:

// Mark as failed up front; the state only becomes RUNNING at the end of this branch,
// so an exception thrown by any step below leaves the producer in START_FAILED.
this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

// Every group except CLIENT_INNER_PRODUCER_GROUP switches a "DEFAULT" instance name to the pid.
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {

this.defaultMQProducer.changeInstanceNameToPID();

}

// The instance name set above feeds into the client id used for this lookup.
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

if (!registerOK) {

// Registration was refused: roll the state back to CREATE_JUST before reporting the error.
this.serviceState = ServiceState.CREATE_JUST;

throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()

+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

null);

}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {

mQClientFactory.start();

}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),

this.defaultMQProducer.isSendMessageWithVIPChannel());

this.serviceState = ServiceState.RUNNING;

break;

// Any state other than CREATE_JUST listed here means start() must not proceed.
case RUNNING:

case START_FAILED:

case SHUTDOWN_ALREADY:

throw new MQClientException("The producer service state not OK, maybe started once, "

+ this.serviceState

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

null);

default:

break;

}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

// Periodically scan for expired requests; Throwable is caught and logged inside run().
// NOTE(review): the arguments look like (task, initial delay 3000 ms, period 1000 ms),
// assuming `timer` is a java.util.Timer - its declaration is not shown here.
this.timer.scheduleAtFixedRate(new TimerTask() {

@Override

public void run() {

try {

RequestFutureTable.scanExpiredRequest();

} catch (Throwable e) {

log.error("scan RequestFutureTable exception", e);

}

}

}, 1000 * 3, 1000);

}

//......

}

  • DefaultMQProducerImpl的start方法在defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)为false的时候,会执行defaultMQProducer.changeInstanceNameToPID()方法

DefaultMQPushConsumerImpl.start

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

/**
 * Push consumer implementation (excerpt: only the start sequence is shown).
 */
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

/**
 * Starts the push consumer according to the current {@code serviceState}.
 *
 * <p>In state CREATE_JUST it validates the config, copies subscriptions, obtains the
 * shared client instance from MQClientManager, wires up rebalance, pull wrapper,
 * offset store and consume service, registers the consumer under its group, starts
 * the client instance and moves to RUNNING. In states RUNNING, START_FAILED and
 * SHUTDOWN_ALREADY an exception is thrown.
 *
 * @throws MQClientException if the consumer group is already registered on the client
 *         instance, or if the consumer is not in state CREATE_JUST
 */
public synchronized void start() throws MQClientException {

switch (this.serviceState) {

case CREATE_JUST:

log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),

this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

// Mark as failed up front; the state only becomes RUNNING at the end of this branch,
// so an exception thrown by any step below leaves the consumer in START_FAILED.
this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

this.copySubscription();

// Only CLUSTERING consumers switch a "DEFAULT" instance name to the pid.
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {

this.defaultMQPushConsumer.changeInstanceNameToPID();

}

// The instance name set above feeds into the client id used for this lookup.
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());

this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

this.pullAPIWrapper = new PullAPIWrapper(

mQClientFactory,

this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());

this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

// An offset store already set on the consumer wins; otherwise one is chosen by message model
// and written back to the consumer.
if (this.defaultMQPushConsumer.getOffsetStore() != null) {

this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();

} else {

switch (this.defaultMQPushConsumer.getMessageModel()) {

case BROADCASTING:

this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

break;

case CLUSTERING:

this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

break;

default:

break;

}

this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

}

this.offsetStore.load();

// Pick the consume service matching the listener type.
// NOTE(review): if the listener is neither type, consumeMessageService is not assigned
// in this method before start() is called on it - presumably checkConfig() rejects
// that case earlier; confirm.
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

this.consumeOrderly = true;

this.consumeMessageService =

new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {

this.consumeOrderly = false;

this.consumeMessageService =

new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

}

this.consumeMessageService.start();

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

if (!registerOK) {

// Registration was refused: roll the state back and stop the consume service started above.
this.serviceState = ServiceState.CREATE_JUST;

this.consumeMessageService.shutdown();

throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()

+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

null);

}

mQClientFactory.start();

log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());

this.serviceState = ServiceState.RUNNING;

break;

// Any state other than CREATE_JUST listed here means start() must not proceed.
case RUNNING:

case START_FAILED:

case SHUTDOWN_ALREADY:

throw new MQClientException("The PushConsumer service state not OK, maybe started once, "

+ this.serviceState

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

null);

default:

break;

}

// Post-start steps executed after the switch completes without throwing.
this.updateTopicSubscribeInfoWhenSubscriptionChanged();

this.mQClientFactory.checkClientInBroker();

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

this.mQClientFactory.rebalanceImmediately();

}

//......

}

  • DefaultMQPushConsumerImpl的start方法在defaultMQPushConsumer.getMessageModel()为MessageModel.CLUSTERING时,会执行defaultMQPushConsumer.changeInstanceNameToPID()方法

RocketMQUtil.getInstanceName

rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/RocketMQUtil.java

/**
 * Helper methods for the RocketMQ Spring integration (excerpt).
 */
public class RocketMQUtil {

    //......

    /**
     * Builds an instance name of the form {@code accessKey|secretKey|identify|pid}.
     *
     * <p>When {@code rpcHook} is {@code null}, is not an {@code AclClientRPCHook}, or carries
     * no session credentials, the credential prefix is omitted and the result is
     * {@code identify|pid}. Previously those inputs failed with a NullPointerException or
     * ClassCastException from the unchecked downcast.
     *
     * <p>NOTE(review): the secret key is embedded verbatim in the returned name. Anything
     * that prints the instance name (or an id derived from it) will therefore expose the
     * secret; consider whether a digest would serve the same uniqueness purpose.
     *
     * @param rpcHook  hook expected to be an {@code AclClientRPCHook}; may be {@code null}
     * @param identify caller-chosen discriminator, e.g. a consumer group or producer name
     * @return the assembled instance name
     */
    public static String getInstanceName(RPCHook rpcHook, String identify) {
        String separator = "|";
        StringBuilder instanceName = new StringBuilder();

        // instanceof is false for null, so this single check covers both bad-input cases.
        if (rpcHook instanceof AclClientRPCHook) {
            SessionCredentials sessionCredentials = ((AclClientRPCHook) rpcHook).getSessionCredentials();
            if (sessionCredentials != null) {
                instanceName.append(sessionCredentials.getAccessKey())
                    .append(separator).append(sessionCredentials.getSecretKey())
                    .append(separator);
            }
        }

        instanceName.append(identify)
            .append(separator).append(UtilAll.getPid());
        return instanceName.toString();
    }

    //......

}

  • RocketMQUtil提供了getInstanceName方法,它根据rpcHook的信息、identify以及UtilAll.getPid()来构建

initRocketMQPushConsumer

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,

RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {

private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);

private ApplicationContext applicationContext;

//......

private void initRocketMQPushConsumer() throws MQClientException {

Assert.notNull(rocketMQListener, "Property "rocketMQListener" is required");

Assert.notNull(consumerGroup, "Property "consumerGroup" is required");

Assert.notNull(nameServer, "Property "nameServer" is required");

Assert.notNull(topic, "Property "topic" is required");

RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),

this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());

boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();

if (Objects.nonNull(rpcHook)) {

consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),

enableMsgTrace, this.applicationContext.getEnvironment().

resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));

consumer.setVipChannelEnabled(false);

consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));

} else {

log.debug("Access-key or secret-key not configure in " + this + ".");

consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,

this.applicationContext.getEnvironment().

resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));

}

String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());

if (customizedNameServer != null) {

consumer.setNamesrvAddr(customizedNameServer);

} else {

consumer.setNamesrvAddr(nameServer);

}

if (accessChannel != null) {

consumer.setAccessChannel(accessChannel);

}

consumer.setConsumeThreadMax(consumeThreadMax);

if (consumeThreadMax < consumer.getConsumeThreadMin()) {

consumer.setConsumeThreadMin(consumeThreadMax);

}

consumer.setConsumeTimeout(consumeTimeout);

switch (messageModel) {

case BROADCASTING:

consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);

break;

case CLUSTERING:

consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);

break;

default:

throw new IllegalArgumentException("Property "messageModel" was wrong.");

}

switch (selectorType) {

case TAG:

consumer.subscribe(topic, selectorExpression);

break;

case SQL92:

consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));

break;

default:

throw new IllegalArgumentException("Property "selectorType" was wrong.");

}

switch (consumeMode) {

case ORDERLY:

consumer.setMessageListener(new DefaultMessageListenerOrderly());

break;

case CONCURRENTLY:

consumer.setMessageListener(new DefaultMessageListenerConcurrently());

break;

default:

throw new IllegalArgumentException("Property "consumeMode" was wrong.");

}

if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {

((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);

}

}

//......

}

  • DefaultRocketMQListenerContainer的initRocketMQPushConsumer方法在rpcHook不为null的时候,会使用RocketMQUtil.getInstanceName(rpcHook, consumerGroup)来设置consumer的instanceName

createTransactionMQProducer

rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {

private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

private DefaultMQProducer producer;

//......

private TransactionMQProducer createTransactionMQProducer(String name,

RocketMQLocalTransactionListener transactionListener,

ExecutorService executorService, RPCHook rpcHook) {

Assert.notNull(producer, "Property "producer" is required");

Assert.notNull(transactionListener, "Parameter "transactionListener" is required");

TransactionMQProducer txProducer;

if (Objects.nonNull(rpcHook)) {

txProducer = new TransactionMQProducer(name, rpcHook);

txProducer.setVipChannelEnabled(false);

txProducer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, name));

} else {

txProducer = new TransactionMQProducer(name);

}

txProducer.setTransactionListener(RocketMQUtil.convert(transactionListener));

txProducer.setNamespace(producer.getNamespace());

txProducer.setNamesrvAddr(producer.getNamesrvAddr());

if (executorService != null) {

txProducer.setExecutorService(executorService);

}

txProducer.setSendMsgTimeout(producer.getSendMsgTimeout());

txProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed());

txProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed());

txProducer.setMaxMessageSize(producer.getMaxMessageSize());

txProducer.setCompressMsgBodyOverHowmuch(producer.getCompressMsgBodyOverHowmuch());

txProducer.setRetryAnotherBrokerWhenNotStoreOK(producer.isRetryAnotherBrokerWhenNotStoreOK());

return txProducer;

}

//......

}

  • RocketMQTemplate的createTransactionMQProducer方法在rpcHook不为null的时候会使用RocketMQUtil.getInstanceName(rpcHook, name)来设置txProducer的instanceName

小结

  • MQClientManager提供了getOrCreateMQClientInstance方法用于根据clientConfig及rpcHook来创建MQClientInstance;它使用factoryTable来存储clientId与MQClientInstance的映射关系,只要clientId是一样的,获取的就是相同的MQClientInstance;而clientId则由clientConfig.buildMQClientId()计算出来
  • ClientConfig的buildMQClientId会根据clientIP、instanceName、unitName来构建;clientIP默认值为RemotingUtil.getLocalAddress();instanceName默认值为System.getProperty("rocketmq.client.name", "DEFAULT");ClientConfig还提供了一个changeInstanceNameToPID方法,仅当instanceName的值等于字面量"DEFAULT"(即既未设置rocketmq.client.name系统属性,也未显式指定instanceName)的时候,才将其改为UtilAll.getPid();unitName默认为空
  • RocketMQUtil提供了getInstanceName方法,它根据rpcHook的信息、identify以及UtilAll.getPid()来构建;DefaultRocketMQListenerContainer的initRocketMQPushConsumer方法在rpcHook不为null的时候,会使用RocketMQUtil.getInstanceName(rpcHook, consumerGroup)来设置consumer的instanceName;RocketMQTemplate的createTransactionMQProducer方法在rpcHook不为null的时候会使用RocketMQUtil.getInstanceName(rpcHook, name)来设置txProducer的instanceName

doc

  • MQClientManager

以上是 聊聊rocketmq的getOrCreateMQClientInstance 的全部内容, 来源链接: utcz.com/z/511622.html

回到顶部