聊聊RocketMQTemplate

编程

本文主要研究一下RocketMQTemplate

RocketMQTemplate

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {

private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

private DefaultMQProducer producer;

private ObjectMapper objectMapper;

private String charset = "UTF-8";

private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!

//......

@Override

public void afterPropertiesSet() throws Exception {

if (producer != null) {

producer.start();

}

}

@Override

protected void doSend(String destination, Message<?> message) {

SendResult sendResult = syncSend(destination, message);

log.debug("send message to `{}` finished. result:{}", destination, sendResult);

}

@Override

protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {

String content;

if (payload instanceof String) {

content = (String) payload;

} else {

// If payload not as string, use objectMapper change it.

try {

content = objectMapper.writeValueAsString(payload);

} catch (JsonProcessingException e) {

log.error("convert payload to String failed. payload:{}", payload);

throw new RuntimeException("convert to payload to String failed.", e);

}

}

MessageBuilder<?> builder = MessageBuilder.withPayload(content);

if (headers != null) {

builder.copyHeaders(headers);

}

builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);

Message<?> message = builder.build();

if (postProcessor != null) {

message = postProcessor.postProcessMessage(message);

}

return message;

}

@Override

public void destroy() {

if (Objects.nonNull(producer)) {

producer.shutdown();

}

for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {

if (Objects.nonNull(kv.getValue())) {

kv.getValue().shutdown();

}

}

cache.clear();

}

//......

}

  • RocketMQTemplate继承了spring-messaging的AbstractMessageSendingTemplate,实现了InitializingBean, DisposableBean接口;提供了syncSend、syncSendOrderly、asyncSend、asyncSendOrderly、sendOneWay、sendOneWayOrderly、sendMessageInTransaction等方法
  • afterPropertiesSet方法执行producer.start();destroy方法执行producer.shutdown()以及TransactionMQProducer的shutdown并清空cache集合
  • doSend方法内部调用的是syncSend方法,返回的sendResult仅仅debug输出;doConvert方法针对String类型的payload不做处理,其他类型使用objectMapper.writeValueAsString转为String作为content,然后构造message,执行postProcessor.postProcessMessage,然后返回

syncSend

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**

* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.

*

* @param destination formats: `topicName:tags`

* @param message {@link org.springframework.messaging.Message}

* @param timeout send timeout with millis

* @param delayLevel level for the delay message

* @return {@link SendResult}

*/

public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {

if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {

log.error("syncSend failed. destination:{}, message is null ", destination);

throw new IllegalArgumentException("`message` and `message.payload` cannot be null");

}

try {

long now = System.currentTimeMillis();

org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,

charset, destination, message);

if (delayLevel > 0) {

rocketMsg.setDelayTimeLevel(delayLevel);

}

SendResult sendResult = producer.send(rocketMsg, timeout);

long costTime = System.currentTimeMillis() - now;

log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());

return sendResult;

} catch (Exception e) {

log.error("syncSend failed. destination:{}, message:{} ", destination, message);

throw new MessagingException(e.getMessage(), e);

}

}

/**

* syncSend batch messages in a given timeout.

*

* @param destination formats: `topicName:tags`

* @param messages Collection of {@link org.springframework.messaging.Message}

* @param timeout send timeout with millis

* @return {@link SendResult}

*/

public SendResult syncSend(String destination, Collection<Message<?>> messages, long timeout) {

if (Objects.isNull(messages) || messages.size() == 0) {

log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);

throw new IllegalArgumentException("`messages` can not be empty");

}

try {

long now = System.currentTimeMillis();

Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();

org.apache.rocketmq.common.message.Message rocketMsg;

for (Message<?> msg:messages) {

if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {

log.warn("Found a message empty in the batch, skip it");

continue;

}

rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, msg);

rmqMsgs.add(rocketMsg);

}

SendResult sendResult = producer.send(rmqMsgs, timeout);

long costTime = System.currentTimeMillis() - now;

log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());

return sendResult;

} catch (Exception e) {

log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());

throw new MessagingException(e.getMessage(), e);

}

}

  • syncSend方法支持单个及多个org.springframework.messaging.Message,其中单个Message的接口支持delayLevel

syncSendOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**

* Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.

*

* @param destination formats: `topicName:tags`

* @param message {@link org.springframework.messaging.Message}

* @param hashKey use this key to select queue. for example: orderId, productId ...

* @param timeout send timeout with millis

* @return {@link SendResult}

*/

public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {

if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {

log.error("syncSendOrderly failed. destination:{}, message is null ", destination);

throw new IllegalArgumentException("`message` and `message.payload` cannot be null");

}

try {

long now = System.currentTimeMillis();

org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,

charset, destination, message);

SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);

long costTime = System.currentTimeMillis() - now;

log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());

return sendResult;

} catch (Exception e) {

log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);

throw new MessagingException(e.getMessage(), e);

}

}

  • syncSendOrderly方法内部调用的是producer.send(rocketMsg, messageQueueSelector, hashKey, timeout)方法,同步返回SendResult

asyncSend

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**

* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in addition.

*

* @param destination formats: `topicName:tags`

* @param message {@link org.springframework.messaging.Message}

* @param sendCallback {@link SendCallback}

* @param timeout send timeout with millis

* @param delayLevel level for the delay message

*/

public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {

if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {

log.error("asyncSend failed. destination:{}, message is null ", destination);

throw new IllegalArgumentException("`message` and `message.payload` cannot be null");

}

try {

org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,

charset, destination, message);

if (delayLevel > 0) {

rocketMsg.setDelayTimeLevel(delayLevel);

}

producer.send(rocketMsg, sendCallback, timeout);

} catch (Exception e) {

log.info("asyncSend failed. destination:{}, message:{} ", destination, message);

throw new MessagingException(e.getMessage(), e);

}

}

  • asyncSend方法需要传入SendCallback,内部执行的是producer.send(rocketMsg, sendCallback, timeout)

asyncSendOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**

* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in

* addition.

*

* @param destination formats: `topicName:tags`

* @param message {@link org.springframework.messaging.Message}

* @param hashKey use this key to select queue. for example: orderId, productId ...

* @param sendCallback {@link SendCallback}

* @param timeout send timeout with millis

*/

public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,

long timeout) {

if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {

log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);

throw new IllegalArgumentException("`message` and `message.payload` cannot be null");

}

try {

org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,

charset, destination, message);

producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);

} catch (Exception e) {

log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);

throw new MessagingException(e.getMessage(), e);

}

}

  • asyncSendOrderly方法内部执行的是producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout)

sendOneWay

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**

* Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won"t wait for

* acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.

* <p>

* One-way transmission is used for cases requiring moderate reliability, such as log collection.

*

* @param destination formats: `topicName:tags`

* @param message {@link org.springframework.messaging.Message}

*/

public void sendOneWay(String destination, Message<?> message) {

if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {

log.error("sendOneWay failed. destination:{}, message is null ", destination);

throw new IllegalArgumentException("`message` and `message.payload` cannot be null");

}

try {

org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,

charset, destination, message);

producer.sendOneway(rocketMsg);

} catch (Exception e) {

log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);

throw new MessagingException(e.getMessage(), e);

}

}

  • sendOneWay方法内部执行的是producer.sendOneway(rocketMsg)

sendOneWayOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**

* Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.

*

* @param destination formats: `topicName:tags`

* @param message {@link org.springframework.messaging.Message}

* @param hashKey use this key to select queue. for example: orderId, productId ...

*/

public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {

if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {

log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);

throw new IllegalArgumentException("`message` and `message.payload` cannot be null");

}

try {

org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,

charset, destination, message);

producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);

} catch (Exception e) {

log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);

throw new MessagingException(e.getMessage(), e);

}

}

  • sendOneWayOrderly方法内部执行的是producer.sendOneway(rocketMsg, messageQueueSelector, hashKey)

sendMessageInTransaction

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**

* Send Spring Message in Transaction

*

* @param txProducerGroup the validate txProducerGroup name, set null if using the default name

* @param destination destination formats: `topicName:tags`

* @param message message {@link org.springframework.messaging.Message}

* @param arg ext arg

* @return TransactionSendResult

* @throws MessagingException

*/

public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {

try {

TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);

org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,

charset, destination, message);

return txProducer.sendMessageInTransaction(rocketMsg, arg);

} catch (MQClientException e) {

throw RocketMQUtil.convert(e);

}

}

  • sendMessageInTransaction方法内部执行的是txProducer.sendMessageInTransaction(rocketMsg, arg)

小结

  • RocketMQTemplate继承了spring-messaging的AbstractMessageSendingTemplate,实现了InitializingBean, DisposableBean接口;提供了syncSend、syncSendOrderly、asyncSend、asyncSendOrderly、sendOneWay、sendOneWayOrderly、sendMessageInTransaction等方法
  • afterPropertiesSet方法执行producer.start();destroy方法执行producer.shutdown()以及TransactionMQProducer的shutdown并清空cache集合
  • doSend方法内部调用的是syncSend方法,返回的sendResult仅仅debug输出;doConvert方法针对String类型的payload不做处理,其他类型使用objectMapper.writeValueAsString转为String作为content,然后构造message,执行postProcessor.postProcessMessage,然后返回

doc

  • RocketMQTemplate

以上是 聊聊RocketMQTemplate 的全部内容, 来源链接: utcz.com/z/510339.html

回到顶部