聊聊rocketmq的registerProducer与unregisterProducer

编程

本文主要研究一下rocketmq的registerProducer与unregisterProducer

MQClientInstance

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

private final static long LOCK_TIMEOUT_MILLIS = 3000;

private final InternalLogger log = ClientLogger.getLog();

private final ClientConfig clientConfig;

private final int instanceIndex;

private final String clientId;

private final long bootTimestamp = System.currentTimeMillis();

private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();

private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();

//......

public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {

if (null == group || null == producer) {

return false;

}

MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);

if (prev != null) {

log.warn("the producer group[{}] exist already.", group);

return false;

}

return true;

}

public void unregisterProducer(final String group) {

this.producerTable.remove(group);

this.unregisterClientWithLock(group, null);

}

private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {

try {

if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {

try {

this.unregisterClient(producerGroup, consumerGroup);

} catch (Exception e) {

log.error("unregisterClient exception", e);

} finally {

this.lockHeartbeat.unlock();

}

} else {

log.warn("lock heartBeat, but failed.");

}

} catch (InterruptedException e) {

log.warn("unregisterClientWithLock exception", e);

}

}

private void unregisterClient(final String producerGroup, final String consumerGroup) {

Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();

while (it.hasNext()) {

Entry<String, HashMap<Long, String>> entry = it.next();

String brokerName = entry.getKey();

HashMap<Long, String> oneTable = entry.getValue();

if (oneTable != null) {

for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {

String addr = entry1.getValue();

if (addr != null) {

try {

this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);

log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);

} catch (RemotingException e) {

log.error("unregister client exception from broker: " + addr, e);

} catch (InterruptedException e) {

log.error("unregister client exception from broker: " + addr, e);

} catch (MQBrokerException e) {

log.error("unregister client exception from broker: " + addr, e);

}

}

}

}

}

}

//......

}

  • MQClientInstance定义了producerTable,其registerProducer方法会执行producerTable.putIfAbsent(group, producer),如果返回值不为null则返回false;其unregisterProducer方法会执行producerTable.remove(group)以及unregisterClientWithLock,而后者主要执行的是unregisterClient,它最后执行的是mQClientAPIImpl.unregisterClient

DefaultMQProducerImpl.start

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {

//......

public void start() throws MQClientException {

this.start(true);

}

public void start(final boolean startFactory) throws MQClientException {

switch (this.serviceState) {

case CREATE_JUST:

this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {

this.defaultMQProducer.changeInstanceNameToPID();

}

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

if (!registerOK) {

this.serviceState = ServiceState.CREATE_JUST;

throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()

+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

null);

}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {

mQClientFactory.start();

}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),

this.defaultMQProducer.isSendMessageWithVIPChannel());

this.serviceState = ServiceState.RUNNING;

break;

case RUNNING:

case START_FAILED:

case SHUTDOWN_ALREADY:

throw new MQClientException("The producer service state not OK, maybe started once, "

+ this.serviceState

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

null);

default:

break;

}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

}

//......

}

  • DefaultMQProducerImpl的start方法在serviceState为CREATE_JUST时会执行mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this),如果返回false则抛出MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null)

DefaultMQProducerImpl.shutdown

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {

//......

public void shutdown() {

this.shutdown(true);

}

public void shutdown(final boolean shutdownFactory) {

switch (this.serviceState) {

case CREATE_JUST:

break;

case RUNNING:

this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());

this.defaultAsyncSenderExecutor.shutdown();

if (shutdownFactory) {

this.mQClientFactory.shutdown();

}

log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());

this.serviceState = ServiceState.SHUTDOWN_ALREADY;

break;

case SHUTDOWN_ALREADY:

break;

default:

break;

}

}

//......

}

  • DefaultMQProducerImpl的shutdown方法在serviceState为RUNNING的时候会执行mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup())

小结

MQClientInstance定义了producerTable,其registerProducer方法会执行producerTable.putIfAbsent(group, producer),如果返回值不为null则返回false;其unregisterProducer方法会执行producerTable.remove(group)以及unregisterClientWithLock,而后者主要执行的是unregisterClient,它最后执行的是mQClientAPIImpl.unregisterClient

doc

  • MQClientInstance

以上是 聊聊rocketmq的registerProducer与unregisterProducer 的全部内容, 来源链接: utcz.com/z/511385.html

回到顶部