聊聊rocketmq的retryTimesWhenSendFailed

编程

本文主要研究一下rocketmq的retryTimesWhenSendFailed

sendDefaultImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

// Excerpt of DefaultMQProducerImpl showing its fields and sendDefaultImpl, the
// method that drives the send/retry loop. Elided members are marked "//......".
public class DefaultMQProducerImpl implements MQProducerInner {

private final InternalLogger log = ClientLogger.getLog();

// Source of the per-call invokeID that sendDefaultImpl puts in its log lines.
private final Random random = new Random();

private final DefaultMQProducer defaultMQProducer;

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =

new ConcurrentHashMap<String, TopicPublishInfo>();

private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();

private final RPCHook rpcHook;

protected BlockingQueue<Runnable> checkRequestQueue;

protected ExecutorService checkExecutor;

private ServiceState serviceState = ServiceState.CREATE_JUST;

private MQClientInstance mQClientFactory;

private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();

// Read from the system property named by MixAll.MESSAGE_COMPRESS_LEVEL; falls back to "5" when unset.
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;

private final ExecutorService defaultAsyncSenderExecutor;

private ExecutorService asyncSenderExecutor;

//......

/**
 * Sends a message, looping over sendKernelImpl up to timesTotal times.
 *
 * timesTotal is 1 + defaultMQProducer.getRetryTimesWhenSendFailed() for SYNC and 1 for
 * every other communication mode, so only SYNC sends are retried by this loop.
 *
 * An attempt is followed by another one when:
 * - sendKernelImpl throws RemotingException or MQClientException;
 * - sendKernelImpl throws MQBrokerException with one of the response codes listed in
 *   the switch below;
 * - the mode is SYNC, the send status is not SEND_OK, and
 *   defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK() returns true.
 *
 * @param msg message to send; its topic is rewritten via withNamespace on every resend
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback passed through unchanged to sendKernelImpl
 * @param timeout overall budget in milliseconds, measured from the start of this call
 * @return the SendResult for SYNC; null for ASYNC and ONEWAY
 * @throws MQClientException when every attempt failed, when no name server address is
 *         configured, or when no usable route info exists for the topic
 * @throws RemotingException RemotingTooMuchRequestException when the budget is used up
 *         before an attempt starts
 * @throws MQBrokerException when the broker answers with a response code that is not in
 *         the retry list and no SendResult is available
 * @throws InterruptedException rethrown immediately, without retrying
 */
private SendResult sendDefaultImpl(

Message msg,

final CommunicationMode communicationMode,

final SendCallback sendCallback,

final long timeout

) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

this.makeSureStateOK();

Validators.checkMessage(msg, this.defaultMQProducer);

// Only used to correlate the warning log lines emitted by this call.
final long invokeID = random.nextLong();

long beginTimestampFirst = System.currentTimeMillis();

long beginTimestampPrev = beginTimestampFirst;

long endTimestamp = beginTimestampFirst;

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

if (topicPublishInfo != null && topicPublishInfo.ok()) {

boolean callTimeout = false;

MessageQueue mq = null;

Exception exception = null;

SendResult sendResult = null;

// Retries apply to SYNC only; ASYNC and ONEWAY get exactly one pass through the loop.
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

int times = 0;

// Broker name of each attempt, reported in the final failure message.
String[] brokersSent = new String[timesTotal];

for (; times < timesTotal; times++) {

// Pass the broker used by the previous attempt (null on the first one) to the queue selector.
String lastBrokerName = null == mq ? null : mq.getBrokerName();

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

if (mqSelected != null) {

mq = mqSelected;

brokersSent[times] = mq.getBrokerName();

try {

beginTimestampPrev = System.currentTimeMillis();

if (times > 0) {

//Reset topic with namespace during resend.

msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));

}

// Time already consumed by earlier attempts; the remaining budget is handed to sendKernelImpl.
long costTime = beginTimestampPrev - beginTimestampFirst;

if (timeout < costTime) {

callTimeout = true;

break;

}

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

endTimestamp = System.currentTimeMillis();

// Report the attempt's latency; the last argument is false here and true on the failure paths below.
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

switch (communicationMode) {

case ASYNC:

return null;

case ONEWAY:

return null;

case SYNC:

if (sendResult.getSendStatus() != SendStatus.SEND_OK) {

if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {

// Non-OK status and retry enabled: go to the next iteration of the for loop.
continue;

}

}

return sendResult;

default:

break;

}

} catch (RemotingException e) {

endTimestamp = System.currentTimeMillis();

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

log.warn(msg.toString());

exception = e;

continue;

} catch (MQClientException e) {

endTimestamp = System.currentTimeMillis();

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

log.warn(msg.toString());

exception = e;

continue;

} catch (MQBrokerException e) {

endTimestamp = System.currentTimeMillis();

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

log.warn(msg.toString());

exception = e;

// Only the response codes listed here lead to another attempt; any other code ends the loop.
switch (e.getResponseCode()) {

case ResponseCode.TOPIC_NOT_EXIST:

case ResponseCode.SERVICE_NOT_AVAILABLE:

case ResponseCode.SYSTEM_ERROR:

case ResponseCode.NO_PERMISSION:

case ResponseCode.NO_BUYER_ID:

case ResponseCode.NOT_IN_CURRENT_UNIT:

continue;

default:

// A SendResult kept from an earlier attempt is preferred over rethrowing.
if (sendResult != null) {

return sendResult;

}

throw e;

}

} catch (InterruptedException e) {

endTimestamp = System.currentTimeMillis();

// Unlike the other catch blocks, the last argument is false here.
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);

log.warn(msg.toString());

// NOTE(review): the next two warnings repeat the two above, so the exception and the message are each logged twice.
log.warn("sendKernelImpl exception", e);

log.warn(msg.toString());

throw e;

}

} else {

// No queue could be selected: stop trying.
break;

}

}

// Reached when the loop ran out of attempts or was left via break.
// A non-null result from the last completed attempt is returned even if its status was not SEND_OK,
// and it takes precedence over the callTimeout check further down.
if (sendResult != null) {

return sendResult;

}

String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",

times,

System.currentTimeMillis() - beginTimestampFirst,

msg.getTopic(),

Arrays.toString(brokersSent));

info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

MQClientException mqClientException = new MQClientException(info, exception);

// Budget exhaustion is reported with its own exception type instead of mqClientException.
if (callTimeout) {

throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");

}

// Map the last recorded exception to a response code on the MQClientException that is thrown.
if (exception instanceof MQBrokerException) {

mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());

} else if (exception instanceof RemotingConnectException) {

mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);

} else if (exception instanceof RemotingTimeoutException) {

mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);

} else if (exception instanceof MQClientException) {

mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);

}

throw mqClientException;

}

// No usable publish info for the topic: distinguish "no name server configured" from "no route for this topic".
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();

if (null == nsList || nsList.isEmpty()) {

throw new MQClientException(

"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);

}

throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),

null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);

}

//......

}

  • sendDefaultImpl方法通过tryToFindTopicPublishInfo找到topicPublishInfo,如果不为null且是ok的,则根据communicationMode计算timesTotal:如果是CommunicationMode.SYNC,timesTotal为1 + defaultMQProducer.getRetryTimesWhenSendFailed(),否则为1;之后最多循环timesTotal次执行sendKernelImpl。以下情况会进入下一次循环(即重试):抛出RemotingException或MQClientException;抛出MQBrokerException且responseCode为TOPIC_NOT_EXIST、SERVICE_NOT_AVAILABLE、SYSTEM_ERROR、NO_PERMISSION、NO_BUYER_ID、NOT_IN_CURRENT_UNIT之一;或者SYNC模式下sendResult.getSendStatus()不等于SendStatus.SEND_OK且defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()为true。抛出InterruptedException时不重试,直接向上抛出;另外每次发送前都会检查已耗时是否超过timeout,超过则跳出循环,最终抛出RemotingTooMuchRequestException

sendKernelImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

// Excerpt of DefaultMQProducerImpl showing its fields and sendKernelImpl, the
// method that performs a single send attempt. Elided members are marked "//......".
public class DefaultMQProducerImpl implements MQProducerInner {

private final InternalLogger log = ClientLogger.getLog();

private final Random random = new Random();

private final DefaultMQProducer defaultMQProducer;

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =

new ConcurrentHashMap<String, TopicPublishInfo>();

private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();

private final RPCHook rpcHook;

protected BlockingQueue<Runnable> checkRequestQueue;

protected ExecutorService checkExecutor;

private ServiceState serviceState = ServiceState.CREATE_JUST;

private MQClientInstance mQClientFactory;

private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();

// Read from the system property named by MixAll.MESSAGE_COMPRESS_LEVEL; falls back to "5" when unset.
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;

private final ExecutorService defaultAsyncSenderExecutor;

private ExecutorService asyncSenderExecutor;

//......

/**
 * Performs one send attempt of msg to the broker that owns mq.
 *
 * Resolves the broker address, builds the SendMessageRequestHeader, runs the
 * check-forbidden and send-message hooks when any are registered, and then calls
 * MQClientAPIImpl.sendMessage. ASYNC uses the overload that takes the callback and
 * getRetryTimesWhenSendAsyncFailed(); ONEWAY and SYNC share the shorter overload.
 * This method itself contains no retry loop.
 *
 * @param msg message to send; its body and topic are restored in the finally block
 * @param mq target queue; supplies the broker name and queue id
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback forwarded to the ASYNC sendMessage overload only
 * @param topicPublishInfo forwarded to the ASYNC sendMessage overload only
 * @param timeout budget in milliseconds for this attempt, measured from method entry
 * @return whatever MQClientAPIImpl.sendMessage returned
 * @throws MQClientException when no address can be found for the broker of mq
 * @throws RemotingException RemotingTooMuchRequestException when the budget is already
 *         used up before the request is issued; otherwise as thrown by sendMessage
 * @throws MQBrokerException as thrown by sendMessage
 * @throws InterruptedException as thrown by sendMessage
 */
private SendResult sendKernelImpl(final Message msg,

final MessageQueue mq,

final CommunicationMode communicationMode,

final SendCallback sendCallback,

final TopicPublishInfo topicPublishInfo,

final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

long beginStartTime = System.currentTimeMillis();

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

// Address unknown: look the topic's publish info up again, then repeat the address lookup once.
if (null == brokerAddr) {

tryToFindTopicPublishInfo(mq.getTopic());

brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

}

SendMessageContext context = null;

if (brokerAddr != null) {

brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

// Remember the body so it can be put back after the send (see the ASYNC branch and the finally block).
byte[] prevBody = msg.getBody();

try {

//for MessageBatch,ID has been set in the generating process

if (!(msg instanceof MessageBatch)) {

MessageClientIDSetter.setUniqID(msg);

}

boolean topicWithNamespace = false;

if (null != this.mQClientFactory.getClientConfig().getNamespace()) {

msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());

topicWithNamespace = true;

}

// Bit flags sent in the request header: compression and transaction-prepared.
int sysFlag = 0;

boolean msgBodyCompressed = false;

if (this.tryToCompressMessage(msg)) {

sysFlag |= MessageSysFlag.COMPRESSED_FLAG;

msgBodyCompressed = true;

}

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);

if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {

sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;

}

if (hasCheckForbiddenHook()) {

CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();

checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());

checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());

checkForbiddenContext.setCommunicationMode(communicationMode);

checkForbiddenContext.setBrokerAddr(brokerAddr);

checkForbiddenContext.setMessage(msg);

checkForbiddenContext.setMq(mq);

checkForbiddenContext.setUnitMode(this.isUnitMode());

this.executeCheckForbiddenHook(checkForbiddenContext);

}

// The context is only created when send-message hooks are registered; otherwise it stays null.
if (this.hasSendMessageHook()) {

context = new SendMessageContext();

context.setProducer(this);

context.setProducerGroup(this.defaultMQProducer.getProducerGroup());

context.setCommunicationMode(communicationMode);

context.setBornHost(this.defaultMQProducer.getClientIP());

context.setBrokerAddr(brokerAddr);

context.setMessage(msg);

context.setMq(mq);

context.setNamespace(this.defaultMQProducer.getNamespace());

String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);

if (isTrans != null && isTrans.equals("true")) {

context.setMsgType(MessageType.Trans_Msg_Half);

}

// Checked after the transaction test, so Delay_Msg overwrites Trans_Msg_Half when both apply.
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {

context.setMsgType(MessageType.Delay_Msg);

}

this.executeSendMessageHookBefore(context);

}

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());

requestHeader.setTopic(msg.getTopic());

requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());

requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());

requestHeader.setQueueId(mq.getQueueId());

requestHeader.setSysFlag(sysFlag);

requestHeader.setBornTimestamp(System.currentTimeMillis());

requestHeader.setFlag(msg.getFlag());

requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));

requestHeader.setReconsumeTimes(0);

requestHeader.setUnitMode(this.isUnitMode());

requestHeader.setBatch(msg instanceof MessageBatch);

// For topics with the retry-group prefix, copy the reconsume counters from the message
// properties into the header and clear those properties on the message.
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);

if (reconsumeTimes != null) {

requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));

MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);

}

String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);

if (maxReconsumeTimes != null) {

requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));

MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);

}

}

SendResult sendResult = null;

switch (communicationMode) {

case ASYNC:

// tmpMessage is what gets sent; msg is the caller's object, which is restored right here.
Message tmpMessage = msg;

boolean messageCloned = false;

if (msgBodyCompressed) {

//If msg body was compressed, msgbody should be reset using prevBody.

//Clone new message using compressed message body and recover original message.

//Fix bug:https://github.com/apache/rocketmq-externals/issues/66

tmpMessage = MessageAccessor.cloneMessage(msg);

messageCloned = true;

msg.setBody(prevBody);

}

if (topicWithNamespace) {

// Clone at most once: skip if the compression branch above already cloned.
if (!messageCloned) {

tmpMessage = MessageAccessor.cloneMessage(msg);

messageCloned = true;

}

msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));

}

// Time spent preparing the request counts against the budget.
long costTimeAsync = System.currentTimeMillis() - beginStartTime;

if (timeout < costTimeAsync) {

throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");

}

sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(

brokerAddr,

mq.getBrokerName(),

tmpMessage,

requestHeader,

timeout - costTimeAsync,

communicationMode,

sendCallback,

topicPublishInfo,

this.mQClientFactory,

this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),

context,

this);

break;

// ONEWAY has no body of its own and falls through to the SYNC branch.
case ONEWAY:

case SYNC:

long costTimeSync = System.currentTimeMillis() - beginStartTime;

if (timeout < costTimeSync) {

throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");

}

sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(

brokerAddr,

mq.getBrokerName(),

msg,

requestHeader,

timeout - costTimeSync,

communicationMode,

context,

this);

break;

default:

assert false;

break;

}

if (this.hasSendMessageHook()) {

context.setSendResult(sendResult);

this.executeSendMessageHookAfter(context);

}

return sendResult;

// The three catch blocks are identical: record the exception on the hook context, run the
// after-hook, rethrow. MQClientException is not caught, so it skips the after-hook.
// NOTE(review): context is only assigned inside the earlier hasSendMessageHook() check, while these
// blocks call hasSendMessageHook() again - confirm context cannot still be null here.
} catch (RemotingException e) {

if (this.hasSendMessageHook()) {

context.setException(e);

this.executeSendMessageHookAfter(context);

}

throw e;

} catch (MQBrokerException e) {

if (this.hasSendMessageHook()) {

context.setException(e);

this.executeSendMessageHookAfter(context);

}

throw e;

} catch (InterruptedException e) {

if (this.hasSendMessageHook()) {

context.setException(e);

this.executeSendMessageHookAfter(context);

}

throw e;

} finally {

// Always put the original body back and strip the namespace from the topic on the caller's message.
msg.setBody(prevBody);

msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));

}

}

// Still no address after the second lookup.
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);

}

//......

}

  • sendKernelImpl对于communicationMode为SYNC的(ONEWAY没有单独的分支,与SYNC走同一段代码),执行的是this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this)

MQClientAPIImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

// Excerpt of MQClientAPIImpl showing its fields and the 8-argument sendMessage overload.
// Elided members are marked "//......".
public class MQClientAPIImpl {

private final static InternalLogger log = ClientLogger.getLog();

// Read from the system property "org.apache.rocketmq.client.sendSmartMsg"; "true" when unset.
private static boolean sendSmartMsg =

Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));

// Runs at class initialization: stores MQVersion.CURRENT_VERSION as a system property
// under RemotingCommand.REMOTING_VERSION_KEY.
static {

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

}

private final RemotingClient remotingClient;

private final TopAddressing topAddressing;

private final ClientRemotingProcessor clientRemotingProcessor;

private String nameSrvAddr = null;

private ClientConfig clientConfig;

//......

/**
 * Convenience overload that delegates to the 12-argument sendMessage, which is not
 * shown in this excerpt.
 *
 * The four extra arguments of the longer overload are filled with null, null, null
 * and 0. Presumably these are the send callback, the topic publish info, the client
 * instance and the retry count; verify against the longer overload's signature.
 *
 * @param addr broker address to send to
 * @param brokerName name of the target broker
 * @param msg message to send
 * @param requestHeader prepared request header
 * @param timeoutMillis timeout in milliseconds
 * @param communicationMode communication mode of this send
 * @param context hook context, forwarded unchanged
 * @param producer the calling producer, forwarded unchanged
 * @return the result of the 12-argument overload
 * @throws RemotingException as thrown by the 12-argument overload
 * @throws MQBrokerException as thrown by the 12-argument overload
 * @throws InterruptedException as thrown by the 12-argument overload
 */
public SendResult sendMessage(

final String addr,

final String brokerName,

final Message msg,

final SendMessageRequestHeader requestHeader,

final long timeoutMillis,

final CommunicationMode communicationMode,

final SendMessageContext context,

final DefaultMQProducerImpl producer

) throws RemotingException, MQBrokerException, InterruptedException {

return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);

}

//......

}

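  • MQClientAPIImpl的这个8参数sendMessage内部调用的是同名的12参数sendMessage重载,只不过多出来的四个参数分别传了null、null、null和0:对照sendKernelImpl的ASYNC分支可以看到,这四个位置依次是sendCallback、topicPublishInfo、mQClientFactory以及重试次数(ASYNC时传的是getRetryTimesWhenSendAsyncFailed());这里重试次数传0,即该层方法内部出现异常时不会重试,SYNC模式的重试完全由sendDefaultImpl的循环负责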

小结

DefaultMQProducerImpl的sendDefaultImpl方法通过tryToFindTopicPublishInfo找到topicPublishInfo,如果不为null且是ok的,则根据communicationMode计算timesTotal:如果是CommunicationMode.SYNC,timesTotal为1 + defaultMQProducer.getRetryTimesWhenSendFailed(),否则为1;之后最多循环timesTotal次执行sendKernelImpl。会触发重试的情况有:抛出RemotingException或MQClientException;抛出MQBrokerException且responseCode为TOPIC_NOT_EXIST、SERVICE_NOT_AVAILABLE、SYSTEM_ERROR、NO_PERMISSION、NO_BUYER_ID、NOT_IN_CURRENT_UNIT之一;或者SYNC模式下sendResult.getSendStatus()不等于SendStatus.SEND_OK且defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()为true。InterruptedException不会重试,会直接抛出;整个过程还受timeout约束,已耗时超过timeout时不再继续发送

doc

  • DefaultMQProducerImpl

以上是 聊聊rocketmq的retryTimesWhenSendFailed 的全部内容, 来源链接: utcz.com/z/510421.html

回到顶部