聊聊rocketmq的retryAnotherBrokerWhenNotStoreOK
序
本文主要研究一下rocketmq的retryAnotherBrokerWhenNotStoreOK
DefaultMQProducer
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer { private final InternalLogger log = ClientLogger.getLog();
//......
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private boolean retryAnotherBrokerWhenNotStoreOK = false;
public boolean isRetryAnotherBrokerWhenNotStoreOK() {
return retryAnotherBrokerWhenNotStoreOK;
}
public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
}
//......
}
- DefaultMQProducer有个retryAnotherBrokerWhenNotStoreOK属性,默认为false
DefaultMQProducerImpl
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public class DefaultMQProducerImpl implements MQProducerInner { private final InternalLogger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final RPCHook rpcHook;
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
//......
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
//......
}
- DefaultMQProducerImpl的sendDefaultImpl方法在communicationMode为SYNC时会判断sendResult.getSendStatus()是否是SendStatus.SEND_OK,不是的话,再判断defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK(),如果是则执行continue,否则直接返回sendResult;for循环里头维护了lastBrokerName,每次执行selectOneMessageQueue(topicPublishInfo, lastBrokerName)的时候会传递过去;selectOneMessageQueue方法执行的是mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName)方法
MQFaultStrategy
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java
public class MQFaultStrategy { private final static InternalLogger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
private boolean sendLatencyFaultEnable = false;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
public long[] getNotAvailableDuration() {
return notAvailableDuration;
}
public void setNotAvailableDuration(final long[] notAvailableDuration) {
this.notAvailableDuration = notAvailableDuration;
}
public long[] getLatencyMax() {
return latencyMax;
}
public void setLatencyMax(final long[] latencyMax) {
this.latencyMax = latencyMax;
}
public boolean isSendLatencyFaultEnable() {
return sendLatencyFaultEnable;
}
public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
this.sendLatencyFaultEnable = sendLatencyFaultEnable;
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
}
- MQFaultStrategy的selectOneMessageQueue方法首先判断是否开启sendLatencyFaultEnable,默认为false,直接走tpInfo.selectOneMessageQueue(lastBrokerName)
TopicPublishInfo
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
public class TopicPublishInfo { private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
//......
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
//......
}
- TopicPublishInfo的selectOneMessageQueue在lastBrokerName为null的时候执行selectOneMessageQueue,采取的轮询的方式选择MessageQueue;lastBrokerName不为null的时候,最多循环messageQueueList.size()次,选出一个brokerName不为lastBrokerName的MessageQueue;如果都没有选到最后通过无参的selectOneMessageQueue来选择
小结
DefaultMQProducerImpl的sendDefaultImpl方法在communicationMode为SYNC时会判断sendResult.getSendStatus()是否是SendStatus.SEND_OK,不是的话,再判断defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK(),如果是则执行continue,否则直接返回sendResult;for循环里头维护了lastBrokerName,每次执行selectOneMessageQueue(topicPublishInfo, lastBrokerName)的时候会传递过去;selectOneMessageQueue方法执行的是mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName)方法
doc
- DefaultMQProducerImpl
以上是 聊聊rocketmq的retryAnotherBrokerWhenNotStoreOK 的全部内容, 来源链接: utcz.com/z/510450.html