聊聊debezium的eventHandlers

编程

handleInsert

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

//......

protected void handleInsert(Event event) throws InterruptedException {

if (skipEvent) {

// We can skip this because we should already be at least this far ...

logger.debug("Skipping previously processed row event: {}", event);

return;

}

if (ignoreDmlEventByGtidSource) {

logger.debug("Skipping DML event because this GTID source is filtered: {}", event);

return;

}

WriteRowsEventData write = unwrapData(event);

long tableNumber = write.getTableId();

BitSet includedColumns = write.getIncludedColumns();

RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord);

if (recordMaker != null) {

List<Serializable[]> rows = write.getRows();

final Instant ts = context.getClock().currentTimeAsInstant();

int count = 0;

int numRows = rows.size();

if (startingRowNumber < numRows) {

for (int row = startingRowNumber; row != numRows; ++row) {

count += recordMaker.create(rows.get(row), ts, row, numRows);

}

if (logger.isDebugEnabled()) {

if (startingRowNumber != 0) {

logger.debug("Recorded {} insert record(s) for last {} row(s) in event: {}",

count, numRows - startingRowNumber, event);

}

else {

logger.debug("Recorded {} insert record(s) for event: {}", count, event);

}

}

}

else {

// All rows were previously processed ...

logger.debug("Skipping previously processed insert event: {}", event);

}

}

else {

informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableNumber), "insert row");

}

startingRowNumber = 0;

}

//......

}

  • handleInsert方法将event解析为WriteRowsEventData,然后通过recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord)获取recordMaker,执行recordMaker.create(rows.get(row), ts, row, numRows)

handleUpdate

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

//......

protected void handleUpdate(Event event) throws InterruptedException {

if (skipEvent) {

// We can skip this because we should already be at least this far ...

logger.debug("Skipping previously processed row event: {}", event);

return;

}

if (ignoreDmlEventByGtidSource) {

logger.debug("Skipping DML event because this GTID source is filtered: {}", event);

return;

}

UpdateRowsEventData update = unwrapData(event);

long tableNumber = update.getTableId();

BitSet includedColumns = update.getIncludedColumns();

// BitSet includedColumnsBefore = update.getIncludedColumnsBeforeUpdate();

RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord);

if (recordMaker != null) {

List<Entry<Serializable[], Serializable[]>> rows = update.getRows();

final Instant ts = context.getClock().currentTimeAsInstant();

int count = 0;

int numRows = rows.size();

if (startingRowNumber < numRows) {

for (int row = startingRowNumber; row != numRows; ++row) {

Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);

Serializable[] before = changes.getKey();

Serializable[] after = changes.getValue();

count += recordMaker.update(before, after, ts, row, numRows);

}

if (logger.isDebugEnabled()) {

if (startingRowNumber != 0) {

logger.debug("Recorded {} update record(s) for last {} row(s) in event: {}",

count, numRows - startingRowNumber, event);

}

else {

logger.debug("Recorded {} update record(s) for event: {}", count, event);

}

}

}

else {

// All rows were previously processed ...

logger.debug("Skipping previously processed update event: {}", event);

}

}

else {

informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableNumber), "update row");

}

startingRowNumber = 0;

}

//......

}

  • handleUpdate方法将event解析为UpdateRowsEventData,然后通过recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord)获取recordMaker,执行recordMaker.update(before, after, ts, row, numRows)

handleDelete

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

//......

protected void handleDelete(Event event) throws InterruptedException {

if (skipEvent) {

// We can skip this because we should already be at least this far ...

logger.debug("Skipping previously processed row event: {}", event);

return;

}

if (ignoreDmlEventByGtidSource) {

logger.debug("Skipping DML event because this GTID source is filtered: {}", event);

return;

}

DeleteRowsEventData deleted = unwrapData(event);

long tableNumber = deleted.getTableId();

BitSet includedColumns = deleted.getIncludedColumns();

RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord);

if (recordMaker != null) {

List<Serializable[]> rows = deleted.getRows();

final Instant ts = context.getClock().currentTimeAsInstant();

int count = 0;

int numRows = rows.size();

if (startingRowNumber < numRows) {

for (int row = startingRowNumber; row != numRows; ++row) {

count += recordMaker.delete(rows.get(row), ts, row, numRows);

}

if (logger.isDebugEnabled()) {

if (startingRowNumber != 0) {

logger.debug("Recorded {} delete record(s) for last {} row(s) in event: {}",

count, numRows - startingRowNumber, event);

}

else {

logger.debug("Recorded {} delete record(s) for event: {}", count, event);

}

}

}

else {

// All rows were previously processed ...

logger.debug("Skipping previously processed delete event: {}", event);

}

}

else {

informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableNumber), "delete row");

}

startingRowNumber = 0;

}

//......

}

  • handleDelete方法将event解析为DeleteRowsEventData,然后通过recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord)获取recordMaker,执行recordMaker.delete(rows.get(row), ts, row, numRows)

handleQueryEvent

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

//......

protected void handleQueryEvent(Event event) throws InterruptedException {

QueryEventData command = unwrapData(event);

logger.debug("Received query command: {}", event);

String sql = command.getSql().trim();

if (sql.equalsIgnoreCase("BEGIN")) {

// We are starting a new transaction ...

source.startNextTransaction();

source.setBinlogThread(command.getThreadId());

if (initialEventsToSkip != 0) {

logger.debug("Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",

initialEventsToSkip, startingRowNumber);

// We are restarting, so we need to skip the events in this transaction that we processed previously...

skipEvent = true;

}

return;

}

if (sql.equalsIgnoreCase("COMMIT")) {

handleTransactionCompletion(event);

return;

}

String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();

if (upperCasedStatementBegin.startsWith("XA ")) {

// This is an XA transaction, and we currently ignore these and do nothing ...

return;

}

if (context.ddlFilter().test(sql)) {

logger.debug("DDL "{}" was filtered out of processing", sql);

return;

}

if (upperCasedStatementBegin.equals("INSERT ") || upperCasedStatementBegin.equals("UPDATE ") || upperCasedStatementBegin.equals("DELETE ")) {

throw new ConnectException(

"Received DML "" + sql + "" for processing, binlog probably contains events generated with statement or mixed based replication format");

}

if (sql.equalsIgnoreCase("ROLLBACK")) {

// We have hit a ROLLBACK which is not supported

logger.warn("Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check "{}" to see how to enable buffering",

MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());

}

context.dbSchema().applyDdl(context.source(), command.getDatabase(), command.getSql(), (dbName, tables, statements) -> {

if (recordSchemaChangesInSourceRecords && recordMakers.schemaChanges(dbName, tables, statements, super::enqueueRecord) > 0) {

logger.debug("Recorded DDL statements for database "{}": {}", dbName, statements);

}

});

}

//......

}

  • handleQueryEvent方法将event解析为QueryEventData,然后通过context.dbSchema().applyDdl来执行ddl

小结

BinlogReader注册了增删改查的eventHandlers,它们分别解析event对对应的data,对于增删改则执行recordMakers的对应方法,对于查询则作用对应的ddl语句

doc

  • BinlogReader

以上是 聊聊debezium的eventHandlers 的全部内容, 来源链接: utcz.com/z/516613.html

回到顶部