A look at KafkaCanalConnector's getFlatList

getFlatList

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    protected KafkaConsumer<String, Message> kafkaConsumer;
    protected KafkaConsumer<String, String>  kafkaConsumer2; // used to consume flat messages
    protected String                         topic;
    protected Integer                        partition;
    protected Properties                     properties;
    protected volatile boolean               connected = false;
    protected volatile boolean               running   = false;
    protected boolean                        flatMessage;

    private Map<Integer, Long>               currentOffsets = new ConcurrentHashMap<>();

    //......

    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return Lists.newArrayList();
        }

        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return Lists.newArrayList();
        }

        ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));

        currentOffsets.clear();
        for (TopicPartition topicPartition : records.partitions()) {
            currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
        }

        if (!records.isEmpty()) {
            List<FlatMessage> flatMessages = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                String flatMessageJson = record.value();
                FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
                flatMessages.add(flatMessage);
            }
            return flatMessages;
        }
        return Lists.newArrayList();
    }

    //......

}

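  • KafkaCanalConnector's getFlatList method first fetches messages through getFlatListWithoutAck and then, if the result is non-empty, calls ack. getFlatListWithoutAck polls records with kafkaConsumer2.poll, stores the current position of each returned TopicPartition in currentOffsets (keyed by partition number), and then parses each record's JSON value into a FlatMessage.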
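The order of operations described above can be sketched without Kafka at all. The following is a minimal illustration, not canal code: the class FetchThenAckSketch, its single in-memory partition, and the maxRecords parameter are all assumptions made for the example, and strings stand in for FlatMessage objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a consumer on a single partition (0).
// It is not canal code; it only mimics the order of operations in getFlatList.
class FetchThenAckSketch {
    private final List<String> log;   // all records in the partition
    private long position = 0;        // offset of the next record to fetch
    private long committed = 0;       // position stored by the last ack
    final Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();

    FetchThenAckSketch(List<String> log) {
        this.log = new ArrayList<>(log);
    }

    // Counterpart of getFlatListWithoutAck: fetch, then remember the position.
    List<String> getWithoutAck(int maxRecords) {
        List<String> batch = new ArrayList<>();
        while (position < log.size() && batch.size() < maxRecords) {
            batch.add(log.get((int) position));
            position++;
        }
        currentOffsets.clear();
        if (!batch.isEmpty()) {
            currentOffsets.put(0, position);
        }
        return batch;
    }

    // Counterpart of ack: persist the current position.
    void ack() {
        committed = position;
    }

    // Counterpart of getFlatList: ack only when the batch is non-empty.
    List<String> get(int maxRecords) {
        List<String> batch = getWithoutAck(maxRecords);
        if (!batch.isEmpty()) {
            ack();
        }
        return batch;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        FetchThenAckSketch consumer = new FetchThenAckSketch(Arrays.asList("a", "b", "c"));
        System.out.println(consumer.get(2) + " committed=" + consumer.committed()); // [a, b] committed=2
        System.out.println(consumer.get(2) + " committed=" + consumer.committed()); // [c] committed=3
        System.out.println(consumer.get(2) + " committed=" + consumer.committed()); // [] committed=3
    }
}
```

Because the commit happens inside get, the caller receives each batch after its offsets have already been committed. A caller that fails while processing a batch would therefore not see those messages again, which is the trade-off getFlatList makes compared with calling getFlatListWithoutAck and ack separately.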

ack

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    //......

    public void ack() {
        waitClientRunning();
        if (!running) {
            return;
        }

        if (kafkaConsumer != null) {
            kafkaConsumer.commitSync();
        }
        if (kafkaConsumer2 != null) {
            kafkaConsumer2.commitSync();
        }
    }

    //......

}

  • The ack method calls commitSync() on kafkaConsumer and on kafkaConsumer2, skipping whichever of the two is null.

rollback

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

public class KafkaCanalConnector implements CanalMQConnector {

    //......

    public void rollback() {
        waitClientRunning();
        if (!running) {
            return;
        }
        // roll back all partitions
        if (kafkaConsumer != null) {
            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
                kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
            }
        }
        if (kafkaConsumer2 != null) {
            for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
                kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
            }
        }
    }

    //......

}

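  • The rollback method iterates over currentOffsets and, for each non-null consumer, calls seek to move every recorded partition back to its stored offset minus one.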
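To make the "stored offset minus one" arithmetic concrete, here is a small worked sketch. It assumes, as the KafkaConsumer Javadoc describes, that position() returns the offset of the next record to be fetched, so after a poll it equals the last returned offset plus one. The class RollbackOffsetSketch and its helper methods are hypothetical and only model the numbers; they do not call Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models the offset arithmetic only; not canal code.
class RollbackOffsetSketch {
    // Offsets of `count` consecutive records fetched starting at `start`.
    static List<Long> poll(long start, int count) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            offsets.add(start + i);
        }
        return offsets;
    }

    // Assumed meaning of position() after a poll: last returned offset + 1.
    static long positionAfter(List<Long> batch) {
        return batch.get(batch.size() - 1) + 1;
    }

    // The seek target used by rollback: the recorded position minus one.
    static long rollbackTarget(long recordedPosition) {
        return recordedPosition - 1;
    }

    public static void main(String[] args) {
        List<Long> batch = poll(10, 3);
        long recorded = positionAfter(batch);
        System.out.println("batch offsets     = " + batch);                    // [10, 11, 12]
        System.out.println("recorded position = " + recorded);                 // 13
        System.out.println("rollback seeks to = " + rollbackTarget(recorded)); // 12
    }
}
```

Read literally under that assumption, the seek target is the offset of the last record in the polled batch rather than the first, so the next poll would resume from that last record. The quoted source does not say whether this is intentional, so it is worth verifying against your own canal and Kafka versions before relying on rollback to replay a whole batch.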

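Summary

KafkaCanalConnector's getFlatList method first fetches messages through getFlatListWithoutAck and then, if any were returned, calls ack. getFlatListWithoutAck polls records with kafkaConsumer2.poll, stores the position of each returned TopicPartition in currentOffsets, and parses each record into a FlatMessage. ack commits through commitSync, and rollback seeks back using the offsets held in currentOffsets.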
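For callers who prefer to acknowledge only after a batch has been processed, the three methods can be combined into a process-then-ack loop. The sketch below is one possible shape, not canal's own client example: FlatSource is a hypothetical interface that mirrors getFlatListWithoutAck, ack and rollback (with the timeout arguments dropped for brevity), FakeSource is an in-memory stand-in, and strings replace FlatMessage.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ManualAckSketch {
    // Hypothetical interface mirroring the three connector methods discussed here.
    interface FlatSource {
        List<String> getFlatListWithoutAck();
        void ack();
        void rollback();
    }

    // Fetch one batch, hand every message to the handler, then ack.
    // If the handler throws, roll back instead. Returns true only when acked.
    static boolean consumeOnce(FlatSource source, Consumer<String> handler) {
        List<String> batch = source.getFlatListWithoutAck();
        if (batch.isEmpty()) {
            return false;
        }
        try {
            for (String message : batch) {
                handler.accept(message);
            }
            source.ack();
            return true;
        } catch (RuntimeException e) {
            source.rollback();
            return false;
        }
    }

    // In-memory stand-in that always returns the same batch and counts calls.
    static class FakeSource implements FlatSource {
        private final List<String> batch;
        int acks;
        int rollbacks;

        FakeSource(List<String> batch) {
            this.batch = batch;
        }

        public List<String> getFlatListWithoutAck() {
            return batch;
        }

        public void ack() {
            acks++;
        }

        public void rollback() {
            rollbacks++;
        }
    }

    public static void main(String[] args) {
        FakeSource source = new FakeSource(Arrays.asList("m1", "m2"));
        boolean acked = consumeOnce(source, message -> System.out.println("handled " + message));
        System.out.println("acked=" + acked + " acks=" + source.acks + " rollbacks=" + source.rollbacks);
    }
}
```

The design choice here is to treat a batch as the unit of acknowledgement: either every message in it was handled and the offsets are committed, or the consumer is asked to rewind.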

doc

  • KafkaCanalConnector

