聊聊debezium的SnapshotChangeRecordEmitter

编程

SnapshotChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/SnapshotChangeRecordEmitter.java

public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter {

private final Object[] row;

public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) {

super(offset, clock);

this.row = row;

}

@Override

protected Operation getOperation() {

return Operation.READ;

}

@Override

protected Object[] getOldColumnValues() {

throw new UnsupportedOperationException("Can"t get old row values for READ record");

}

@Override

protected Object[] getNewColumnValues() {

return row;

}

}

  • SnapshotChangeRecordEmitter继承了RelationalChangeRecordEmitter,其构造器接收row,其getOperation方法返回Operation.READ,其getOldColumnValues方法抛出UnsupportedOperationException,其getNewColumnValues返回row

RelationalChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java

public abstract class RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter<TableSchema> {

protected final Logger logger = LoggerFactory.getLogger(getClass());

public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {

super(offsetContext, clock);

}

@Override

public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {

TableSchema tableSchema = (TableSchema) schema;

Operation operation = getOperation();

switch (operation) {

case CREATE:

emitCreateRecord(receiver, tableSchema);

break;

case READ:

emitReadRecord(receiver, tableSchema);

break;

case UPDATE:

emitUpdateRecord(receiver, tableSchema);

break;

case DELETE:

emitDeleteRecord(receiver, tableSchema);

break;

default:

throw new IllegalArgumentException("Unsupported operation: " + operation);

}

}

@Override

protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)

throws InterruptedException {

Object[] newColumnValues = getNewColumnValues();

Object newKey = tableSchema.keyFromColumnData(newColumnValues);

Struct newValue = tableSchema.valueFromColumnData(newColumnValues);

Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {

logger.warn("no new values found for table "{}" from create message at "{}"; skipping record", tableSchema, getOffset().getSourceInfo());

return;

}

receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset());

}

@Override

protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)

throws InterruptedException {

Object[] newColumnValues = getNewColumnValues();

Object newKey = tableSchema.keyFromColumnData(newColumnValues);

Struct newValue = tableSchema.valueFromColumnData(newColumnValues);

Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset());

}

@Override

protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)

throws InterruptedException {

Object[] oldColumnValues = getOldColumnValues();

Object[] newColumnValues = getNewColumnValues();

Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);

Object newKey = tableSchema.keyFromColumnData(newColumnValues);

Struct newValue = tableSchema.valueFromColumnData(newColumnValues);

Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {

logger.warn("no new values found for table "{}" from update message at "{}"; skipping record", tableSchema, getOffset().getSourceInfo());

return;

}

// some configurations does not provide old values in case of updates

// in this case we handle all updates as regular ones

if (oldKey == null || Objects.equals(oldKey, newKey)) {

Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

receiver.changeRecord(tableSchema, Operation.UPDATE, newKey, envelope, getOffset());

}

// PK update -> emit as delete and re-insert with new key

else {

Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset());

envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset());

}

}

@Override

protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException {

Object[] oldColumnValues = getOldColumnValues();

Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);

Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {

logger.warn("no old values found for table "{}" from delete message at "{}"; skipping record", tableSchema, getOffset().getSourceInfo());

return;

}

Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset());

}

/**

* Returns the operation done by the represented change.

*/

protected abstract Operation getOperation();

/**

* Returns the old row state in case of an UPDATE or DELETE.

*/

protected abstract Object[] getOldColumnValues();

/**

* Returns the new row state in case of a CREATE or READ.

*/

protected abstract Object[] getNewColumnValues();

/**

* Whether empty data messages should be ignored.

*

* @return true if empty data messages coming from data source should be ignored.</br>

* Typical use case are PostgreSQL changes without FULL replica identity.

*/

protected boolean skipEmptyMessages() {

return false;

}

}

  • RelationalChangeRecordEmitter继承了AbstractChangeRecordEmitter,其泛型为TableSchema;其emitChangeRecords方法根据不同的operation执行不同的emit方法;这些emit方法主要是构造key及envelope,然后执行receiver.changeRecord

AbstractChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/AbstractChangeRecordEmitter.java

/**
 * Skeleton {@link ChangeRecordEmitter} that stores the offset context and clock and routes each
 * change to the hook method matching its operation.
 *
 * @param <T> schema type passed on to the operation-specific hook methods
 */
public abstract class AbstractChangeRecordEmitter<T extends DataCollectionSchema> implements ChangeRecordEmitter {

    private final OffsetContext offsetContext;
    private final Clock clock;

    /**
     * @param offsetContext offset context later returned by {@link #getOffset()}
     * @param clock clock later returned by {@link #getClock()}
     */
    public AbstractChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
        this.offsetContext = offsetContext;
        this.clock = clock;
    }

    /**
     * Looks up the operation via {@link #getOperation()} and invokes the matching hook.
     *
     * @param schema schema handed to the hook after an unchecked cast to {@code T}
     * @param receiver target handed to the hook
     * @throws IllegalArgumentException if the operation is not CREATE, READ, UPDATE or DELETE
     * @throws InterruptedException propagated from the invoked hook
     */
    @Override
    public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
        final Operation op = getOperation();

        // Erased cast: no runtime check happens here, so it is safe to do once up front.
        @SuppressWarnings("unchecked")
        final T typedSchema = (T) schema;

        switch (op) {
            case CREATE:
                emitCreateRecord(receiver, typedSchema);
                return;
            case READ:
                emitReadRecord(receiver, typedSchema);
                return;
            case UPDATE:
                emitUpdateRecord(receiver, typedSchema);
                return;
            case DELETE:
                emitDeleteRecord(receiver, typedSchema);
                return;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + op);
        }
    }

    /**
     * @return the offset context supplied at construction
     */
    @Override
    public OffsetContext getOffset() {
        return offsetContext;
    }

    /**
     * @return the clock supplied at construction
     */
    public Clock getClock() {
        return clock;
    }

    /**
     * @return the operation associated with the change; selects the hook that is invoked
     */
    protected abstract Operation getOperation();

    /**
     * Hook invoked for a READ operation.
     *
     * @param receiver the handler the emitted record(s) should be dispatched to
     * @param schema the schema
     */
    protected abstract void emitReadRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Hook invoked for a CREATE operation.
     *
     * @param receiver the handler the emitted record(s) should be dispatched to
     * @param schema the schema
     */
    protected abstract void emitCreateRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Hook invoked for an UPDATE operation.
     *
     * @param receiver the handler the emitted record(s) should be dispatched to
     * @param schema the schema
     */
    protected abstract void emitUpdateRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Hook invoked for a DELETE operation.
     *
     * @param receiver the handler the emitted record(s) should be dispatched to
     * @param schema the schema
     */
    protected abstract void emitDeleteRecord(Receiver receiver, T schema) throws InterruptedException;
}

  • AbstractChangeRecordEmitter实现了ChangeRecordEmitter接口,其提供了emitChangeRecords方法,封装了针对不同operation的调用,同时定义了emitCreateRecord、emitReadRecord、emitUpdateRecord、emitDeleteRecord方法供子类实现

ChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java

/**
 * Contract for an object that represents a data change and can emit the corresponding
 * change record(s) to a {@link Receiver}.
 */
public interface ChangeRecordEmitter {

    /**
     * Emits the change record(s) for the data change this emitter represents.
     *
     * @param schema schema passed along with the emitted record(s)
     * @param receiver target that the record(s) are handed to
     * @throws InterruptedException propagated from the implementation
     */
    void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException;

    /**
     * @return the offset of the change record(s) emitted
     */
    OffsetContext getOffset();

    /**
     * Callback that accepts one emitted change record at a time.
     */
    interface Receiver {

        /**
         * Accepts a single change record.
         *
         * @param schema schema associated with the record
         * @param operation operation the record describes
         * @param key record key
         * @param value record value
         * @param offset offset associated with the record
         * @throws InterruptedException propagated from the implementation
         */
        void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException;
    }
}

  • ChangeRecordEmitter接口定义了emitChangeRecords、getOffset方法,同时还定义了Receiver接口,该接口定义了changeRecord方法

小结

SnapshotChangeRecordEmitter继承了RelationalChangeRecordEmitter,其构造器接收row,其getOperation方法返回Operation.READ,其getOldColumnValues方法抛出UnsupportedOperationException,其getNewColumnValues返回row

doc

  • SnapshotChangeRecordEmitter

以上是 聊聊debezium的SnapshotChangeRecordEmitter 的全部内容, 来源链接: utcz.com/z/516415.html

回到顶部