聊聊nifi的AbstractBinlogTableEventWriter

编程

本文主要研究一下nifi的AbstractBinlogTableEventWriter

AbstractBinlogTableEventWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java

/**
 * Base writer for binlog events that are tied to a table. In addition to whatever the parent
 * writer emits, it adds the {@code database}, {@code table_name} and {@code table_id} JSON fields
 * and supplies a default {@link #writeEvent} that produces one flow file per event.
 *
 * @param <T> the table event type handled by the concrete writer
 */
public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {

    /**
     * Writes the parent's JSON fields followed by the table-identifying fields. Each of the three
     * fields is always present; a missing value is written as an explicit JSON null.
     *
     * @param event the event whose database name, table name and table id are written
     * @throws IOException if the JSON generator fails to write
     */
    protected void writeJson(T event) throws IOException {
        super.writeJson(event);

        if (event.getDatabaseName() == null) {
            jsonGenerator.writeNullField("database");
        } else {
            jsonGenerator.writeStringField("database", event.getDatabaseName());
        }

        if (event.getTableName() == null) {
            jsonGenerator.writeNullField("table_name");
        } else {
            jsonGenerator.writeStringField("table_name", event.getTableName());
        }

        if (event.getTableId() == null) {
            jsonGenerator.writeNullField("table_id");
        } else {
            jsonGenerator.writeNumberField("table_id", event.getTableId());
        }
    }

    /**
     * Default implementation for table-related binlog events: creates a single flow file holding
     * the event's JSON, attaches the attributes returned by {@code getCommonAttributes}, transfers
     * it to the given relationship and reports a receive to the session's provenance reporter.
     *
     * @param session           the session used to create, write and transfer the flow file
     * @param transitUri        the URI passed to the provenance reporter
     * @param eventInfo         the event to serialize
     * @param currentSequenceId the sequence id passed to {@code getCommonAttributes} for this event
     * @param relationship      the relationship the flow file is transferred to
     * @return {@code currentSequenceId + 1}
     */
    @Override
    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
        final FlowFile created = session.create();

        final FlowFile written = session.write(created, out -> {
            super.startJson(out, eventInfo);
            // Unqualified call on purpose: dispatches to the most specific writeJson override.
            writeJson(eventInfo);
            // Nothing in the body
            super.endJson();
        });

        final FlowFile tagged = session.putAllAttributes(written, getCommonAttributes(currentSequenceId, eventInfo));
        session.transfer(tagged, relationship);
        session.getProvenanceReporter().receive(tagged, transitUri);

        return currentSequenceId + 1;
    }
}

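  • AbstractBinlogTableEventWriter继承了AbstractBinlogEventWriter,其泛型参数T的上界为BinlogTableEventInfo;它的writeJson方法在父类输出的基础上追加database、table_name、table_id三个字段(值为null时写入JSON null),writeEvent的默认实现为每个事件创建一个FlowFile并返回currentSequenceId + 1;它有四个子类,分别是DDLEventWriter、InsertRowsWriter、UpdateRowsWriter、DeleteRowsWriter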

DDLEventWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java

/**
 * Writer for DDL events. The emitted JSON consists of the fields written by the parent
 * {@code writeJson} plus a {@code query} field taken from the event.
 */
public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> {

    /**
     * Creates one flow file containing the JSON form of the DDL event, attaches the attributes
     * returned by {@code getCommonAttributes}, transfers it to the given relationship and reports
     * a receive to the session's provenance reporter.
     *
     * @param session           the session used to create, write and transfer the flow file
     * @param transitUri        the URI passed to the provenance reporter
     * @param eventInfo         the DDL event to serialize
     * @param currentSequenceId the sequence id passed to {@code getCommonAttributes} for this event
     * @param relationship      the relationship the flow file is transferred to
     * @return {@code currentSequenceId + 1}
     */
    @Override
    public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
        final FlowFile created = session.create();

        final FlowFile written = session.write(created, out -> {
            super.startJson(out, eventInfo);
            super.writeJson(eventInfo);
            // The query value is the only DDL-specific field in the document.
            jsonGenerator.writeStringField("query", eventInfo.getQuery());
            super.endJson();
        });

        final FlowFile tagged = session.putAllAttributes(written, getCommonAttributes(currentSequenceId, eventInfo));
        session.transfer(tagged, relationship);
        session.getProvenanceReporter().receive(tagged, transitUri);

        return currentSequenceId + 1;
    }
}

  • DDLEventWriter继承了AbstractBinlogTableEventWriter,其writeEvent方法把DDLEventInfo写入一个FlowFile,并在父类输出的字段之外额外写入query字段

InsertRowsWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java

/**
 * Writer for insert-row events. Every row of the event becomes its own flow file, and each flow
 * file consumes one sequence id.
 */
public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsEventInfo> {

    /**
     * Emits one flow file per inserted row. Each flow file holds the JSON form of the event
     * together with that row's columns, and is tagged via {@code getCommonAttributes} with the
     * sequence id current at that point.
     *
     * @param session           the session used to create, write and transfer the flow files
     * @param transitUri        the URI passed to the provenance reporter
     * @param eventInfo         the event whose rows are serialized
     * @param currentSequenceId the sequence id used for the first row
     * @param relationship      the relationship the flow files are transferred to
     * @return {@code currentSequenceId} plus the number of rows written
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong sequence = new AtomicLong(currentSequenceId);

        for (Serializable[] rowValues : eventInfo.getRows()) {
            final FlowFile created = session.create();

            final FlowFile written = session.write(created, out -> {
                super.startJson(out, eventInfo);
                super.writeJson(eventInfo);
                writeRow(eventInfo, rowValues, eventInfo.getIncludedColumns());
                super.endJson();
            });

            final FlowFile tagged = session.putAllAttributes(written, getCommonAttributes(sequence.get(), eventInfo));
            session.transfer(tagged, relationship);
            session.getProvenanceReporter().receive(tagged, transitUri);

            // Advance only after the row's flow file has been handed off.
            sequence.incrementAndGet();
        }

        return sequence.get();
    }

    /**
     * Writes the {@code columns} array for one row, with one JSON object per bit set in
     * {@code includedColumns}. Each object carries a 1-based {@code id}, the column
     * {@code name} and {@code column_type} when a column definition is available, and the
     * {@code value}.
     *
     * @param event           the event used to look up column definitions by index
     * @param row             the row values, indexed by the same bit positions
     * @param includedColumns the bit set selecting which column indexes are written
     * @throws IOException if the JSON generator fails to write
     */
    protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
        jsonGenerator.writeArrayFieldStart("columns");

        for (int col = includedColumns.nextSetBit(0); col != -1; col = includedColumns.nextSetBit(col + 1)) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", col + 1);

            final ColumnDefinition definition = event.getColumnByIndex(col);
            // Stays null when no definition is known; the value conversion receives it as-is.
            Integer typeCode = null;
            if (definition != null) {
                jsonGenerator.writeStringField("name", definition.getName());
                typeCode = definition.getType();
                jsonGenerator.writeNumberField("column_type", typeCode);
            }

            writeValueField("value", typeCode, row[col]);
            jsonGenerator.writeEndObject();
        }

        jsonGenerator.writeEndArray();
    }

    /** Writes {@code cell} under {@code fieldName}, using an explicit JSON null when it is absent. */
    private void writeValueField(String fieldName, Integer typeCode, Serializable cell) throws IOException {
        if (cell == null) {
            jsonGenerator.writeNullField(fieldName);
            return;
        }
        jsonGenerator.writeObjectField(fieldName, MySQLCDCUtils.getWritableObject(typeCode, cell));
    }
}

  • InsertRowsWriter继承了AbstractBinlogTableEventWriter,其writeEvent方法为InsertRowsEventInfo中的每一行各创建一个FlowFile,并通过writeRow写入columns数组(每列包含id和value,列定义存在时还包含name和column_type),每写完一行序列号加1

UpdateRowsWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java

/**
 * Writer for update-row events. Every before/after row pair of the event becomes its own flow
 * file, and each flow file consumes one sequence id.
 */
public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsEventInfo> {

    /**
     * Emits one flow file per updated row. Each flow file holds the JSON form of the event
     * together with that row's columns, and is tagged via {@code getCommonAttributes} with the
     * sequence id current at that point.
     *
     * @param session           the session used to create, write and transfer the flow files
     * @param transitUri        the URI passed to the provenance reporter
     * @param eventInfo         the event whose row pairs are serialized
     * @param currentSequenceId the sequence id used for the first row
     * @param relationship      the relationship the flow files are transferred to
     * @return {@code currentSequenceId} plus the number of rows written
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong sequence = new AtomicLong(currentSequenceId);

        for (Map.Entry<Serializable[], Serializable[]> rowPair : eventInfo.getRows()) {
            final FlowFile created = session.create();

            final FlowFile written = session.write(created, out -> {
                super.startJson(out, eventInfo);
                super.writeJson(eventInfo);
                writeRow(eventInfo, rowPair, eventInfo.getIncludedColumns());
                super.endJson();
            });

            final FlowFile tagged = session.putAllAttributes(written, getCommonAttributes(sequence.get(), eventInfo));
            session.transfer(tagged, relationship);
            session.getProvenanceReporter().receive(tagged, transitUri);

            // Advance only after the row's flow file has been handed off.
            sequence.incrementAndGet();
        }

        return sequence.get();
    }

    /**
     * Writes the {@code columns} array for one updated row, with one JSON object per bit set in
     * {@code includedColumns}. Each object carries a 1-based {@code id}, the column
     * {@code name} and {@code column_type} when a column definition is available, the
     * {@code last_value} taken from the entry's key and the {@code value} taken from the
     * entry's value.
     *
     * @param event           the event used to look up column definitions by index
     * @param row             the entry whose key supplies {@code last_value} and whose value
     *                        supplies {@code value}
     * @param includedColumns the bit set selecting which column indexes are written
     * @throws IOException if the JSON generator fails to write
     */
    protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException {
        jsonGenerator.writeArrayFieldStart("columns");

        for (int col = includedColumns.nextSetBit(0); col != -1; col = includedColumns.nextSetBit(col + 1)) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", col + 1);

            final ColumnDefinition definition = event.getColumnByIndex(col);
            // Stays null when no definition is known; the value conversion receives it as-is.
            Integer typeCode = null;
            if (definition != null) {
                jsonGenerator.writeStringField("name", definition.getName());
                typeCode = definition.getType();
                jsonGenerator.writeNumberField("column_type", typeCode);
            }

            final Serializable[] before = row.getKey();
            final Serializable[] after = row.getValue();

            writeValueField("last_value", typeCode, before[col]);
            writeValueField("value", typeCode, after[col]);

            jsonGenerator.writeEndObject();
        }

        jsonGenerator.writeEndArray();
    }

    /** Writes {@code cell} under {@code fieldName}, using an explicit JSON null when it is absent. */
    private void writeValueField(String fieldName, Integer typeCode, Serializable cell) throws IOException {
        if (cell == null) {
            jsonGenerator.writeNullField(fieldName);
            return;
        }
        jsonGenerator.writeObjectField(fieldName, MySQLCDCUtils.getWritableObject(typeCode, cell));
    }
}

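  • UpdateRowsWriter继承了AbstractBinlogTableEventWriter,其writeEvent方法为UpdateRowsEventInfo中的每一行各创建一个FlowFile;writeRow在columns数组中为每列同时写入更新前的值last_value(取自Map.Entry的key)和更新后的值value(取自Map.Entry的value),每写完一行序列号加1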

DeleteRowsWriter

nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java

/**
 * Writer for delete-row events. Every row of the event becomes its own flow file, and each flow
 * file consumes one sequence id.
 */
public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsEventInfo> {

    /**
     * Emits one flow file per deleted row. Each flow file holds the JSON form of the event
     * together with that row's columns, and is tagged via {@code getCommonAttributes} with the
     * sequence id current at that point.
     *
     * @param session           the session used to create, write and transfer the flow files
     * @param transitUri        the URI passed to the provenance reporter
     * @param eventInfo         the event whose rows are serialized
     * @param currentSequenceId the sequence id used for the first row
     * @param relationship      the relationship the flow files are transferred to
     * @return {@code currentSequenceId} plus the number of rows written
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong sequence = new AtomicLong(currentSequenceId);

        for (Serializable[] rowValues : eventInfo.getRows()) {
            final FlowFile created = session.create();

            final FlowFile written = session.write(created, out -> {
                super.startJson(out, eventInfo);
                super.writeJson(eventInfo);
                writeRow(eventInfo, rowValues, eventInfo.getIncludedColumns());
                super.endJson();
            });

            final FlowFile tagged = session.putAllAttributes(written, getCommonAttributes(sequence.get(), eventInfo));
            session.transfer(tagged, relationship);
            session.getProvenanceReporter().receive(tagged, transitUri);

            // Advance only after the row's flow file has been handed off.
            sequence.incrementAndGet();
        }

        return sequence.get();
    }

    /**
     * Writes the {@code columns} array for one row, with one JSON object per bit set in
     * {@code includedColumns}. Each object carries a 1-based {@code id}, the column
     * {@code name} and {@code column_type} when a column definition is available, and the
     * {@code value}.
     *
     * @param event           the event used to look up column definitions by index
     * @param row             the row values, indexed by the same bit positions
     * @param includedColumns the bit set selecting which column indexes are written
     * @throws IOException if the JSON generator fails to write
     */
    protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
        jsonGenerator.writeArrayFieldStart("columns");

        for (int col = includedColumns.nextSetBit(0); col != -1; col = includedColumns.nextSetBit(col + 1)) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", col + 1);

            final ColumnDefinition definition = event.getColumnByIndex(col);
            // Stays null when no definition is known; the value conversion receives it as-is.
            Integer typeCode = null;
            if (definition != null) {
                jsonGenerator.writeStringField("name", definition.getName());
                typeCode = definition.getType();
                jsonGenerator.writeNumberField("column_type", typeCode);
            }

            writeValueField("value", typeCode, row[col]);
            jsonGenerator.writeEndObject();
        }

        jsonGenerator.writeEndArray();
    }

    /** Writes {@code cell} under {@code fieldName}, using an explicit JSON null when it is absent. */
    private void writeValueField(String fieldName, Integer typeCode, Serializable cell) throws IOException {
        if (cell == null) {
            jsonGenerator.writeNullField(fieldName);
            return;
        }
        jsonGenerator.writeObjectField(fieldName, MySQLCDCUtils.getWritableObject(typeCode, cell));
    }
}

  • DeleteRowsWriter继承了AbstractBinlogTableEventWriter,其writeEvent方法为DeleteRowsEventInfo中的每一行各创建一个FlowFile,并通过writeRow写入columns数组(每列包含id和value,列定义存在时还包含name和column_type),每写完一行序列号加1

小结

AbstractBinlogTableEventWriter继承了AbstractBinlogEventWriter,其泛型参数T的上界为BinlogTableEventInfo,它有四个子类,分别是DDLEventWriter、InsertRowsWriter、UpdateRowsWriter、DeleteRowsWriter;其中DDLEventWriter每个事件写入一个FlowFile,其余三个按行写入,每行一个FlowFile并各占用一个序列号

doc

  • AbstractBinlogTableEventWriter

以上是 聊聊nifi的AbstractBinlogTableEventWriter 的全部内容, 来源链接: utcz.com/z/516895.html

回到顶部