聊聊rocketmq的registerConsumer与unregisterConsumer

编程

本文主要研究一下rocketmq的registerConsumer与unregisterConsumer

MQClientInstance

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

private final static long LOCK_TIMEOUT_MILLIS = 3000;

private final InternalLogger log = ClientLogger.getLog();

private final ClientConfig clientConfig;

private final int instanceIndex;

private final String clientId;

private final long bootTimestamp = System.currentTimeMillis();

private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();

//......

public boolean registerConsumer(final String group, final MQConsumerInner consumer) {

if (null == group || null == consumer) {

return false;

}

MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);

if (prev != null) {

log.warn("the consumer group[" + group + "] exist already.");

return false;

}

return true;

}

public void unregisterConsumer(final String group) {

this.consumerTable.remove(group);

this.unregisterClientWithLock(null, group);

}

private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {

try {

if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {

try {

this.unregisterClient(producerGroup, consumerGroup);

} catch (Exception e) {

log.error("unregisterClient exception", e);

} finally {

this.lockHeartbeat.unlock();

}

} else {

log.warn("lock heartBeat, but failed.");

}

} catch (InterruptedException e) {

log.warn("unregisterClientWithLock exception", e);

}

}

private void unregisterClient(final String producerGroup, final String consumerGroup) {

Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();

while (it.hasNext()) {

Entry<String, HashMap<Long, String>> entry = it.next();

String brokerName = entry.getKey();

HashMap<Long, String> oneTable = entry.getValue();

if (oneTable != null) {

for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {

String addr = entry1.getValue();

if (addr != null) {

try {

this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);

log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);

} catch (RemotingException e) {

log.error("unregister client exception from broker: " + addr, e);

} catch (InterruptedException e) {

log.error("unregister client exception from broker: " + addr, e);

} catch (MQBrokerException e) {

log.error("unregister client exception from broker: " + addr, e);

}

}

}

}

}

}

//......

}

  • MQClientInstance定义了consumerTable,其registerConsumer方法会执行consumerTable.putIfAbsent(group, consumer),如果返回值不为null,则返回false;其unregisterConsumer方法会执行consumerTable.remove(group)以及unregisterClientWithLock(null, group);unregisterClientWithLock主要是执行unregisterClient,而后者会执行mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000)

MQClientAPIImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

//......

public void unregisterClient(

final String addr,

final String clientID,

final String producerGroup,

final String consumerGroup,

final long timeoutMillis

) throws RemotingException, MQBrokerException, InterruptedException {

final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();

requestHeader.setClientID(clientID);

requestHeader.setProducerGroup(producerGroup);

requestHeader.setConsumerGroup(consumerGroup);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);

assert response != null;

switch (response.getCode()) {

case ResponseCode.SUCCESS: {

return;

}

default:

break;

}

throw new MQBrokerException(response.getCode(), response.getRemark());

}

//......

}

  • MQClientAPIImpl的unregisterClient方法主要是构造RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader),然后通过remotingClient.invokeSync(addr, request, timeoutMillis)向broker发送请求

DefaultMQPushConsumerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

//......

public synchronized void start() throws MQClientException {

switch (this.serviceState) {

case CREATE_JUST:

log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),

this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

this.copySubscription();

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {

this.defaultMQPushConsumer.changeInstanceNameToPID();

}

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());

this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

this.pullAPIWrapper = new PullAPIWrapper(

mQClientFactory,

this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());

this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

if (this.defaultMQPushConsumer.getOffsetStore() != null) {

this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();

} else {

switch (this.defaultMQPushConsumer.getMessageModel()) {

case BROADCASTING:

this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

break;

case CLUSTERING:

this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

break;

default:

break;

}

this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

}

this.offsetStore.load();

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

this.consumeOrderly = true;

this.consumeMessageService =

new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {

this.consumeOrderly = false;

this.consumeMessageService =

new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

}

this.consumeMessageService.start();

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

if (!registerOK) {

this.serviceState = ServiceState.CREATE_JUST;

this.consumeMessageService.shutdown();

throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()

+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

null);

}

mQClientFactory.start();

log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());

this.serviceState = ServiceState.RUNNING;

break;

case RUNNING:

case START_FAILED:

case SHUTDOWN_ALREADY:

throw new MQClientException("The PushConsumer service state not OK, maybe started once, "

+ this.serviceState

+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

null);

default:

break;

}

this.updateTopicSubscribeInfoWhenSubscriptionChanged();

this.mQClientFactory.checkClientInBroker();

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

this.mQClientFactory.rebalanceImmediately();

}

//......

}

  • DefaultMQPushConsumerImpl的start方法当执行mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this)返回false的时候,会抛出MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null)

小结

MQClientInstance定义了consumerTable,其registerConsumer方法会执行consumerTable.putIfAbsent(group, consumer),如果返回值不为null,则返回false;其unregisterConsumer方法会执行consumerTable.remove(group)以及unregisterClientWithLock(null, group);unregisterClientWithLock主要是执行unregisterClient,而后者会执行mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000)

doc

  • MQClientInstance

以上是 聊聊rocketmq的registerConsumer与unregisterConsumer 的全部内容, 来源链接: utcz.com/z/511357.html

回到顶部