聊聊rocketmq的compressMsgBodyOverHowmuch

编程

本文主要研究一下rocketmq的compressMsgBodyOverHowmuch

compressMsgBodyOverHowmuch

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    //......

    /**
     * Body-size threshold, in bytes, used when deciding whether a message body should be
     * compressed before it is sent. The producer implementation attempts compression for
     * bodies whose length is at least this value. Defaults to 4 KiB.
     */
    private int compressMsgBodyOverHowmuch = 4 * 1024;

    /**
     * Returns the compression threshold currently configured on this producer.
     *
     * @return the threshold in bytes
     */
    public int getCompressMsgBodyOverHowmuch() {
        return this.compressMsgBodyOverHowmuch;
    }

    /**
     * Replaces the compression threshold. The value is stored as given; no validation is done.
     *
     * @param compressMsgBodyOverHowmuch the new threshold in bytes
     */
    public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
        this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
    }

    //......

}

  • DefaultMQProducer定义了compressMsgBodyOverHowmuch属性,默认值为4k

DefaultMQProducerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {

private final InternalLogger log = ClientLogger.getLog();

private final Random random = new Random();

private final DefaultMQProducer defaultMQProducer;

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =

new ConcurrentHashMap<String, TopicPublishInfo>();

private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();

private final RPCHook rpcHook;

protected BlockingQueue<Runnable> checkRequestQueue;

protected ExecutorService checkExecutor;

private ServiceState serviceState = ServiceState.CREATE_JUST;

private MQClientInstance mQClientFactory;

private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();

private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;

private final ExecutorService defaultAsyncSenderExecutor;

private ExecutorService asyncSenderExecutor;

//......

/**
 * Sends a single message to the broker that owns the given queue.
 *
 * <p>The broker address is looked up by broker name; if it is unknown, the topic publish info
 * is refreshed once and the lookup is repeated. When an address is available the method, in
 * this order: assigns a unique id to non-batch messages, records the client namespace as the
 * message instance id, tries to compress the body, runs the check-forbidden hooks and the
 * "before" send hooks, builds the request header and finally dispatches the request according
 * to {@code communicationMode}.
 *
 * <p>For {@code ASYNC} sends a clone of the message is handed to the remoting layer whenever
 * the body was compressed or a namespace is in use, so that the caller's {@code msg} can be
 * given back its original body and namespace-free topic immediately. In every case the
 * {@code finally} block restores the original body and strips the namespace from the topic.
 *
 * <p>Before dispatching, the time already spent in this method is compared with
 * {@code timeout}; if the budget is exhausted a {@code RemotingTooMuchRequestException} is
 * thrown, otherwise only the remaining time is passed on.
 *
 * @param msg               the message to send; its body and topic may be changed temporarily
 * @param mq                the target queue; supplies broker name, topic and queue id
 * @param communicationMode {@code SYNC}, {@code ASYNC} or {@code ONEWAY}
 * @param sendCallback      callback forwarded to the remoting layer for {@code ASYNC} sends
 * @param topicPublishInfo  publish info forwarded to the remoting layer for {@code ASYNC} sends
 * @param timeout           overall time budget in milliseconds, measured from method entry
 * @return the result returned by the remoting layer
 * @throws MQClientException    if no broker address can be found for the queue's broker name
 * @throws RemotingException    rethrown from the remoting layer after the send hooks were notified
 * @throws MQBrokerException    rethrown from the remoting layer after the send hooks were notified
 * @throws InterruptedException rethrown from the remoting layer after the send hooks were notified
 */
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        // Address unknown: refresh the topic publish info once, then look the broker up again.
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    // Stays null unless send hooks were registered when the "before" phase ran.
    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        // Keep the caller's body so it can be put back after a compressed send.
        byte[] prevBody = msg.getBody();
        try {
            // For MessageBatch, the ID has been set in the generating process.
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                // Parsed the same way as for the sys-flag above, so both always agree on
                // whether this is a prepared (half) transaction message.
                if (Boolean.parseBoolean(msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED))) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            // Built after the hooks ran, so it reflects the message state at this point.
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                // For retry-group topics the reconsume counters are moved from the message
                // properties into the request header.
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        // If the msg body was compressed, it must be reset using prevBody:
                        // clone a new message carrying the compressed body and recover the
                        // original message.
                        // Fix bug: https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            // Guard on the context itself rather than re-checking hasSendMessageHook():
            // a hook registered while this send was in flight would otherwise cause a
            // NullPointerException here.
            if (context != null) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            this.notifySendHooksOfFailure(context, e);
            throw e;
        } catch (MQBrokerException e) {
            this.notifySendHooksOfFailure(context, e);
            throw e;
        } catch (InterruptedException e) {
            this.notifySendHooksOfFailure(context, e);
            throw e;
        } finally {
            // Always hand the message back with its original body and namespace-free topic.
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

/**
 * Records a send failure on the hook context and runs the "after" send hooks.
 *
 * <p>Does nothing when {@code context} is {@code null}, i.e. when no "before" hook ran for
 * this send, so that the original exception is never masked by a NullPointerException.
 *
 * @param context the hook context created for this send, or {@code null}
 * @param e       the exception that aborted the send
 */
private void notifySendHooksOfFailure(final SendMessageContext context, final Exception e) {
    if (context != null) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
    }
}

/**
 * Tries to swap the body of {@code msg} for its compressed form.
 *
 * <p>Nothing is changed for batch messages, for messages without a body, or for bodies
 * shorter than the producer's {@code compressMsgBodyOverHowmuch} threshold. A compression
 * failure is logged and reported as "not compressed".
 *
 * @param msg the message whose body may be replaced in place
 * @return {@code true} if the body was replaced by compressed data, {@code false} otherwise
 */
private boolean tryToCompressMessage(final Message msg) {
    // Batch does not support compressing right now.
    if (msg instanceof MessageBatch) {
        return false;
    }

    final byte[] rawBody = msg.getBody();
    if (null == rawBody || rawBody.length < this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
        return false;
    }

    try {
        final byte[] compressed = UtilAll.compress(rawBody, zipCompressLevel);
        if (null == compressed) {
            return false;
        }
        msg.setBody(compressed);
        return true;
    } catch (IOException e) {
        log.error("tryToCompressMessage exception", e);
        log.warn(msg.toString());
        return false;
    }
}

//......

}

  • DefaultMQProducerImpl的sendKernelImpl方法会通过tryToCompressMessage(msg)方法来决定是否压缩msgBody,返回true的话,会设置sysFlag,然后通过requestHeader传递给broker

MessageDecoder

rocketmq/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java

public class MessageDecoder {

public final static int MSG_ID_LENGTH = 8 + 8;

public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");

public final static int MESSAGE_MAGIC_CODE_POSTION = 4;

public final static int MESSAGE_FLAG_POSTION = 16;

public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;

public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;

public final static int MESSAGE_MAGIC_CODE = -626843481;

public static final char NAME_VALUE_SEPARATOR = 1;

public static final char PROPERTY_SEPARATOR = 2;

public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;

public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE

+ 4 // 2 MAGICCODE

+ 4 // 3 BODYCRC

+ 4 // 4 QUEUEID

+ 4 // 5 FLAG

+ 8 // 6 QUEUEOFFSET

+ 8 // 7 PHYSICALOFFSET

+ 4 // 8 SYSFLAG

+ 8 // 9 BORNTIMESTAMP

+ 8 // 10 BORNHOST

+ 8 // 11 STORETIMESTAMP

+ 8 // 12 STOREHOSTADDRESS

+ 4 // 13 RECONSUMETIMES

+ 8; // 14 Prepared Transaction Offset

//......

/**
 * Serializes a {@link MessageExt} into a single binary message frame.
 *
 * <p>The frame consists of fourteen fixed-size header fields followed by the
 * length-prefixed body (4-byte length), topic (1-byte length) and properties (2-byte length).
 * If the message already carries a positive store size, that value is used for both the
 * buffer size and the TOTALSIZE field; otherwise the size is computed from the parts.
 *
 * <p>The body is compressed (level 5) only when {@code needCompress} is {@code true} and the
 * message's sys-flag has {@code MessageSysFlag.COMPRESSED_FLAG} set.
 *
 * @param messageExt   the message to serialize
 * @param needCompress whether a body flagged as compressed must be compressed while encoding
 * @return the encoded frame
 * @throws IllegalArgumentException if the UTF-8 encoded topic is longer than
 *                                  {@link Byte#MAX_VALUE} bytes or the encoded properties are
 *                                  longer than {@link Short#MAX_VALUE} bytes, because those
 *                                  lengths do not fit their length prefix
 * @throws Exception                if compression or writing into the buffer fails
 */
public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {
    byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);
    // The topic length is written as one signed byte; a larger value would silently wrap
    // to a negative number and corrupt both the size computation and the frame.
    if (topics.length > Byte.MAX_VALUE) {
        throw new IllegalArgumentException("topic length " + topics.length
            + " exceeds the maximum of " + Byte.MAX_VALUE + " bytes");
    }
    byte topicLen = (byte) topics.length;

    String properties = messageProperties2String(messageExt.getProperties());
    byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
    // Same reasoning for the properties, whose length prefix is a signed short.
    if (propertiesBytes.length > Short.MAX_VALUE) {
        throw new IllegalArgumentException("properties length " + propertiesBytes.length
            + " exceeds the maximum of " + Short.MAX_VALUE + " bytes");
    }
    short propertiesLength = (short) propertiesBytes.length;

    int sysFlag = messageExt.getSysFlag();
    byte[] newBody = messageExt.getBody();
    if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
        newBody = UtilAll.compress(newBody, 5);
    }
    int bodyLength = newBody.length;

    int storeSize = messageExt.getStoreSize();
    ByteBuffer byteBuffer;
    if (storeSize > 0) {
        byteBuffer = ByteBuffer.allocate(storeSize);
    } else {
        storeSize = 4 // 1 TOTALSIZE
            + 4 // 2 MAGICCODE
            + 4 // 3 BODYCRC
            + 4 // 4 QUEUEID
            + 4 // 5 FLAG
            + 8 // 6 QUEUEOFFSET
            + 8 // 7 PHYSICALOFFSET
            + 4 // 8 SYSFLAG
            + 8 // 9 BORNTIMESTAMP
            + 8 // 10 BORNHOST
            + 8 // 11 STORETIMESTAMP
            + 8 // 12 STOREHOSTADDRESS
            + 4 // 13 RECONSUMETIMES
            + 8 // 14 Prepared Transaction Offset
            + 4 + bodyLength // 15 BODY
            + 1 + topicLen // 16 TOPIC
            + 2 + propertiesLength // 17 properties
            + 0;
        byteBuffer = ByteBuffer.allocate(storeSize);
    }

    // 1 TOTALSIZE
    byteBuffer.putInt(storeSize);
    // 2 MAGICCODE
    byteBuffer.putInt(MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    int bodyCRC = messageExt.getBodyCRC();
    byteBuffer.putInt(bodyCRC);
    // 4 QUEUEID
    int queueId = messageExt.getQueueId();
    byteBuffer.putInt(queueId);
    // 5 FLAG
    int flag = messageExt.getFlag();
    byteBuffer.putInt(flag);
    // 6 QUEUEOFFSET
    long queueOffset = messageExt.getQueueOffset();
    byteBuffer.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    long physicOffset = messageExt.getCommitLogOffset();
    byteBuffer.putLong(physicOffset);
    // 8 SYSFLAG
    byteBuffer.putInt(sysFlag);
    // 9 BORNTIMESTAMP
    long bornTimeStamp = messageExt.getBornTimestamp();
    byteBuffer.putLong(bornTimeStamp);
    // 10 BORNHOST (raw address bytes followed by the port)
    InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();
    byteBuffer.put(bornHost.getAddress().getAddress());
    byteBuffer.putInt(bornHost.getPort());
    // 11 STORETIMESTAMP
    long storeTimestamp = messageExt.getStoreTimestamp();
    byteBuffer.putLong(storeTimestamp);
    // 12 STOREHOST (raw address bytes followed by the port)
    InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();
    byteBuffer.put(serverHost.getAddress().getAddress());
    byteBuffer.putInt(serverHost.getPort());
    // 13 RECONSUMETIMES
    int reconsumeTimes = messageExt.getReconsumeTimes();
    byteBuffer.putInt(reconsumeTimes);
    // 14 Prepared Transaction Offset
    long preparedTransactionOffset = messageExt.getPreparedTransactionOffset();
    byteBuffer.putLong(preparedTransactionOffset);
    // 15 BODY
    byteBuffer.putInt(bodyLength);
    byteBuffer.put(newBody);
    // 16 TOPIC
    byteBuffer.put(topicLen);
    byteBuffer.put(topics);
    // 17 properties
    byteBuffer.putShort(propertiesLength);
    byteBuffer.put(propertiesBytes);

    return byteBuffer.array();
}

/**
 * Reads one message frame from {@code byteBuffer}, starting at its current position.
 *
 * <p>Fields are consumed in frame order: the fourteen fixed-size header fields, then the
 * length-prefixed body, topic and properties. Afterwards the message id is derived from the
 * store host and the commit-log offset.
 *
 * <p>Any exception raised while decoding is swallowed: the buffer position is moved to its
 * limit and {@code null} is returned.
 *
 * @param byteBuffer     the buffer to read from
 * @param readBody       whether the body bytes are copied into the message or merely skipped
 * @param deCompressBody whether a body whose sys-flag has {@code COMPRESSED_FLAG} set is
 *                       uncompressed
 * @param isClient       whether a {@code MessageClientExt} (with offset message id) is created
 *                       instead of a plain {@code MessageExt}
 * @return the decoded message, or {@code null} if decoding failed
 */
public static MessageExt decode(
    java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) {
    try {
        final MessageExt msgExt = isClient ? new MessageClientExt() : new MessageExt();

        // 1 TOTALSIZE
        msgExt.setStoreSize(byteBuffer.getInt());
        // 2 MAGICCODE (read and discarded)
        byteBuffer.getInt();
        // 3 BODYCRC
        msgExt.setBodyCRC(byteBuffer.getInt());
        // 4 QUEUEID
        msgExt.setQueueId(byteBuffer.getInt());
        // 5 FLAG
        msgExt.setFlag(byteBuffer.getInt());
        // 6 QUEUEOFFSET
        msgExt.setQueueOffset(byteBuffer.getLong());
        // 7 PHYSICALOFFSET
        msgExt.setCommitLogOffset(byteBuffer.getLong());
        // 8 SYSFLAG (kept locally: it decides below whether the body is uncompressed)
        final int sysFlag = byteBuffer.getInt();
        msgExt.setSysFlag(sysFlag);
        // 9 BORNTIMESTAMP
        msgExt.setBornTimestamp(byteBuffer.getLong());
        // 10 BORNHOST
        msgExt.setBornHost(readSocketAddress(byteBuffer));
        // 11 STORETIMESTAMP
        msgExt.setStoreTimestamp(byteBuffer.getLong());
        // 12 STOREHOST
        msgExt.setStoreHost(readSocketAddress(byteBuffer));
        // 13 RECONSUMETIMES
        msgExt.setReconsumeTimes(byteBuffer.getInt());
        // 14 Prepared Transaction Offset
        msgExt.setPreparedTransactionOffset(byteBuffer.getLong());

        // 15 BODY
        final int bodyLen = byteBuffer.getInt();
        if (bodyLen > 0) {
            if (!readBody) {
                // Caller is not interested in the body: step over it.
                byteBuffer.position(byteBuffer.position() + bodyLen);
            } else {
                byte[] body = new byte[bodyLen];
                byteBuffer.get(body);

                final boolean flaggedCompressed =
                    (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG;
                if (deCompressBody && flaggedCompressed) {
                    body = UtilAll.uncompress(body);
                }
                msgExt.setBody(body);
            }
        }

        // 16 TOPIC (1-byte length prefix)
        final byte[] topic = new byte[(int) byteBuffer.get()];
        byteBuffer.get(topic);
        msgExt.setTopic(new String(topic, CHARSET_UTF8));

        // 17 properties (2-byte length prefix)
        final short propertiesLength = byteBuffer.getShort();
        if (propertiesLength > 0) {
            final byte[] properties = new byte[propertiesLength];
            byteBuffer.get(properties);
            final Map<String, String> map = string2messageProperties(new String(properties, CHARSET_UTF8));
            msgExt.setProperties(map);
        }

        final String msgId = createMessageId(
            ByteBuffer.allocate(MSG_ID_LENGTH), msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
        msgExt.setMsgId(msgId);
        if (isClient) {
            ((MessageClientExt) msgExt).setOffsetMsgId(msgId);
        }

        return msgExt;
    } catch (Exception e) {
        // Decoding failed: mark the buffer as fully consumed and report failure via null.
        byteBuffer.position(byteBuffer.limit());
        return null;
    }
}

/**
 * Reads a 4-byte address followed by a 4-byte port from the buffer.
 *
 * @param byteBuffer the buffer positioned at the address bytes
 * @return the socket address built from the bytes read
 * @throws Exception if the address cannot be created from the bytes read
 */
private static InetSocketAddress readSocketAddress(java.nio.ByteBuffer byteBuffer) throws Exception {
    final byte[] address = new byte[4];
    byteBuffer.get(address, 0, 4);
    final int port = byteBuffer.getInt();
    return new InetSocketAddress(InetAddress.getByAddress(address), port);
}

//......

}

  • MessageDecoder的encode方法在needCompress为true且sysFlag带有COMPRESSED_FLAG标志时,会执行UtilAll.compress(body, 5)对消息体进行压缩;decode方法在deCompressBody为true且sysFlag带有COMPRESSED_FLAG标志时,会执行UtilAll.uncompress(body)对消息体进行解压缩

小结

DefaultMQProducerImpl的sendKernelImpl方法会通过tryToCompressMessage(msg)方法来决定是否压缩msgBody,返回true的话,会在sysFlag上设置COMPRESSED_FLAG,然后通过requestHeader传递给broker;MessageDecoder的encode方法在needCompress为true且sysFlag带有COMPRESSED_FLAG标志时,会执行UtilAll.compress(body, 5)对消息体进行压缩;decode方法在deCompressBody为true且sysFlag带有COMPRESSED_FLAG标志时,会执行UtilAll.uncompress(body)对消息体进行解压缩

doc

  • DefaultMQProducer

以上是 聊聊rocketmq的compressMsgBodyOverHowmuch 的全部内容, 来源链接: utcz.com/z/510528.html

回到顶部