聊聊RocketMQCanalConnector的getFlatList

编程

getFlatList

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

public class RocketMQCanalConnector implements CanalMQConnector {

// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);

// NOTE(review): usage not visible in this excerpt; presumably compared against accessChannel — confirm.
private static final String CLOUD_ACCESS_CHANNEL = "cloud";

// NOTE(review): presumably the RocketMQ name server address; not used in the visible methods.
private String nameServer;

// Topic passed to rocketMQConsumer.subscribe(...) in subscribe().
private String topic;

// NOTE(review): presumably the consumer group name; not used in the visible methods.
private String groupName;

// Guard flag checked at the top of subscribe(); subscribe() returns immediately when it is true.
private volatile boolean connected = false;

// Push consumer that subscribe() configures with an orderly listener and starts.
private DefaultMQPushConsumer rocketMQConsumer;

// Hand-off queue: process() puts batches into it, getFlatListWithoutAck() polls batches from it.
private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;

// NOTE(review): usage not visible in this excerpt.
private int batchSize = -1;

// Value passed to batchMessage.waitFinish(...) in process(); presumably milliseconds — confirm.
private long batchProcessTimeout = 60 * 1000;

// When true, process() parses message bodies as JSON FlatMessage; otherwise it uses CanalMessageDeserializer.
private boolean flatMessage;

// Batch handed out by getFlatListWithoutAck() and not yet acknowledged; ack() resets it to null.
private volatile ConsumerBatchMessage lastGetBatchMessage = null;

// NOTE(review): the fields below are not used in the visible methods; presumably consumed by connect().
private String accessKey;

private String secretKey;

private String customizedTraceTopic;

private boolean enableMessageTrace = false;

private String accessChannel;

private String namespace;

//......

/**
 * Fetches the next batch of flat messages and acknowledges it right away.
 *
 * @param timeout maximum time to wait for a batch
 * @param unit    unit of {@code timeout}
 * @return the messages of the fetched batch, or an empty list when no batch arrived in time
 * @throws CanalClientException if a previously fetched batch is still unacknowledged,
 *                              or the wait was interrupted
 */
public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
    List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
    // Acknowledge unconditionally. A batch whose data list is empty (or null) still occupies
    // lastGetBatchMessage; skipping the ack would leave it pending and make every later get
    // fail with "mq get/ack not support concurrent & async ack". ack() does nothing when no
    // batch is pending, so this is safe on the poll-timeout path as well.
    ack();
    return messages;
}

/**
 * Polls the next batch from the internal queue without acknowledging it.
 * The caller must invoke {@link #ack()} before fetching again.
 *
 * @param timeout maximum time to wait for a batch
 * @param unit    unit of {@code timeout}
 * @return the data of the polled batch, or a new empty list when the poll timed out
 * @throws CanalClientException if the previously fetched batch has not been acknowledged yet,
 *                              or the waiting thread was interrupted
 */
public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
    // Only one outstanding batch is supported: the previous one must be acked first.
    if (this.lastGetBatchMessage != null) {
        throw new CanalClientException("mq get/ack not support concurrent & async ack");
    }

    try {
        ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
        if (batchMessage != null) {
            // Remember the batch so that ack() can complete it later.
            this.lastGetBatchMessage = batchMessage;
            return batchMessage.getData();
        }
    } catch (InterruptedException ex) {
        // Restore the interrupt flag so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
        logger.warn("Get message timeout", ex);
        // Keep the original exception as the cause instead of dropping it.
        throw new CanalClientException("Failed to fetch the data after: " + timeout, ex);
    }

    return Lists.newArrayList();
}

/**
 * Acknowledges the batch most recently returned by a get call, if any.
 *
 * <p>If acknowledging throws, the failure is logged and the batch is marked as failed
 * instead; the error is not propagated. In every case the pending batch reference is
 * cleared so the next get call is allowed.
 *
 * @throws CanalClientException declared by the interface; not thrown by this implementation
 */
public void ack() throws CanalClientException {
    // Read the volatile field once so the null check and the calls below see the same batch.
    ConsumerBatchMessage pending = this.lastGetBatchMessage;
    if (pending == null) {
        return;
    }

    try {
        pending.ack();
    } catch (Throwable e) {
        // Previously swallowed silently; log it so a failed ack is visible to operators.
        logger.error("Ack batch message failed, marking the batch as failed", e);
        pending.fail();
    } finally {
        this.lastGetBatchMessage = null;
    }
}

//......

}

  • RocketMQCanalConnector的getFlatList方法通过getFlatListWithoutAck获取FlatMessage列表,然后在messages不为空时执行ack;getFlatListWithoutAck方法从messageBlockingQueue拉取batchMessage,若不为null则更新lastGetBatchMessage,返回batchMessage.getData();ack则执行lastGetBatchMessage.ack(),若出现异常则执行lastGetBatchMessage.fail()

subscribe

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

public class RocketMQCanalConnector implements CanalMQConnector {

//......

/**
 * Subscribes the RocketMQ consumer to the configured topic and starts it.
 * Does nothing when the connector is already marked as connected.
 *
 * <p>Each delivered batch is passed to {@code process}; the listener reports
 * {@code SUCCESS} when it returns true and {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} otherwise.
 *
 * @param filter not used by this implementation; the subscription expression is always "*"
 * @throws CanalClientException declared by the interface; a start failure is logged, not thrown
 */
public synchronized void subscribe(String filter) throws CanalClientException {
    if (connected) {
        return;
    }

    try {
        if (rocketMQConsumer == null) {
            this.connect();
        }

        rocketMQConsumer.subscribe(this.topic, "*");
        rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                boolean isSuccess = process(messageExts);
                if (isSuccess) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else {
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
        rocketMQConsumer.start();

        // Mark as connected only after the consumer really started. Setting the flag after
        // the try/catch would also run on the failure path and make later subscribe() calls
        // return early although no consumer is running.
        connected = true;
    } catch (MQClientException ex) {
        connected = false;
        logger.error("Start RocketMQ consumer error", ex);
    }
}

//......

}

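  • subscribe方法在尚未connected时订阅topic(tag表达式固定为"*",并未使用传入的filter参数),并给rocketMQConsumer注册了MessageListenerOrderly,其consumeMessage方法执行process方法:process返回true时返回ConsumeOrderlyStatus.SUCCESS,否则返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;最后执行rocketMQConsumer.start()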

process

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

public class RocketMQCanalConnector implements CanalMQConnector {

//......

/**
 * Converts a list of RocketMQ messages into one batch, hands the batch to the client through
 * the blocking queue and waits until the batch is finished or the timeout elapses.
 *
 * @param messageExts messages delivered by the RocketMQ listener
 * @return true only if the batch finished within {@code batchProcessTimeout} and
 *         {@code batchMessage.isSuccess()} is true
 * @throws CanalClientException if a message body cannot be deserialized
 * @throws RuntimeException     if the thread is interrupted while enqueuing or waiting
 */
private boolean process(List<MessageExt> messageExts) {
    if (logger.isDebugEnabled()) {
        logger.debug("Get Message: {}", messageExts);
    }

    // Raw list on purpose: it holds either Message or FlatMessage depending on the flatMessage flag.
    List messageList = Lists.newArrayList();
    for (MessageExt messageExt : messageExts) {
        byte[] data = messageExt.getBody();
        if (data == null) {
            logger.warn("Received message data is null");
            continue;
        }

        try {
            if (flatMessage) {
                // Local renamed so it no longer shadows the boolean field flatMessage.
                FlatMessage parsedFlatMessage = JSON.parseObject(data, FlatMessage.class);
                messageList.add(parsedFlatMessage);
            } else {
                Message message = CanalMessageDeserializer.deserializer(data);
                messageList.add(message);
            }
        } catch (Exception ex) {
            logger.error("Add message error", ex);
            throw new CanalClientException(ex);
        }
    }

    ConsumerBatchMessage batchMessage;
    if (!flatMessage) {
        batchMessage = new ConsumerBatchMessage<Message>(messageList);
    } else {
        batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
    }

    try {
        messageBlockingQueue.put(batchMessage);
    } catch (InterruptedException e) {
        // Restore the interrupt flag before converting to an unchecked exception.
        Thread.currentThread().interrupt();
        logger.error("Put message to queue error", e);
        throw new RuntimeException(e);
    }

    boolean isCompleted;
    try {
        // Block until the batch is finished or batchProcessTimeout elapses.
        isCompleted = batchMessage.waitFinish(batchProcessTimeout);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.error("Interrupted when waiting messages to be finished.", e);
        throw new RuntimeException(e);
    }

    boolean isSuccess = batchMessage.isSuccess();
    return isCompleted && isSuccess;
}

//......

}

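  • process方法会将MessageExt转换为Message或者FlatMessage(body为null的消息只打印warn并跳过),然后组装成ConsumerBatchMessage放到messageBlockingQueue中;之后通过batchMessage.waitFinish(batchProcessTimeout)等待该批次处理完成,最后返回isCompleted && isSuccess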

小结

RocketMQCanalConnector的getFlatList方法通过getFlatListWithoutAck获取FlatMessage列表,然后在messages不为空时执行ack;getFlatListWithoutAck方法从messageBlockingQueue拉取batchMessage,若不为null则更新lastGetBatchMessage,返回batchMessage.getData();ack则执行lastGetBatchMessage.ack(),若出现异常则执行lastGetBatchMessage.fail()

doc

  • RocketMQCanalConnector

以上是 聊聊RocketMQCanalConnector的getFlatList 的全部内容, 来源链接: utcz.com/z/515270.html

回到顶部