聊聊CarreraProducer的sendDelay

编程

本文主要研究一下CarreraProducer的sendDelay

ProducerInterface

DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/ProducerInterface.java

/**
 * Producer contract of the Carrera SDK.
 *
 * <p>In this file it is implemented by {@code CarreraProducer} (a delegating wrapper) and by
 * {@code CarreraProducerBase}. Immediate sends return a {@code Result}; the delay-message
 * operations return a {@code DelayResult}.
 */
public interface ProducerInterface {

    // ---------------------------------------------------------------- lifecycle

    void start() throws Exception;

    void shutdown();

    // ---------------------------------------------------------------- immediate sends

    Result sendMessage(Message message);

    Result send(String topic, byte[] body);

    Result send(String topic, String body);

    Result send(String topic, String body, String key, String... tags);

    Result send(String topic, byte[] body, String key, String... tags);

    Result sendByCharset(String topic, String body, String charsetName);

    Result sendByCharset(String topic, String body, String charsetName, String key, String... tags);

    // ---------------------------------------------------------------- sends with an explicit hashId

    Result sendWithHashId(String topic, long hashId, String body, String key, String... tags);

    Result sendWithHashId(String topic, long hashId, byte[] body, String key, String... tags);

    Result sendWithHashIdByCharset(String topic, long hashId, String body, String charsetName, String key, String[] tags);

    // ---------------------------------------------------------------- sends with an explicit partitionId

    Result sendWithPartition(String topic, int partitionId, long hashId, byte[] body, String key, String... tags);

    Result sendWithPartition(String topic, int partitionId, long hashId, String body, String key, String... tags);

    Result sendWithPartitionByCharset(String topic, int partitionId, long hashId, String body, String charsetName, String key, String[] tags);

    // ---------------------------------------------------------------- batch sends

    Result sendBatchConcurrently(List<Message> messages);

    Result sendBatchOrderly(List<Message> messages);

    Result sendBatchSync(List<Message> messages);

    // ---------------------------------------------------------------- delay messages: add
    // Every overload takes the payload plus a DelayMeta; the *ByCharset variants additionally
    // take the name of the charset used to encode a String body.

    DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta);

    DelayResult sendDelay(String topic, String body, DelayMeta delayMeta);

    DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags);

    DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags);

    DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName);

    DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags);

    // ---------------------------------------------------------------- delay messages: cancel
    // Identified by the uniqDelayMsgId of the delay message to cancel.

    DelayResult cancelDelay(String topic, String uniqDelayMsgId);

    DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags);

}

  • ProducerInterface定义了4个sendDelay、2个sendDelayByCharset以及2个cancelDelay方法,它们的返回值均为DelayResult

CarreraProducer

DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/CarreraProducer.java

/**
 * Delegating implementation of {@link ProducerInterface}.
 *
 * <p>The constructor wraps a {@code LocalCarreraProducer}; every delay-related operation shown
 * below simply forwards its arguments to that wrapped producer and returns its result.
 */
public class CarreraProducer implements ProducerInterface {

    /** The producer that actually performs the work; set to a LocalCarreraProducer by the constructor. */
    private ProducerInterface producer;

    /** The configuration this producer was created with. */
    private CarreraConfig config;

    /**
     * Creates a producer backed by a new {@code LocalCarreraProducer} built from {@code config}.
     *
     * @param config configuration handed to the wrapped producer and kept on this instance
     */
    public CarreraProducer(CarreraConfig config) {
        this.producer = new LocalCarreraProducer(config);
        this.config = config;
    }

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param config configuration for the new producer
     * @return a new {@code CarreraProducer}
     * @throws Exception declared by the signature; the constructor shown here throws nothing checked
     */
    public static CarreraProducer newCarreraProducer(CarreraConfig config) throws Exception {
        CarreraProducer created = new CarreraProducer(config);
        return created;
    }

    /** @return a new {@code MessageBuilder} bound to this producer */
    public MessageBuilder messageBuilder() {
        MessageBuilder builder = new MessageBuilder(this);
        return builder;
    }

    /** @return a new {@code AddDelayMessageBuilder} bound to this producer */
    public AddDelayMessageBuilder addDelayMessageBuilder() {
        AddDelayMessageBuilder builder = new AddDelayMessageBuilder(this);
        return builder;
    }

    /** @return a new {@code CancelDelayMessageBuilder} bound to this producer */
    public CancelDelayMessageBuilder cancelDelayMessageBuilder() {
        CancelDelayMessageBuilder builder = new CancelDelayMessageBuilder(this);
        return builder;
    }

    /** @return a new {@code AddTxMonitorMessageBuilder} wrapping the given add-delay builder */
    public AddTxMonitorMessageBuilder addTxMonitorMessageBuilder(AddDelayMessageBuilder addDelayMessageBuilder) {
        AddTxMonitorMessageBuilder builder = new AddTxMonitorMessageBuilder(addDelayMessageBuilder);
        return builder;
    }

    /** @return a new {@code CancelTxMonitorMessageBuilder} wrapping the given cancel-delay builder */
    public CancelTxMonitorMessageBuilder cancelTxMonitorMessageBuilder(CancelDelayMessageBuilder cancelDelayMessageBuilder) {
        CancelTxMonitorMessageBuilder builder = new CancelTxMonitorMessageBuilder(cancelDelayMessageBuilder);
        return builder;
    }

    /** @return a new {@code TxBusinessMessageBuilder} wrapping the given message builder */
    public TxBusinessMessageBuilder txBusinessMessageBuilder(MessageBuilder messageBuilder) {
        TxBusinessMessageBuilder builder = new TxBusinessMessageBuilder(messageBuilder);
        return builder;
    }

    //......

    /** Forwards to the wrapped producer's {@code sendDelay(topic, body, delayMeta)}. */
    @Override
    public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta) {
        return this.producer.sendDelay(topic, body, delayMeta);
    }

    /** Forwards to the wrapped producer's {@code sendDelay(topic, body, delayMeta)}. */
    @Override
    public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta) {
        return this.producer.sendDelay(topic, body, delayMeta);
    }

    /** Forwards to the wrapped producer's {@code sendDelayByCharset(topic, body, delayMeta, charsetName)}. */
    @Override
    public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName) {
        return this.producer.sendDelayByCharset(topic, body, delayMeta, charsetName);
    }

    /** Forwards to the wrapped producer's {@code sendDelay(topic, body, delayMeta, tags)}. */
    @Override
    public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags) {
        return this.producer.sendDelay(topic, body, delayMeta, tags);
    }

    /** Forwards to the wrapped producer's {@code sendDelay(topic, body, delayMeta, tags)}. */
    @Override
    public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
        return this.producer.sendDelay(topic, body, delayMeta, tags);
    }

    /** Forwards to the wrapped producer's {@code sendDelayByCharset(topic, body, delayMeta, charsetName, tags)}. */
    @Override
    public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags) {
        return this.producer.sendDelayByCharset(topic, body, delayMeta, charsetName, tags);
    }

    /** Forwards to the wrapped producer's {@code cancelDelay(topic, uniqDelayMsgId)}. */
    @Override
    public DelayResult cancelDelay(String topic, String uniqDelayMsgId) {
        return this.producer.cancelDelay(topic, uniqDelayMsgId);
    }

    /** Forwards to the wrapped producer's {@code cancelDelay(topic, uniqDelayMsgId, tags)}. */
    @Override
    public DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags) {
        return this.producer.cancelDelay(topic, uniqDelayMsgId, tags);
    }

    //......

}

  • CarreraProducer实现了ProducerInterface接口,其构造器创建并持有一个LocalCarreraProducer实例;sendDelay、sendDelayByCharset、cancelDelay方法均直接委托给该实例

LocalCarreraProducer

DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/LocalCarreraProducer.java

/**
 * {@code CarreraProducerBase} variant whose node manager is created through
 * {@code NodeManager.newLocalNodeManager} from the proxy list held in the configuration.
 */
public class LocalCarreraProducer extends CarreraProducerBase implements ProducerInterface {

    /**
     * @param config configuration passed straight to the base class
     */
    public LocalCarreraProducer(CarreraConfig config) {
        super(config);
    }

    /**
     * Creates the local node manager from {@code config.getCarreraProxyList()}, stores it in the
     * inherited {@code nodeMgr} field, then initialises its connection pool.
     *
     * @throws Exception declared by the signature; whatever the node manager calls raise
     */
    @Override
    protected void initNodeMgr() throws Exception {
        this.nodeMgr = NodeManager.newLocalNodeManager(this.config, this.config.getCarreraProxyList());
        this.nodeMgr.initConnectionPool();
    }

}

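  • LocalCarreraProducer继承了CarreraProducerBase,实现了ProducerInterface接口;它重写了initNodeMgr方法,通过NodeManager.newLocalNodeManager(config, config.getCarreraProxyList())创建nodeMgr,并调用initConnectionPool初始化连接池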

CarreraProducerBase

DDMQ/carrera-sdk/producer/java/carrera-producer-sdk/src/main/java/com/xiaojukeji/carrera/producer/CarreraProducerBase.java

public abstract class CarreraProducerBase implements ProducerInterface {

// Class logger; sendDelayMessage uses it for warn/error/debug output.
private static final Logger LOGGER = LoggerFactory.getLogger(CarreraProducerBase.class);

// Logger registered under the name "DROP_LOG"; not referenced in the portion of the class shown here.
private static final Logger DROP_LOGGER = LoggerFactory.getLogger("DROP_LOG");

// Action value set on messages built by buildDelayMessage4Add.
private static final int DELAY_ACTIONS_ADD = 1;

// Action value set on messages built by buildDelayMessage4Cancel.
private static final int DELAY_ACTIONS_CANCEL = 2;

// Delimiter used when the tags array is joined into the single tags string of a DelayMessage.
private static final String TAGS_SEPARATOR = "||";

// Checked by sendDelayMessage, which refuses to send while this is false.
// NOTE(review): presumably flipped by start()/shutdown(), which are not shown here — confirm.
private volatile boolean isRunning = false;

// Source of proxy connections (borrow/return/unhealthy marking); assigned by a subclass's initNodeMgr().
protected NodeManager nodeMgr;

// Producer configuration; supplies the client/proxy timeouts and the retry count used when sending.
protected CarreraConfig config;

// Not referenced in the portion of the class shown here.
private ExecutorService executor;

/**
 * Stores the configuration. Nothing else is initialised here.
 *
 * @param config producer configuration
 */
public CarreraProducerBase(CarreraConfig config) {

this.config = config;

}

//......

/**
 * Sends a delay message whose payload is the given byte array.
 *
 * <p>Builds an "add" {@code DelayMessage} and passes it to {@code sendDelayMessage}.
 * NOTE(review): the value of {@code randomKey()} is passed in the varargs {@code tags} position of
 * {@code buildDelayMessage4Add} — confirm that this is intended.
 *
 * @param topic     target topic
 * @param body      raw payload
 * @param delayMeta delay settings copied onto the message
 * @return the result produced by {@code sendDelayMessage}
 */
public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta) {
    DelayMessage addRequest = buildDelayMessage4Add(topic, body, delayMeta, randomKey());
    return sendDelayMessage(addRequest);
}

/**
 * Sends a delay message whose payload is the given string.
 *
 * <p>The string is converted with {@code String.getBytes()}, i.e. the platform default charset;
 * {@code sendDelayByCharset} is the overload that takes an explicit charset name.
 * NOTE(review): the value of {@code randomKey()} is passed in the varargs {@code tags} position of
 * {@code buildDelayMessage4Add} — confirm that this is intended.
 *
 * @param topic     target topic
 * @param body      textual payload; must not be null
 * @param delayMeta delay settings copied onto the message
 * @return the result produced by {@code sendDelayMessage}
 */
public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta) {
    byte[] payload = body.getBytes();
    DelayMessage addRequest = buildDelayMessage4Add(topic, payload, delayMeta, randomKey());
    return sendDelayMessage(addRequest);
}

/**
 * Sends a delay message whose string payload is encoded with the named charset.
 *
 * <p>NOTE(review): unlike the two three-argument {@code sendDelay} overloads, this method passes no
 * {@code randomKey()} to {@code buildDelayMessage4Add} — confirm whether the difference is intended.
 *
 * @param topic       target topic
 * @param body        textual payload; must not be null
 * @param delayMeta   delay settings copied onto the message
 * @param charsetName name of the charset used to encode {@code body}
 * @return the result of {@code sendDelayMessage}, or a {@code DelayResult} carrying
 *         {@code CHARSET_ENCODING_EXCEPTION} when the charset name is not supported
 */
public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName) {
    final byte[] encodedBody;
    try {
        encodedBody = body.getBytes(charsetName);
    } catch (UnsupportedEncodingException e) {
        // Unsupported charset: report it to the caller instead of throwing.
        return new DelayResult(CHARSET_ENCODING_EXCEPTION, e.getMessage(), "");
    }
    return sendDelayMessage(buildDelayMessage4Add(topic, encodedBody, delayMeta));
}

/**
 * Sends a tagged delay message whose payload is the given string.
 *
 * <p>The string is converted with {@code String.getBytes()}, i.e. the platform default charset.
 *
 * @param topic     target topic
 * @param body      textual payload; must not be null
 * @param delayMeta delay settings copied onto the message
 * @param tags      tags forwarded to {@code buildDelayMessage4Add}
 * @return the result produced by {@code sendDelayMessage}
 */
public DelayResult sendDelay(String topic, String body, DelayMeta delayMeta, String... tags) {
    byte[] payload = body.getBytes();
    DelayMessage addRequest = buildDelayMessage4Add(topic, payload, delayMeta, tags);
    return sendDelayMessage(addRequest);
}

/**
 * Sends a tagged delay message whose payload is the given byte array.
 *
 * @param topic     target topic
 * @param body      raw payload
 * @param delayMeta delay settings copied onto the message
 * @param tags      tags forwarded to {@code buildDelayMessage4Add}
 * @return the result produced by {@code sendDelayMessage}
 */
public DelayResult sendDelay(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
    DelayMessage addRequest = buildDelayMessage4Add(topic, body, delayMeta, tags);
    return sendDelayMessage(addRequest);
}

/**
 * Sends a tagged delay message whose string payload is encoded with the named charset.
 *
 * @param topic       target topic
 * @param body        textual payload; must not be null
 * @param delayMeta   delay settings copied onto the message
 * @param charsetName name of the charset used to encode {@code body}
 * @param tags        tags forwarded to {@code buildDelayMessage4Add}
 * @return the result of {@code sendDelayMessage}, or a {@code DelayResult} carrying
 *         {@code CHARSET_ENCODING_EXCEPTION} when the charset name is not supported
 */
public DelayResult sendDelayByCharset(String topic, String body, DelayMeta delayMeta, String charsetName, String... tags) {
    final byte[] encodedBody;
    try {
        encodedBody = body.getBytes(charsetName);
    } catch (UnsupportedEncodingException e) {
        // Unsupported charset: report it to the caller instead of throwing.
        return new DelayResult(CHARSET_ENCODING_EXCEPTION, e.getMessage(), "");
    }
    return sendDelayMessage(buildDelayMessage4Add(topic, encodedBody, delayMeta, tags));
}

/**
 * Requests cancellation of a previously sent delay message.
 *
 * <p>Builds a "cancel" {@code DelayMessage} and passes it to {@code sendDelayMessage}.
 * NOTE(review): the value of {@code randomKey()} is passed in the varargs {@code tags} position of
 * {@code buildDelayMessage4Cancel} — confirm that this is intended.
 *
 * @param topic          topic of the delay message
 * @param uniqDelayMsgId identifier of the delay message to cancel
 * @return the result produced by {@code sendDelayMessage}
 */
public DelayResult cancelDelay(String topic, String uniqDelayMsgId) {
    DelayMessage cancelRequest = buildDelayMessage4Cancel(topic, uniqDelayMsgId, randomKey());
    return sendDelayMessage(cancelRequest);
}

/**
 * Requests cancellation of a previously sent delay message, attaching the given tags.
 *
 * @param topic          topic of the delay message
 * @param uniqDelayMsgId identifier of the delay message to cancel
 * @param tags           tags forwarded to {@code buildDelayMessage4Cancel}
 * @return the result produced by {@code sendDelayMessage}
 */
public DelayResult cancelDelay(String topic, String uniqDelayMsgId, String... tags) {
    DelayMessage cancelRequest = buildDelayMessage4Cancel(topic, uniqDelayMsgId, tags);
    return sendDelayMessage(cancelRequest);
}

/**
 * Assembles the {@code DelayMessage} for an "add" request.
 *
 * <p>Copies topic and body, marks the action as {@code DELAY_ACTIONS_ADD}, copies the timing
 * fields from {@code delayMeta}, and stamps a fresh UUID and the SDK version. Properties and tags
 * are only set when they are non-empty.
 *
 * @param topic     target topic
 * @param body      payload bytes
 * @param delayMeta source of timestamp, dmsgtype, interval, expire, times and properties
 * @param tags      optional tags; joined with {@code TAGS_SEPARATOR} when present
 * @return the populated message
 */
private DelayMessage buildDelayMessage4Add(String topic, byte[] body, DelayMeta delayMeta, String... tags) {
    final DelayMessage msg = new DelayMessage();

    // Addressing, payload and request type.
    msg.setTopic(topic);
    msg.setBody(body);
    msg.setAction(DELAY_ACTIONS_ADD);

    // Values taken over one-to-one from the caller's DelayMeta.
    msg.setTimestamp(delayMeta.getTimestamp());
    msg.setDmsgtype(delayMeta.getDmsgtype());
    msg.setInterval(delayMeta.getInterval());
    msg.setExpire(delayMeta.getExpire());
    msg.setTimes(delayMeta.getTimes());

    // Per-message identity and SDK version.
    msg.setUuid(new UUID().toString());
    msg.setVersion(VersionUtils.getVersion());

    // Optional parts: left unset when there is nothing to carry.
    boolean hasProperties = delayMeta.getProperties() != null && delayMeta.getProperties().size() > 0;
    if (hasProperties) {
        msg.setProperties(delayMeta.getProperties());
    }

    boolean hasTags = ArrayUtils.isNotEmpty(tags);
    if (hasTags) {
        String joinedTags = StringUtils.join(tags, TAGS_SEPARATOR);
        msg.setTags(joinedTags);
    }

    return msg;
}

/**
 * Assembles the {@code DelayMessage} for a "cancel" request.
 *
 * <p>Sets topic, the id of the message to cancel, the action {@code DELAY_ACTIONS_CANCEL}, the SDK
 * version and a one-byte placeholder body. Tags are only set when non-empty.
 *
 * @param topic          topic of the delay message
 * @param uniqDelayMsgId identifier of the delay message to cancel
 * @param tags           optional tags; joined with {@code TAGS_SEPARATOR} when present
 * @return the populated message
 */
private DelayMessage buildDelayMessage4Cancel(String topic, String uniqDelayMsgId, String... tags) {
    final DelayMessage msg = new DelayMessage();

    msg.setTopic(topic);
    msg.setUniqDelayMsgId(uniqDelayMsgId);
    msg.setAction(DELAY_ACTIONS_CANCEL);
    msg.setVersion(VersionUtils.getVersion());

    // Placeholder body: with a null body, new String(message.getBody()) would throw a
    // NullPointerException (sendDelayMessage does exactly that when logging).
    msg.setBody("c".getBytes());

    boolean hasTags = ArrayUtils.isNotEmpty(tags);
    if (hasTags) {
        String joinedTags = StringUtils.join(tags, TAGS_SEPARATOR);
        msg.setTags(joinedTags);
    }

    return msg;
}

/**
 * Sends a delay message (add or cancel) through a borrowed proxy connection, retrying on failure.
 *
 * <p>Flow: refuse immediately if the producer is not running; otherwise make up to
 * {@code config.getCarreraClientRetry() + 1} attempts. Each attempt borrows a connection from
 * {@code nodeMgr}, sends, and always returns the connection in {@code finally}. The loop ends on
 * the first result whose code is not greater than {@code OK}, or when the retries are used up.
 * The final outcome is logged (error on failure, debug on success) before being returned.
 *
 * @param message the message built by buildDelayMessage4Add / buildDelayMessage4Cancel
 * @return the last result obtained; on failure its code/msg describe the last observed problem
 */
private DelayResult sendDelayMessage(DelayMessage message) {

// Pessimistic default, overwritten by the first real outcome.
DelayResult result = new DelayResult(UNKNOWN_EXCEPTION, "unknown exception", "");

// Guard: nothing is sent while the producer is not running.
if (!isRunning) {

result.setCode(CLIENT_EXCEPTION);

result.setMsg("please execute the start() method before sending the message");

return result;

}

int retryCnt = 0;

// start/used time a single attempt; begin times the whole call including retries.
long start, used = 0;

long begin = TimeUtils.getCurTime();

// Address of the most recently used proxy, kept only for logging (stays null if none was borrowed).
String proxyAddress = null;

do {

CarreraConnection connection = null;

try {

connection = nodeMgr.borrowConnection(config.getCarreraClientTimeout());

if (connection == null) {

// No connection available. Only replace the code if no earlier attempt produced a more
// specific one, then wait and go to the loop condition (continue in do-while evaluates it).
if (result.getCode() == UNKNOWN_EXCEPTION) {

result.setCode(NO_MORE_HEALTHY_NODE);

result.setMsg("no more healthy node");

}

// NOTE(review): delay(...) is defined outside this excerpt; presumably it sleeps for the given time — confirm.
delay(config.getCarreraClientTimeout());

continue;

}

proxyAddress = connection.getNode().toString();

start = TimeUtils.getCurTime();

result = connection.sendDelay(message, this.config.getCarreraProxyTimeout());

used = TimeUtils.getElapseTime(start);

if (result.getCode() > OK) {

// Failure reported by the proxy. The listed codes only wait out the remainder of the client
// timeout before the next attempt; any other code additionally marks the node as unhealthy.
switch (result.getCode()) {

case FAIL_ILLEGAL_MSG:

case FAIL_TOPIC_NOT_ALLOWED:

case FAIL_TOPIC_NOT_EXIST:

case FAIL_TIMEOUT:

case FAIL_REFUSED_BY_RATE_LIMITER:

delay(Math.max(this.config.getCarreraClientTimeout() - TimeUtils.getElapseTime(start), 0));

break;

default:

nodeMgr.unhealthyNode(connection.getNode());

delay(Math.max(this.config.getCarreraClientTimeout() - TimeUtils.getElapseTime(start), 0));

break; //break switch

}

} else {

// Code is not greater than OK: stop retrying (finally below still returns the connection).
break; //break loop

}

} catch (Exception e) {

// Any exception during borrow/send is recorded on the current result and the loop goes on to
// the retry condition; note that no delay is applied on this path.
LOGGER.warn("sendMessage failed, retry count:" + retryCnt + ", topic:" + message.topic + ", key:" + message.uniqDelayMsgId, e);

result.setCode(CLIENT_EXCEPTION);

result.setMsg(e.toString());

} finally {

// Runs on every path (success, failure, exception, continue): hand the connection back.
if (connection != null) {

nodeMgr.returnConnection(connection);

}

}

// Post-increment: retryCnt is also incremented when the condition turns false, but not on break.
} while (retryCnt++ < this.config.getCarreraClientRetry());

if (result.getCode() > OK) {

// Failure: always logged, with the total elapsed time since begin.
LOGGER.error("send delay msg result:{}; msg[ip:{},topic:{},uuid:{},uniqDelayMsgId:{},len:{},used:{},retryCount:{},ret.Code:{},ret.Msg:{}]",

resultToString(result), proxyAddress, message.getTopic(), message.getUuid(), message.getUniqDelayMsgId(),

StringUtils.length(new String(message.getBody())), TimeUtils.getElapseTime(begin), retryCnt, result.getCode(), result.getMsg());

} else {

// Success: logged at debug level only, with the duration of the last attempt.
if (LOGGER.isDebugEnabled()) {

LOGGER.debug("send delay msg result:{}; msg[ip:{},topic:{},uniqDelayMsgId:{},len:{},used:{},retryCount:{}]",

resultToString(result), proxyAddress, message.getTopic(), result.getUniqDelayMsgId(),

StringUtils.length(new String(message.getBody())), used, retryCnt);

}

}

return result;

}

}

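  • sendDelay通过buildDelayMessage4Add构造DelayMessage,而cancelDelay通过buildDelayMessage4Cancel构造DelayMessage,最后通过sendDelayMessage方法发送消息
  • sendDelayMessage先检查isRunning,未启动则直接返回CLIENT_EXCEPTION;否则在do-while循环中通过nodeMgr.borrowConnection获取连接并调用connection.sendDelay发送,失败时最多重试config.getCarreraClientRetry()次,每次都在finally中通过nodeMgr.returnConnection归还连接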

小结

ProducerInterface定义了几个sendDelay及cancelDelay方法;CarreraProducer实现了ProducerInterface接口,其sendDelay、cancelDelay方法委托给了LocalCarreraProducer;LocalCarreraProducer继承了CarreraProducerBase,实现了ProducerInterface接口;CarreraProducerBase的sendDelay通过buildDelayMessage4Add构造DelayMessage,而cancelDelay通过buildDelayMessage4Cancel构造DelayMessage,最后通过sendDelayMessage方法发送消息

doc

  • carrera-chronos

以上是 聊聊CarreraProducer的sendDelay 的全部内容, 来源链接: utcz.com/z/512376.html

回到顶部