聊聊debezium的RecordMakers

编程

RecordMakers

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

public class RecordMakers {

private final Logger logger = LoggerFactory.getLogger(getClass());

private final MySqlSchema schema;

private final SourceInfo source;

private final TopicSelector<TableId> topicSelector;

private final boolean emitTombstoneOnDelete;

private final Map<Long, Converter> convertersByTableNumber = new HashMap<>();

private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>();

private final Map<Long, TableId> tableIdsByTableNumber = new HashMap<>();

private final Schema schemaChangeKeySchema;

private final Schema schemaChangeValueSchema;

private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(logger);

private final Map<String, ?> restartOffset;

//......

public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector<TableId> topicSelector,

boolean emitTombstoneOnDelete, Map<String, ?> restartOffset) {

this.schema = schema;

this.source = source;

this.topicSelector = topicSelector;

this.emitTombstoneOnDelete = emitTombstoneOnDelete;

this.restartOffset = restartOffset;

this.schemaChangeKeySchema = SchemaBuilder.struct()

.name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.SchemaChangeKey"))

.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)

.build();

this.schemaChangeValueSchema = SchemaBuilder.struct()

.name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.SchemaChangeValue"))

.field(Fields.SOURCE, source.schema())

.field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)

.field(Fields.DDL_STATEMENTS, Schema.STRING_SCHEMA)

.build();

}

public RecordsForTable forTable(TableId tableId, BitSet includedColumns, BlockingConsumer<SourceRecord> consumer) {

Long tableNumber = tableNumbersByTableId.get(tableId);

return tableNumber != null ? forTable(tableNumber, includedColumns, consumer) : null;

}

//......

}

  • RecordMakers提供了forTable方法,用于创建RecordsForTable

RecordsForTable

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

    public final class RecordsForTable {

private final BitSet includedColumns;

private final Converter converter;

private final BlockingConsumer<SourceRecord> consumer;

protected RecordsForTable(Converter converter, BitSet includedColumns, BlockingConsumer<SourceRecord> consumer) {

this.converter = converter;

this.includedColumns = includedColumns;

this.consumer = consumer;

}

public int read(Object[] row, Instant ts) throws InterruptedException {

return read(row, ts, 0, 1);

}

public int read(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {

return converter.read(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);

}

public int create(Object[] row, Instant ts) throws InterruptedException {

return create(row, ts, 0, 1);

}

public int create(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {

return converter.insert(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);

}

public int update(Object[] before, Object[] after, Instant ts) throws InterruptedException {

return update(before, after, ts, 0, 1);

}

public int update(Object[] before, Object[] after, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {

return converter.update(source, before, after, rowNumber, numberOfRows, includedColumns, ts, consumer);

}

public int delete(Object[] row, Instant ts) throws InterruptedException {

return delete(row, ts, 0, 1);

}

public int delete(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {

return converter.delete(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);

}

}

  • RecordsForTable提供了read、create、update、delete方法,它们都委托给converter的对应方法

Converter

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

    protected static interface Converter {

int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,

BlockingConsumer<SourceRecord> consumer)

throws InterruptedException;

int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,

BlockingConsumer<SourceRecord> consumer)

throws InterruptedException;

int update(SourceInfo source, Object[] before, Object[] after, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,

BlockingConsumer<SourceRecord> consumer)

throws InterruptedException;

int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,

BlockingConsumer<SourceRecord> consumer)

throws InterruptedException;

}

  • Converter接口定义了read、insert、update、delete方法

RecordMakers.Converter

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

        Converter converter = new Converter() {

@Override

public int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,

BlockingConsumer<SourceRecord> consumer)

throws InterruptedException {

Object key = tableSchema.keyFromColumnData(row);

Struct value = tableSchema.valueFromColumnData(row);

if (value != null || key != null) {

Schema keySchema = tableSchema.keySchema();

Map<String, ?> partition = source.partition();

Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);

source.tableEvent(id);

Struct origin = source.struct();

SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,

keySchema, key, envelope.schema(), envelope.read(value, origin, ts));

consumer.accept(record);

return 1;

}

return 0;

}

@Override

public int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,

BlockingConsumer<SourceRecord> consumer)

throws InterruptedException {

validateColumnCount(tableSchema, row);

Object key = tableSchema.keyFromColumnData(row);

Struct value = tableSchema.valueFromColumnData(row);

if (value != null || key != null) {

Schema keySchema = tableSchema.keySchema();

Map<String, ?> partition = source.partition();

Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);

source.tableEvent(id);

Struct origin = source.struct();

SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,

keySchema, key, envelope.schema(), envelope.create(value, origin, ts));

consumer.accept(record);

return 1;

}

return 0;

}

@Override

public int update(SourceInfo source, Object[] before, Object[] after, int rowNumber, int numberOfRows, BitSet includedColumns,

Instant ts,

BlockingConsumer<SourceRecord> consumer)

throws InterruptedException {

int count = 0;

validateColumnCount(tableSchema, after);

Object key = tableSchema.keyFromColumnData(after);

Struct valueAfter = tableSchema.valueFromColumnData(after);

if (valueAfter != null || key != null) {

Object oldKey = tableSchema.keyFromColumnData(before);

Struct valueBefore = tableSchema.valueFromColumnData(before);

Schema keySchema = tableSchema.keySchema();

Map<String, ?> partition = source.partition();

Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);

source.tableEvent(id);

Struct origin = source.struct();

if (key != null && !Objects.equals(key, oldKey)) {

// The key has changed, so we need to deal with both the new key and old key.

// Consumers may push the events into a system that won"t allow both records to exist at the same time,

// so we first want to send the delete event for the old key...

SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,

keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts));

consumer.accept(record);

++count;

if (emitTombstoneOnDelete) {

// Next send a tombstone event for the old key ...

record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum, keySchema, oldKey, null, null);

consumer.accept(record);

++count;

}

// And finally send the create event ...

record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,

keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts));

consumer.accept(record);

++count;

}

else {

// The key has not changed, so a simple update is fine ...

SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,

keySchema, key, envelope.schema(), envelope.update(valueBefore, valueAfter, origin, ts));

consumer.accept(record);

++count;

}

}

return count;

}

@Override

public int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,

BlockingConsumer<SourceRecord> consumer)

throws InterruptedException {

int count = 0;

validateColumnCount(tableSchema, row);

Object key = tableSchema.keyFromColumnData(row);

Struct value = tableSchema.valueFromColumnData(row);

if (value != null || key != null) {

Schema keySchema = tableSchema.keySchema();

Map<String, ?> partition = source.partition();

Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);

source.tableEvent(id);

Struct origin = source.struct();

// Send a delete message ...

SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,

keySchema, key, envelope.schema(), envelope.delete(value, origin, ts));

consumer.accept(record);

++count;

// And send a tombstone ...

if (emitTombstoneOnDelete) {

record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,

keySchema, key, null, null);

consumer.accept(record);

++count;

}

}

return count;

}

@Override

public String toString() {

return "RecordMaker.Converter(" + id + ")";

}

private void validateColumnCount(TableSchema tableSchema, Object[] row) {

final int expectedColumnsCount = schema.tableFor(tableSchema.id()).columns().size();

if (expectedColumnsCount != row.length) {

logger.error("Invalid number of columns, expected "{}" arrived "{}"", expectedColumnsCount, row.length);

throw new ConnectException(

"The binlog event does not contain expected number of columns; the internal schema representation is probably out of sync with the real database schema, or the binlog contains events recorded with binlog_row_image other than FULL or the table in question is an NDB table");

}

}

};

  • RecordMakers创建了一个匿名Converter实现类,其实现方法基本是创建kafka connect的SourceRecord,然后执行consumer.accept(record)

小结

RecordMakers提供了forTable方法,用于创建RecordsForTable;RecordsForTable提供了read、create、update、delete方法,它们都委托给converter的对应方法

doc

  • RecordMakers

以上是 聊聊debezium的RecordMakers 的全部内容, 来源链接: utcz.com/z/516629.html

回到顶部