聊聊rocketmq的retryAnotherBrokerWhenNotStoreOK

编程

本文主要研究一下rocketmq的retryAnotherBrokerWhenNotStoreOK

DefaultMQProducer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

private final InternalLogger log = ClientLogger.getLog();

//......

/**

* Indicate whether to retry another broker on sending failure internally.

*/

private boolean retryAnotherBrokerWhenNotStoreOK = false;

public boolean isRetryAnotherBrokerWhenNotStoreOK() {

return retryAnotherBrokerWhenNotStoreOK;

}

public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {

this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;

}

//......

}

  • DefaultMQProducer有个retryAnotherBrokerWhenNotStoreOK属性,默认为false

DefaultMQProducerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {

private final InternalLogger log = ClientLogger.getLog();

private final Random random = new Random();

private final DefaultMQProducer defaultMQProducer;

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =

new ConcurrentHashMap<String, TopicPublishInfo>();

private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();

private final RPCHook rpcHook;

protected BlockingQueue<Runnable> checkRequestQueue;

protected ExecutorService checkExecutor;

private ServiceState serviceState = ServiceState.CREATE_JUST;

private MQClientInstance mQClientFactory;

private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();

private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;

private final ExecutorService defaultAsyncSenderExecutor;

private ExecutorService asyncSenderExecutor;

//......

private SendResult sendDefaultImpl(

Message msg,

final CommunicationMode communicationMode,

final SendCallback sendCallback,

final long timeout

) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

this.makeSureStateOK();

Validators.checkMessage(msg, this.defaultMQProducer);

final long invokeID = random.nextLong();

long beginTimestampFirst = System.currentTimeMillis();

long beginTimestampPrev = beginTimestampFirst;

long endTimestamp = beginTimestampFirst;

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

if (topicPublishInfo != null && topicPublishInfo.ok()) {

boolean callTimeout = false;

MessageQueue mq = null;

Exception exception = null;

SendResult sendResult = null;

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

int times = 0;

String[] brokersSent = new String[timesTotal];

for (; times < timesTotal; times++) {

String lastBrokerName = null == mq ? null : mq.getBrokerName();

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

if (mqSelected != null) {

mq = mqSelected;

brokersSent[times] = mq.getBrokerName();

try {

beginTimestampPrev = System.currentTimeMillis();

if (times > 0) {

//Reset topic with namespace during resend.

msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));

}

long costTime = beginTimestampPrev - beginTimestampFirst;

if (timeout < costTime) {

callTimeout = true;

break;

}

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

endTimestamp = System.currentTimeMillis();

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

switch (communicationMode) {

case ASYNC:

return null;

case ONEWAY:

return null;

case SYNC:

if (sendResult.getSendStatus() != SendStatus.SEND_OK) {

if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {

continue;

}

}

return sendResult;

default:

break;

}

} catch (RemotingException e) {

endTimestamp = System.currentTimeMillis();

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

log.warn(msg.toString());

exception = e;

continue;

} catch (MQClientException e) {

endTimestamp = System.currentTimeMillis();

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

log.warn(msg.toString());

exception = e;

continue;

} catch (MQBrokerException e) {

endTimestamp = System.currentTimeMillis();

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

log.warn(msg.toString());

exception = e;

switch (e.getResponseCode()) {

case ResponseCode.TOPIC_NOT_EXIST:

case ResponseCode.SERVICE_NOT_AVAILABLE:

case ResponseCode.SYSTEM_ERROR:

case ResponseCode.NO_PERMISSION:

case ResponseCode.NO_BUYER_ID:

case ResponseCode.NOT_IN_CURRENT_UNIT:

continue;

default:

if (sendResult != null) {

return sendResult;

}

throw e;

}

} catch (InterruptedException e) {

endTimestamp = System.currentTimeMillis();

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

log.warn(msg.toString());

log.warn("sendKernelImpl exception", e);

log.warn(msg.toString());

throw e;

}

} else {

break;

}

}

if (sendResult != null) {

return sendResult;

}

String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",

times,

System.currentTimeMillis() - beginTimestampFirst,

msg.getTopic(),

Arrays.toString(brokersSent));

info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

MQClientException mqClientException = new MQClientException(info, exception);

if (callTimeout) {

throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");

}

if (exception instanceof MQBrokerException) {

mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());

} else if (exception instanceof RemotingConnectException) {

mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);

} else if (exception instanceof RemotingTimeoutException) {

mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);

} else if (exception instanceof MQClientException) {

mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);

}

throw mqClientException;

}

List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();

if (null == nsList || nsList.isEmpty()) {

throw new MQClientException(

"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);

}

throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),

null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);

}

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);

}

//......

}

  • DefaultMQProducerImpl的sendDefaultImpl方法在communicationMode为SYNC时会判断sendResult.getSendStatus()是否是SendStatus.SEND_OK,不是的话,再判断defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK(),如果是则执行continue,否则直接返回sendResult;for循环里头维护了lastBrokerName,每次执行selectOneMessageQueue(topicPublishInfo, lastBrokerName)的时候会传递过去;selectOneMessageQueue方法执行的是mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName)方法

MQFaultStrategy

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java

public class MQFaultStrategy {

private final static InternalLogger log = ClientLogger.getLog();

private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

private boolean sendLatencyFaultEnable = false;

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

public long[] getNotAvailableDuration() {

return notAvailableDuration;

}

public void setNotAvailableDuration(final long[] notAvailableDuration) {

this.notAvailableDuration = notAvailableDuration;

}

public long[] getLatencyMax() {

return latencyMax;

}

public void setLatencyMax(final long[] latencyMax) {

this.latencyMax = latencyMax;

}

public boolean isSendLatencyFaultEnable() {

return sendLatencyFaultEnable;

}

public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {

this.sendLatencyFaultEnable = sendLatencyFaultEnable;

}

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

if (this.sendLatencyFaultEnable) {

try {

int index = tpInfo.getSendWhichQueue().getAndIncrement();

for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {

int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();

if (pos < 0)

pos = 0;

MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {

if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

return mq;

}

}

final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();

int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);

if (writeQueueNums > 0) {

final MessageQueue mq = tpInfo.selectOneMessageQueue();

if (notBestBroker != null) {

mq.setBrokerName(notBestBroker);

mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);

}

return mq;

} else {

latencyFaultTolerance.remove(notBestBroker);

}

} catch (Exception e) {

log.error("Error occurred when selecting message queue", e);

}

return tpInfo.selectOneMessageQueue();

}

return tpInfo.selectOneMessageQueue(lastBrokerName);

}

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {

if (this.sendLatencyFaultEnable) {

long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);

this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);

}

}

private long computeNotAvailableDuration(final long currentLatency) {

for (int i = latencyMax.length - 1; i >= 0; i--) {

if (currentLatency >= latencyMax[i])

return this.notAvailableDuration[i];

}

return 0;

}

}

  • MQFaultStrategy的selectOneMessageQueue方法首先判断是否开启sendLatencyFaultEnable,默认为false,直接走tpInfo.selectOneMessageQueue(lastBrokerName)

TopicPublishInfo

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java

public class TopicPublishInfo {

private boolean orderTopic = false;

private boolean haveTopicRouterInfo = false;

private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();

private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

private TopicRouteData topicRouteData;

//......

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {

if (lastBrokerName == null) {

return selectOneMessageQueue();

} else {

int index = this.sendWhichQueue.getAndIncrement();

for (int i = 0; i < this.messageQueueList.size(); i++) {

int pos = Math.abs(index++) % this.messageQueueList.size();

if (pos < 0)

pos = 0;

MessageQueue mq = this.messageQueueList.get(pos);

if (!mq.getBrokerName().equals(lastBrokerName)) {

return mq;

}

}

return selectOneMessageQueue();

}

}

public MessageQueue selectOneMessageQueue() {

int index = this.sendWhichQueue.getAndIncrement();

int pos = Math.abs(index) % this.messageQueueList.size();

if (pos < 0)

pos = 0;

return this.messageQueueList.get(pos);

}

//......

}

  • TopicPublishInfo的selectOneMessageQueue在lastBrokerName为null的时候执行selectOneMessageQueue,采取的轮询的方式选择MessageQueue;lastBrokerName不为null的时候,最多循环messageQueueList.size()次,选出一个brokerName不为lastBrokerName的MessageQueue;如果都没有选到最后通过无参的selectOneMessageQueue来选择

小结

DefaultMQProducerImpl的sendDefaultImpl方法在communicationMode为SYNC时会判断sendResult.getSendStatus()是否是SendStatus.SEND_OK,不是的话,再判断defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK(),如果是则执行continue,否则直接返回sendResult;for循环里头维护了lastBrokerName,每次执行selectOneMessageQueue(topicPublishInfo, lastBrokerName)的时候会传递过去;selectOneMessageQueue方法执行的是mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName)方法

doc

  • DefaultMQProducerImpl

以上是 聊聊rocketmq的retryAnotherBrokerWhenNotStoreOK 的全部内容, 来源链接: utcz.com/z/510450.html

回到顶部