聊聊CanalEventSink

编程

本文主要研究一下CanalEventSink

CanalEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/CanalEventSink.java

/**

* Lifecycle-managed sink contract: accepts submitted events and can be asked to interrupt consumption.

*/

public interface CanalEventSink<T> extends CanalLifeCycle {

/**

* Submits data.

*

* @param event the data being submitted

* @param remoteAddress address associated with the submitted data (presumably the source the data was read from; confirm against callers)

* @param destination destination the data belongs to (presumably the canal instance name; confirm against callers)

* @return outcome flag whose exact meaning is defined by the implementation

* @throws CanalSinkException declared for implementations that fail to sink the data

* @throws InterruptedException declared for implementations that can be interrupted while sinking

*/

boolean sink(T event, InetSocketAddress remoteAddress, String destination) throws CanalSinkException,

InterruptedException;

/**

* Interrupts consumption, e.g. when the parser module has switched over and the current merge request should be

* aborted temporarily and its context state cleaned up; see {@linkplain GroupEventSink}.

*/

void interrupt();

}

  • CanalEventSink继承了CanalLifeCycle,它定义了sink、interrupt两个方法

AbstractCanalEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/AbstractCanalEventSink.java

/**
 * Base class for event sinks: keeps one event filter and an ordered list of downstream handlers,
 * and supplies a no-op {@link #interrupt()}. The {@code sink} method is left to subclasses.
 *
 * @param <T> type of the data passed to {@code sink}
 */
public abstract class AbstractCanalEventSink<T> extends AbstractCanalLifeCycle implements CanalEventSink<T> {

    /** Event filter; stays {@code null} until {@link #setFilter(CanalEventFilter)} is called. */
    protected CanalEventFilter                  filter;

    /** Downstream handlers in list order; starts out empty. */
    protected List<CanalEventDownStreamHandler> handlers = new ArrayList<CanalEventDownStreamHandler>();

    // ---------- filter ----------

    /** Returns the currently configured filter (may be {@code null}). */
    public CanalEventFilter getFilter() {
        return this.filter;
    }

    /** Replaces the configured filter. */
    public void setFilter(CanalEventFilter filter) {
        this.filter = filter;
    }

    // ---------- handlers ----------

    /** Returns the live handler list itself, not a copy. */
    public List<CanalEventDownStreamHandler> getHandlers() {
        return this.handlers;
    }

    /** Returns the handler stored at {@code index}. */
    public CanalEventDownStreamHandler getHandler(int index) {
        return handlers.get(index);
    }

    /** Appends {@code handler} to the end of the handler list. */
    public void addHandler(CanalEventDownStreamHandler handler) {
        handlers.add(handler);
    }

    /** Inserts {@code handler} at position {@code index} in the handler list. */
    public void addHandler(CanalEventDownStreamHandler handler, int index) {
        handlers.add(index, handler);
    }

    /** Removes the handler stored at position {@code index}. */
    public void removeHandler(int index) {
        handlers.remove(index);
    }

    /** Removes the given handler instance from the list, if it is present. */
    public void removeHandler(CanalEventDownStreamHandler handler) {
        handlers.remove(handler);
    }

    // ---------- CanalEventSink ----------

    /** Default implementation: nothing to interrupt. */
    public void interrupt() {
        // do nothing
    }

}

  • AbstractCanalEventSink继承了AbstractCanalLifeCycle,声明实现了CanalEventSink接口;它定义了filter及handlers两个属性

EntryEventSink

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

/**
 * Sink for lists of {@code CanalEntry.Entry}. The constructor registers a
 * {@code HeartBeatEntryEventHandler}; {@link #start()} and {@link #stop()} propagate the lifecycle
 * to every registered handler; {@link #sink} delegates to {@code sinkData}.
 */
public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {

    private static final Logger    logger                        = LoggerFactory.getLogger(EntryEventSink.class);
    private static final int       maxFullTimes                  = 10;

    /** Store that accepted events are put into; must be set before {@link #start()}. */
    private CanalEventStore<Event> eventStore;

    /** Whether transaction begin/end entries should be filtered out as far as possible. */
    protected boolean              filterTransactionEntry        = false;

    /** Whether empty transaction begin/end entries should be filtered out. */
    protected boolean              filterEmtryTransactionEntry   = true;

    /** How often an empty transaction is let through (presumably milliseconds; confirm). */
    protected long                 emptyTransactionInterval      = 5 * 1000;

    /** After more than 8192 transaction begin/end entries, one is let through. */
    protected long                 emptyTransctionThresold       = 8192;

    protected volatile long        lastTransactionTimestamp      = 0L;
    protected AtomicLong           lastTransactionCount          = new AtomicLong(0L);
    protected volatile long        lastEmptyTransactionTimestamp = 0L;
    protected AtomicLong           lastEmptyTransactionCount     = new AtomicLong(0L);

    /** Accumulated {@code System.nanoTime()} deltas spent waiting on a full store. */
    protected AtomicLong           eventsSinkBlockingTime        = new AtomicLong(0L);

    /** Passed to every created {@code Event}; copied from the store in {@link #start()}. */
    protected boolean              raw;

    /** Creates the sink with a {@code HeartBeatEntryEventHandler} already registered. */
    public EntryEventSink() {
        addHandler(new HeartBeatEntryEventHandler());
    }

    /**
     * Starts the sink: requires a non-null event store, adopts the store's {@code raw} setting when
     * it is a {@code MemoryEventStoreWithBuffer}, then starts every handler that is not yet started.
     */
    public void start() {
        super.start();
        Assert.notNull(eventStore);

        if (eventStore instanceof MemoryEventStoreWithBuffer) {
            this.raw = ((MemoryEventStoreWithBuffer) eventStore).isRaw();
        }

        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (!handler.isStart()) {
                handler.start();
            }
        }
    }

    /** Stops the sink itself first, then every handler that is currently started. */
    public void stop() {
        super.stop();

        for (CanalEventDownStreamHandler handler : getHandlers()) {
            if (handler.isStart()) {
                handler.stop();
            }
        }
    }

    /**
     * Sinks a batch of entries by delegating to {@code sinkData}.
     *
     * @param entrys entries to sink
     * @param remoteAddress address forwarded to {@code sinkData}
     * @param destination not used by this implementation
     * @return whatever {@code sinkData} returns
     * @throws InterruptedException propagated from {@code sinkData}, which declares it; the
     *         {@code CanalEventSink#sink} contract permits it, and without this clause the
     *         checked exception would be unhandled and the method would not compile
     */
    public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                          throws InterruptedException {
        return sinkData(entrys, remoteAddress);
    }

    //......

}

  • EntryEventSink继承了AbstractCanalEventSink,声明实现了CanalEventSink,其start方法会遍历handlers,挨个执行handler.start方法;其stop方法会遍历handlers,挨个执行handler.stop方法;其sink方法执行的是sinkData方法

sinkData

canal-1.1.4/sink/src/main/java/com/alibaba/otter/canal/sink/entry/EntryEventSink.java

public class EntryEventSink extends AbstractCanalEventSink<List<CanalEntry.Entry>> implements CanalEventSink<List<CanalEntry.Entry>> {

//......

/**

* Filters the given entries, wraps the surviving ones into {@code Event}s and decides whether the

* batch is handed to {@code doSink}.

*

* @param entrys entries to process

* @param remoteAddress address used to build the {@code LogIdentity} of every created event

* @return the result of {@code doSink} when the batch is forwarded; {@code true} when the batch is ignored

* @throws InterruptedException declared by this method; no statement visible in this body throws it directly

*/

private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress)

throws InterruptedException {

boolean hasRowData = false;

boolean hasHeartBeat = false;

List<Event> events = new ArrayList<Event>();

for (CanalEntry.Entry entry : entrys) {

// Skip entries for which doFilter returns false.

if (!doFilter(entry)) {

continue;

}

// Optional throttling of transaction begin/end entries.

if (filterTransactionEntry

&& (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND)) {

long currentTimestamp = entry.getHeader().getExecuteTime();

// Policy-based throttling: let an empty transaction begin/end through now and then so that the
// database position is updated in time, showing that the pipeline is working normally.
// The counter is incremented for every begin/end entry reaching this point; the timestamp check
// is only evaluated while the counter is still within the threshold (short-circuit &&).

if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold

&& Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {

continue;

} else {

// Threshold or interval exceeded: reset the bookkeeping and let this entry through.

lastTransactionCount.set(0L);

lastTransactionTimestamp = currentTimestamp;

}

}

hasRowData |= (entry.getEntryType() == EntryType.ROWDATA);

hasHeartBeat |= (entry.getEntryType() == EntryType.HEARTBEAT);

Event event = new Event(new LogIdentity(remoteAddress, -1L), entry, raw);

events.add(event);

}

if (hasRowData || hasHeartBeat) {

// Row data or a heartbeat is present: pass the batch straight on to downstream processing.

return doSink(events);

} else {

// Data that is subject to filtering.

if (filterEmtryTransactionEntry && !CollectionUtils.isEmpty(events)) {

long currentTimestamp = events.get(0).getExecuteTime();

// Policy-based throttling: let an empty transaction begin/end through now and then so that the
// database position is updated in time, showing that the pipeline is working normally.
// The counter is only incremented when the interval has not yet elapsed (short-circuit ||).

if (Math.abs(currentTimestamp - lastEmptyTransactionTimestamp) > emptyTransactionInterval

|| lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {

lastEmptyTransactionCount.set(0L);

lastEmptyTransactionTimestamp = currentTimestamp;

return doSink(events);

}

}

// Return true directly, ignoring the empty transaction begin/end entries.

return true;

}

}

/**
 * Pushes a batch into the event store, retrying while the store rejects it.
 * <p>
 * Every handler's {@code before} hook runs once up front. Each attempt calls
 * {@code eventStore.tryPut}: on success the time spent blocked (if any attempt failed) is added to
 * {@code eventsSinkBlockingTime}, the {@code after} hooks run and {@code true} is returned. On
 * failure the method waits via {@code applyWait}, flushes the blocked time into the counter every
 * 100 failed attempts, runs the {@code retry} hooks and tries again.
 *
 * @param events batch to store; each hook's return value replaces the batch for subsequent steps
 * @return {@code true} once the store accepted the batch; {@code false} if the loop ended because
 *         {@code running} was false or the current thread was interrupted
 */
protected boolean doSink(List<Event> events) {
    List<Event> batch = events;
    for (CanalEventDownStreamHandler<List<Event>> hook : getHandlers()) {
        batch = hook.before(batch);
    }

    long waitingSince = 0L;
    int failedAttempts = 0;
    do {
        if (eventStore.tryPut(batch)) {
            // Only account blocking time when at least one earlier attempt was rejected.
            if (failedAttempts > 0) {
                long blockedFor = System.nanoTime() - waitingSince;
                eventsSinkBlockingTime.addAndGet(blockedFor);
            }
            for (CanalEventDownStreamHandler<List<Event>> hook : getHandlers()) {
                batch = hook.after(batch);
            }
            return true;
        }

        // Store rejected the batch: remember when waiting began on the first rejection.
        if (failedAttempts == 0) {
            waitingSince = System.nanoTime();
        }
        failedAttempts++;
        applyWait(failedAttempts);

        // Every 100 rejections, publish the elapsed blocking time and restart the measurement window.
        if (failedAttempts % 100 == 0) {
            long now = System.nanoTime();
            eventsSinkBlockingTime.addAndGet(now - waitingSince);
            waitingSince = now;
        }

        for (CanalEventDownStreamHandler<List<Event>> hook : getHandlers()) {
            batch = hook.retry(batch);
        }
        // Thread.interrupted() also clears the thread's interrupt flag when it returns true.
    } while (running && !Thread.interrupted());

    return false;
}

//......

}

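  • sinkData方法遍历entrys,先通过doFilter过滤掉不需要的entry;若开启了filterTransactionEntry,还会按次数阈值(emptyTransctionThresold)和时间间隔(emptyTransactionInterval)跳过大部分事务头/尾;其余entry被转换为Event添加到events中。若events中包含ROWDATA或HEARTBEAT,则直接执行doSink方法;否则仅当filterEmtryTransactionEntry为true、events非空且超过时间间隔或次数阈值时才执行doSink,其余情况直接返回true。doSink方法先遍历handlers,挨个执行handler.before(events),之后循环执行eventStore.tryPut(events):若成功,则遍历handlers,挨个执行handler.after(events)并返回true;若tryPut不成功,则先执行applyWait等待,再遍历handlers,挨个执行handler.retry(events)后重试,直到running为false或线程被中断时返回false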

小结

CanalEventSink继承了CanalLifeCycle,它定义了sink、interrupt两个方法;AbstractCanalEventSink继承了AbstractCanalLifeCycle,声明实现了CanalEventSink接口,它定义了filter及handlers两个属性;EntryEventSink继承了AbstractCanalEventSink,声明实现了CanalEventSink,其start方法会遍历handlers,挨个执行handler.start方法;其stop方法会遍历handlers,挨个执行handler.stop方法;其sink方法执行的是sinkData方法

doc

  • CanalEventSink

以上是 聊聊CanalEventSink 的全部内容, 来源链接: utcz.com/z/515471.html

回到顶部