聊聊canal的EventTransactionBuffer

编程

本文主要研究一下canal的EventTransactionBuffer

EventTransactionBuffer

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java

/**
 * Ring buffer that collects parsed {@link CanalEntry.Entry} objects and hands them to a
 * {@link TransactionFlushCallback} in batches.
 *
 * <p>A batch is flushed before a TRANSACTIONBEGIN is stored, after a TRANSACTIONEND or HEARTBEAT
 * is stored, after a ROWDATA entry whose event type is not DML is stored, and whenever the ring
 * has no free slot for the next entry.
 */
public class EventTransactionBuffer extends AbstractCanalLifeCycle {

    /** Initial value of both cursors: nothing has been put or flushed yet. */
    private static final long INIT_SQEUENCE = -1;

    /** Ring capacity; {@link #start()} rejects values that are not a power of two. */
    private int bufferSize = 1024;

    /** {@code bufferSize - 1}, computed in {@link #start()}. */
    private int indexMask;

    /** Ring storage: allocated in {@link #start()}, released in {@link #stop()}. */
    private CanalEntry.Entry[] entries;

    /** Sequence of the slot most recently written by {@code put}. */
    private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE);

    /**
     * Sequence of the last entry successfully handed to the flush callback. This is a position in
     * the same sequence space as {@link #putSequence} (not a timestamp); the entries in
     * {@code (flushSequence, putSequence]} are still pending.
     */
    private AtomicLong flushSequence = new AtomicLong(INIT_SQEUENCE);

    /** Receives every flushed batch; {@link #start()} asserts that it is non-null. */
    private TransactionFlushCallback flushCallback;

    public EventTransactionBuffer(){
    }

    /**
     * @param flushCallback receiver of every flushed batch
     */
    public EventTransactionBuffer(TransactionFlushCallback flushCallback){
        this.flushCallback = flushCallback;
    }

    /**
     * Starts the lifecycle, validates the configuration and allocates the ring.
     *
     * @throws IllegalArgumentException if {@code bufferSize} is not a power of two
     */
    public void start() throws CanalStoreException {
        super.start();
        // indexMask = bufferSize - 1 only acts as "mod bufferSize" when bufferSize is a power of two
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        Assert.notNull(flushCallback, "flush callback is null!");

        indexMask = bufferSize - 1;
        entries = new CanalEntry.Entry[bufferSize];
    }

    /**
     * Rewinds both cursors, releases the ring storage and stops the lifecycle. Entries that were
     * put but not yet flushed are discarded without being passed to the callback.
     */
    public void stop() throws CanalStoreException {
        putSequence.set(INIT_SQEUENCE);
        flushSequence.set(INIT_SQEUENCE);
        entries = null;
        super.stop();
    }

    /**
     * Adds each entry of the list in iteration order, exactly as {@link #add(CanalEntry.Entry)}.
     */
    public void add(List<CanalEntry.Entry> entrys) throws InterruptedException {
        for (CanalEntry.Entry entry : entrys) {
            add(entry);
        }
    }

    /**
     * Buffers one entry and flushes according to its entry type. Entry types not handled below
     * fall into {@code default} and are neither buffered nor flushed.
     */
    public void add(CanalEntry.Entry entry) throws InterruptedException {
        switch (entry.getEntryType()) {
            case TRANSACTIONBEGIN:
                flush();// flush whatever is left from the previous batch first
                put(entry);
                break;
            case TRANSACTIONEND:
                put(entry);
                flush();
                break;
            case ROWDATA:
                put(entry);
                // non-DML data is emitted immediately instead of being held in the buffer
                EventType eventType = entry.getHeader().getEventType();
                if (eventType != null && !isDml(eventType)) {
                    flush();
                }
                break;
            case HEARTBEAT:
                // a heartbeat from the master means the binlog has been read to the end (idle state)
                put(entry);
                flush();
                break;
            default:
                break;
        }
    }

    /**
     * Rewinds both cursors to their initial value. Pending entries are not passed to the callback;
     * the ring slots are left as-is and are overwritten by later puts.
     */
    public void reset() {
        putSequence.set(INIT_SQEUENCE);
        flushSequence.set(INIT_SQEUENCE);
    }

    /**
     * Stores {@code data} in the next slot. If that slot is not free, the pending entries are
     * flushed first and the put is retried.
     */
    private void put(CanalEntry.Entry data) throws InterruptedException {
        // first check whether the next slot is free
        if (checkFreeSlotAt(putSequence.get() + 1)) {
            long current = putSequence.get();
            long next = current + 1;

            // Write the data first, then advance the cursor. If the cursor were published first,
            // under high concurrency a reader could see the new putSequence and read the stale
            // Entry still sitting in the ring slot.
            entries[getIndex(next)] = data;
            putSequence.set(next);
        } else {
            flush();// the buffer is full: flush it
            put(data);// then retry adding the new entry
        }
    }

    /**
     * Hands every pending entry, {@code (flushSequence, putSequence]}, to the callback as one list
     * in sequence order, then advances flushSequence to putSequence. Does nothing when nothing is
     * pending. If the callback throws, flushSequence is not advanced.
     */
    private void flush() throws InterruptedException {
        long start = this.flushSequence.get() + 1;
        long end = this.putSequence.get();

        if (start <= end) {
            // presize to the exact number of pending entries to avoid repeated list growth
            List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>((int) (end - start + 1));
            for (long next = start; next <= end; next++) {
                transaction.add(this.entries[getIndex(next)]);
            }

            flushCallback.flush(transaction);
            flushSequence.set(end);// advance the flush cursor only after the callback succeeded
        }
    }

    //...... (other members such as isDml, getIndex and checkFreeSlotAt are omitted from this excerpt)
}

  • EventTransactionBuffer继承了AbstractCanalLifeCycle,其start方法校验bufferSize为2的幂、flushCallback不为空,然后创建bufferSize大小的CanalEntry.Entry数组;其stop方法设置putSequence及flushSequence为INIT_SQEUENCE,设置entries为null;其add方法根据entry.getEntryType()的不同类型做不同的处理,基本是执行put及flush方法;其reset方法设置putSequence及flushSequence为INIT_SQEUENCE;put方法给entries赋值的同时更新putSequence,如果buffer满了则先执行flush再重新put;flush方法则执行flushCallback.flush(transaction),并更新flushSequence

TransactionFlushCallback

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java

    public static interface TransactionFlushCallback {

public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException;

}

  • TransactionFlushCallback接口定义了flush方法,它接收CanalEntry.Entry类型的List

EntryProtocol.proto

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

syntax = "proto3";

package com.alibaba.otter.canal.protocol;

option java_package = "com.alibaba.otter.canal.protocol";

option java_outer_classname = "CanalEntry";

option optimize_for = SPEED;

/****************************************************************
 * message model
 * When adding new values to an enum, make sure the numeric values
 * (indices) of the existing values stay unchanged.
 ****************************************************************/
message Entry {
    /** Protocol header information. **/
    Header header = 1;

    // Entry type after the binlog event has been split up.
    // Original default note: [default = ROWDATA].
    // A oneof gives the field explicit presence in proto3, so readers can tell whether it was set.
    oneof entryType_present {
        EntryType entryType = 2;
    }

    /** Binary payload being transported. **/
    bytes storeValue = 3;
}

/** message Header: metadata shared by every Entry. **/
message Header {
    /** Protocol version number. **/ // original default note: [default = 1]
    oneof version_present {
        int32 version = 1;
    }

    /** binlog/redolog file name. **/
    string logfileName = 2;

    /** Offset within the binlog/redolog file. **/
    int64 logfileOffset = 3;

    /** serverId of the source server. **/
    int64 serverId = 4;

    /** Character encoding of the changed data. **/
    string serverenCode = 5;

    /** Execution time of the change. **/
    int64 executeTime = 6;

    /** Source (database type) of the change. **/ // original default note: [default = MYSQL]
    oneof sourceType_present {
        Type sourceType = 7;
    }

    /** schemaName of the changed data. **/
    string schemaName = 8;

    /** tableName of the changed data. **/
    string tableName = 9;

    /** Length of each event. **/
    int64 eventLength = 10;

    /** Type of the data change. **/ // original default note: [default = UPDATE]
    oneof eventType_present {
        EventType eventType = 11;
    }

    /** Reserved for extensions. **/
    repeated Pair props = 12;

    /** GTID of the current transaction. **/
    string gtid = 13;
}

/** Data structure describing one column. **/
message Column {
    /** Column index. **/
    int32 index = 1;

    /** Column type as seen from Java. **/
    int32 sqlType = 2;

    /** Column name (case-insensitive); original note: MySQL itself does not provide it. **/
    string name = 3;

    /** Whether the column is part of the primary key. **/
    bool isKey = 4;

    /** When EventType=UPDATE, marks whether this column's value was modified. **/
    bool updated = 5;

    /** Whether the value is null. **/ // original default note: [default = false]
    oneof isNull_present {
        bool isNull = 6;
    }

    /** Reserved for extensions. **/
    repeated Pair props = 7;

    /** Column value; timestamp and Datetime values are carried as formatted text. **/
    string value = 8;

    /** Original length of the corresponding data object. **/
    int32 length = 9;

    /** MySQL type of the column. **/
    string mysqlType = 10;
}

/** Column values of one changed row. **/
message RowData {
    /** Column values before the change (before update / before delete). **/
    repeated Column beforeColumns = 1;

    /** Column values after the change (after update / after insert). **/
    repeated Column afterColumns = 2;

    /** Reserved for extensions. **/
    repeated Pair props = 3;
}

/** message row: data structure for a row-level change. **/
message RowChange {
    /** tableId, generated by the database. **/
    int64 tableId = 1;

    /** Type of the data change. **/ // original default note: [default = UPDATE]
    oneof eventType_present {
        EventType eventType = 2;
    }

    /** Whether this is a DDL statement. **/ // original default note: [default = false]
    oneof isDdl_present {
        bool isDdl = 10;
    }

    /** SQL text of the DDL/query statement. **/
    string sql = 11;

    /** One database change may affect multiple rows. **/
    repeated RowData rowDatas = 12;

    /** Reserved for extensions. **/
    repeated Pair props = 13;

    /** schemaName in which the DDL/query ran. DDL can span schemas, so the current schemaName at
        execution time must be kept. **/
    string ddlSchemaName = 14;
}

/** Information about the start of a transaction. **/
message TransactionBegin{
    /** Deprecated: use executeTime in the header instead. **/
    int64 executeTime = 1;

    /** Deprecated: Begin does not carry a transaction id. **/
    string transactionId = 2;

    /** Reserved for extensions. **/
    repeated Pair props = 3;

    /** Id of the thread that executed the transaction. **/
    int64 threadId = 4;
}

/** Information about the end of a transaction. **/
message TransactionEnd{
    /** Deprecated: use executeTime in the header instead. **/
    int64 executeTime = 1;

    /** Transaction id. **/
    string transactionId = 2;

    /** Reserved for extensions. **/
    repeated Pair props = 3;
}

/** Key/value pair reserved for extensions (used by the various props fields). **/
message Pair{
    string key = 1;
    string value = 2;
}

/** Entry type after splitting; mainly marks transaction begin, row data and transaction end. **/
enum EntryType{
    // proto3 requires the first enum value to be 0; placeholder value (name suggests proto2 compatibility)
    ENTRYTYPECOMPATIBLEPROTO2 = 0;

    TRANSACTIONBEGIN = 1;

    ROWDATA = 2;

    TRANSACTIONEND = 3;

    /** Heartbeat type: internal use only, not exposed externally, can be ignored. **/
    HEARTBEAT = 4;

    GTIDLOG = 5;
}

/** Event type. **/
enum EventType {
    // proto3 requires the first enum value to be 0; placeholder value (name suggests proto2 compatibility)
    EVENTTYPECOMPATIBLEPROTO2 = 0;

    INSERT = 1;

    UPDATE = 2;

    DELETE = 3;

    CREATE = 4;

    ALTER = 5;

    ERASE = 6;

    QUERY = 7;

    TRUNCATE = 8;

    RENAME = 9;

    /** CREATE INDEX **/
    CINDEX = 10;

    // presumably DROP INDEX — confirm
    DINDEX = 11;

    GTID = 12;

    /** XA transaction commit / rollback **/
    XACOMMIT = 13;

    XAROLLBACK = 14;

    /** MASTER HEARTBEAT **/
    MHEARTBEAT = 15;
}

/** Database type. **/
enum Type {
    // proto3 requires the first enum value to be 0; placeholder value (name suggests proto2 compatibility)
    TYPECOMPATIBLEPROTO2 = 0;

    ORACLE = 1;

    MYSQL = 2;

    PGSQL = 3;
}

  • EntryProtocol.proto定义了CanalEntry.Entry,它包含header、entryType以及storeValue(传输的二进制数据)

AbstractEventParser.transactionBuffer

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> {

//......

public AbstractEventParser(){

// 初始化一下

transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {

boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);

if (!running) {

return;

}

if (!successed) {

throw new CanalParseException("consume failed!");

}

LogPosition position = buildLastTransactionPosition(transaction);

if (position != null) { // 可能position为空

logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);

}

}

});

}

protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,

InterruptedException {

long startTs = -1;

boolean enabled = getProfilingEnabled();

if (enabled) {

startTs = System.currentTimeMillis();

}

boolean result = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

if (enabled) {

this.processingInterval = System.currentTimeMillis() - startTs;

}

if (consumedEventCount.incrementAndGet() < 0) {

consumedEventCount.set(0);

}

return result;

}

//......

}

  • AbstractEventParser的构造器使用匿名TransactionFlushCallback创建了EventTransactionBuffer;该TransactionFlushCallback会执行consumeTheEventAndProfilingIfNecessary,若此时parser已停止(running为false)则直接返回,否则如果不成功则抛出CanalParseException,成功则构建position并在position不为空时执行logPositionManager.persistLogPosition;consumeTheEventAndProfilingIfNecessary方法则执行eventSink.sink

小结

EventTransactionBuffer继承了AbstractCanalLifeCycle,其start方法校验bufferSize为2的幂、flushCallback不为空,然后创建bufferSize大小的CanalEntry.Entry数组;其stop方法设置putSequence及flushSequence为INIT_SQEUENCE,设置entries为null;其add方法根据entry.getEntryType()的不同类型做不同的处理,基本是执行put及flush方法;其reset方法设置putSequence及flushSequence为INIT_SQEUENCE;put方法给entries赋值的同时更新putSequence,如果buffer满了则先执行flush再重新put;flush方法则执行flushCallback.flush(transaction),并更新flushSequence

doc

  • EventTransactionBuffer

以上是 聊聊canal的EventTransactionBuffer 的全部内容, 来源链接: utcz.com/z/515846.html

回到顶部