A Look at CanalEventDownStreamHandler

This article takes a look at CanalEventDownStreamHandler in canal 1.1.4.

CanalEventDownStreamHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventDownStreamHandler.java

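public interface CanalEventDownStreamHandler<T> extends CanalLifeCycle {

    /**
     * Process the events before they are submitted to the store; the events may be replaced.
     */
    public T before(T events);

    /**
     * Process the events on retry, after the store has been found full.
     */
    public T retry(T events);

    /**
     * Process the events after they have been submitted to the store successfully.
     */
    public T after(T events);

}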

  • CanalEventDownStreamHandler extends the CanalLifeCycle interface and declares three methods: before, retry, and after
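Going by the Javadoc comments, the three hooks correspond to the stages of writing a batch to the store. The following is a minimal sketch of that call order, not Canal's actual sink code: DownStreamHandler, FlakyStore, RecordingHandler, and SinkSketch are hypothetical stand-ins, events are plain strings, and the lifecycle methods are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for CanalEventDownStreamHandler (lifecycle methods omitted).
interface DownStreamHandler<T> {
    T before(T events);
    T retry(T events);
    T after(T events);
}

// Hypothetical store that reports "full" for the first few put attempts.
class FlakyStore {
    private int rejectionsLeft;
    final List<String> items = new ArrayList<String>();

    FlakyStore(int rejections) {
        this.rejectionsLeft = rejections;
    }

    boolean tryPut(List<String> events) {
        if (rejectionsLeft > 0) {
            rejectionsLeft--;
            return false; // store is full
        }
        items.addAll(events);
        return true;
    }
}

// Handler that records which hooks were invoked, in order.
class RecordingHandler implements DownStreamHandler<List<String>> {
    final List<String> calls = new ArrayList<String>();

    public List<String> before(List<String> events) { calls.add("before"); return events; }
    public List<String> retry(List<String> events) { calls.add("retry"); return events; }
    public List<String> after(List<String> events) { calls.add("after"); return events; }
}

class SinkSketch {
    // Assumed call order: before once, retry after each failed put, after on success.
    static boolean sink(List<String> events, FlakyStore store,
                        DownStreamHandler<List<String>> handler, int maxAttempts) {
        events = handler.before(events);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (store.tryPut(events)) {
                handler.after(events);
                return true;
            }
            events = handler.retry(events);
        }
        return false;
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        FlakyStore store = new FlakyStore(1); // the first put is rejected
        boolean stored = sink(Arrays.asList("e1", "e2"), store, handler, 3);
        System.out.println(stored + " " + handler.calls);
    }
}
```

With one rejected put, the recorded hook order in this sketch is before, retry, after. Because each hook returns the (possibly replaced) events, the sketch reassigns events after before and retry.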

AbstractCanalEventDownStreamHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventDownStreamHandler.java

public class AbstractCanalEventDownStreamHandler<T> extends AbstractCanalLifeCycle implements CanalEventDownStreamHandler<T> {

    public T before(T events) {
        return events;
    }

    public T retry(T events) {
        return events;
    }

    public T after(T events) {
        return events;
    }

}

  • AbstractCanalEventDownStreamHandler extends AbstractCanalLifeCycle and implements the CanalEventDownStreamHandler interface; by default its before, retry, and after methods return the events argument unchanged
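Since every hook has a pass-through default, a subclass only needs to override the hook it cares about. A minimal sketch of that pattern, under assumptions: PassThroughHandler is a hypothetical stand-in for the abstract class (lifecycle omitted), StoredEventCounter is an invented example handler, and events are plain strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for AbstractCanalEventDownStreamHandler (lifecycle omitted).
class PassThroughHandler<T> {
    public T before(T events) { return events; }
    public T retry(T events) { return events; }
    public T after(T events) { return events; }
}

// Invented example: counts events once they have been stored, by overriding only after().
class StoredEventCounter extends PassThroughHandler<List<String>> {
    private final AtomicLong stored = new AtomicLong();

    @Override
    public List<String> after(List<String> events) {
        if (events != null) {
            stored.addAndGet(events.size());
        }
        return events; // pass the batch on unchanged
    }

    long getStored() {
        return stored.get();
    }
}

class StoredEventCounterDemo {
    public static void main(String[] args) {
        StoredEventCounter counter = new StoredEventCounter();
        List<String> batch = Arrays.asList("e1", "e2", "e3");
        counter.before(batch); // inherited default, returns the same list
        counter.after(batch);  // overridden hook, adds 3 to the counter
        System.out.println(counter.getStored());
    }
}
```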

HeartBeatEntryEventHandler

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/HeartBeatEntryEventHandler.java

public class HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {

    public List<Event> before(List<Event> events) {
        boolean existHeartBeat = false;
        for (Event event : events) {
            if (event.getEntryType() == EntryType.HEARTBEAT) {
                existHeartBeat = true;
            }
        }

        if (!existHeartBeat) {
            return events;
        } else {
            // Heartbeats are currently kept separate from other events; filter them here anyway to be safe
            List<Event> result = new ArrayList<Event>();
            for (Event event : events) {
                if (event.getEntryType() != EntryType.HEARTBEAT) {
                    result.add(event);
                }
            }

            return result;
        }
    }

}

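  • HeartBeatEntryEventHandler extends AbstractCanalEventDownStreamHandler. Its before method scans events for any event of type EntryType.HEARTBEAT: if there is none, it returns events as-is; otherwise it returns a new list containing only the non-heartbeat events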
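The filtering behaviour can be tried in isolation. The sketch below is a stream-based rewrite of the same check-then-filter logic, not the handler's own code: SketchEntryType, SketchEvent, and HeartBeatFilterSketch are hypothetical stand-ins for Canal's types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for EntryType and Event.
enum SketchEntryType { ROWDATA, HEARTBEAT }

class SketchEvent {
    final String name;
    final SketchEntryType type;

    SketchEvent(String name, SketchEntryType type) {
        this.name = name;
        this.type = type;
    }
}

class HeartBeatFilterSketch {
    // Returns the input list itself when it has no heartbeat, else a filtered copy.
    static List<SketchEvent> dropHeartBeats(List<SketchEvent> events) {
        boolean hasHeartBeat = events.stream()
            .anyMatch(e -> e.type == SketchEntryType.HEARTBEAT);
        if (!hasHeartBeat) {
            return events;
        }
        return events.stream()
            .filter(e -> e.type != SketchEntryType.HEARTBEAT)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SketchEvent> mixed = Arrays.asList(
            new SketchEvent("row-1", SketchEntryType.ROWDATA),
            new SketchEvent("hb", SketchEntryType.HEARTBEAT),
            new SketchEvent("row-2", SketchEntryType.ROWDATA));
        for (SketchEvent e : dropHeartBeats(mixed)) {
            System.out.println(e.name); // the heartbeat entry is not printed
        }
    }
}
```

Returning the original list when nothing needs filtering avoids an allocation on the common path, which is the same trade-off the two-pass loop in the handler makes.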

PrometheusCanalEventDownStreamHandler

canal-1.1.4/prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/PrometheusCanalEventDownStreamHandler.java

public class PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler<List<Event>> {

    private final AtomicLong latestExecuteTime = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong transactionCounter = new AtomicLong(0L);

    @Override
    public List<Event> before(List<Event> events) {
        long localExecTime = 0L;
        if (events != null && !events.isEmpty()) {
            for (Event e : events) {
                EntryType type = e.getEntryType();
                if (type == null) continue;
                switch (type) {
                    case TRANSACTIONBEGIN: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        break;
                    }
                    case ROWDATA: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        break;
                    }
                    case TRANSACTIONEND: {
                        long exec = e.getExecuteTime();
                        if (exec > 0) localExecTime = exec;
                        transactionCounter.incrementAndGet();
                        break;
                    }
                    case HEARTBEAT:
                        CanalEntry.EventType eventType = e.getEventType();
                        if (eventType == CanalEntry.EventType.MHEARTBEAT) {
                            localExecTime = System.currentTimeMillis();
                        }
                        break;
                    default:
                        break;
                }
            }
            if (localExecTime > 0) {
                latestExecuteTime.lazySet(localExecTime);
            }
        }
        return events;
    }

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
    }

    public AtomicLong getLatestExecuteTime() {
        return latestExecuteTime;
    }

    public AtomicLong getTransactionCounter() {
        return transactionCounter;
    }

}

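  • PrometheusCanalEventDownStreamHandler extends AbstractCanalEventDownStreamHandler. Its before method iterates over events and updates localExecTime according to the EntryType: TRANSACTIONBEGIN, ROWDATA, and TRANSACTIONEND take the event's execute time when it is positive, TRANSACTIONEND also increments transactionCounter, and a HEARTBEAT whose event type is MHEARTBEAT uses the current system time. If localExecTime ends up positive, it is written to latestExecuteTime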
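The handler only exposes the two AtomicLong values through its getters; how they are consumed is not shown in this excerpt. As a sketch under assumptions, a reader of latestExecuteTime could turn it into a lag figure as below. SinkMetricsSketch and delayMillis are hypothetical names, not part of Canal.

```java
import java.util.concurrent.atomic.AtomicLong;

class SinkMetricsSketch {
    // Hypothetical helper: how far the latest recorded execute time lags behind nowMillis.
    // Clamped at zero in case the recorded time is ahead of the local clock.
    static long delayMillis(AtomicLong latestExecuteTime, long nowMillis) {
        return Math.max(0L, nowMillis - latestExecuteTime.get());
    }

    public static void main(String[] args) {
        // Stand-in for the value returned by getLatestExecuteTime().
        AtomicLong latestExecuteTime = new AtomicLong(System.currentTimeMillis() - 250L);
        System.out.println(delayMillis(latestExecuteTime, System.currentTimeMillis()));
    }
}
```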

Summary

CanalEventDownStreamHandler extends the CanalLifeCycle interface and declares the before, retry, and after methods. AbstractCanalEventDownStreamHandler extends AbstractCanalLifeCycle and implements CanalEventDownStreamHandler, with before, retry, and after returning the events argument unchanged by default. The two subclasses of AbstractCanalEventDownStreamHandler covered here are HeartBeatEntryEventHandler and PrometheusCanalEventDownStreamHandler.


Source: utcz.com/z/515523.html
