聊聊debezium的Heartbeat

编程

Heartbeat

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java

/**
 * Contract for producing heartbeat records and handing them to a consumer.
 *
 * <p>Only the members visible in this excerpt are documented here; the remaining
 * members of the interface are elided below.
 */
public interface Heartbeat {

    /** Name of the configuration property that holds the heartbeat interval, in milliseconds. */
    String HEARTBEAT_INTERVAL_PROPERTY_NAME = "heartbeat.interval.ms";

    /**
     * Supplies an offset map on demand, so that the offset only has to be computed
     * when the caller actually asks for it.
     */
    @FunctionalInterface
    interface OffsetProducer {

        /**
         * @return the offset map to attach to a heartbeat record
         */
        Map<String, ?> offset();
    }

    /**
     * Gives the implementation a chance to emit a heartbeat using an already computed offset.
     * Whether a record is actually emitted on a given call is up to the implementation.
     *
     * @param partition source partition to attach to the heartbeat record
     * @param offset source offset to attach to the heartbeat record
     * @param consumer receiver of the generated heartbeat record
     * @throws InterruptedException if the calling thread is interrupted while handing over the record
     */
    void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    /**
     * Same as {@link #heartbeat(Map, Map, BlockingConsumer)}, but the offset is obtained
     * from {@code offsetProducer} instead of being passed in directly.
     *
     * @param partition source partition to attach to the heartbeat record
     * @param offsetProducer supplier of the source offset
     * @param consumer receiver of the generated heartbeat record
     * @throws InterruptedException if the calling thread is interrupted while handing over the record
     */
    void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    /**
     * Emits a heartbeat on request, independently of the periodic schedule used by
     * the {@code heartbeat} methods.
     *
     * @param partition source partition to attach to the heartbeat record
     * @param offset source offset to attach to the heartbeat record
     * @param consumer receiver of the generated heartbeat record
     * @throws InterruptedException if the calling thread is interrupted while handing over the record
     */
    void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    /**
     * @return whether this heartbeat instance is enabled
     */
    boolean isEnabled();

    //......

}

  • Heartbeat定义了OffsetProducer接口,该接口定义了offset方法;它还定义了heartbeat、forcedBeat、isEnabled方法

HeartbeatImpl

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java

/**
 * {@link Heartbeat} implementation that emits a heartbeat record whenever its
 * interval timer has expired and then re-arms the timer.
 */
class HeartbeatImpl implements Heartbeat {

    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);

    private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);

    /**
     * Default length of interval in which connector generates periodically
     * heartbeat messages. A size of 0 disables heartbeat.
     */
    static final int DEFAULT_HEARTBEAT_INTERVAL = 0;

    /**
     * Default prefix for names of heartbeat topics
     */
    static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";

    private static final String SERVER_NAME_KEY = "serverName";

    // The schemas are built once and are not reassigned anywhere in the visible code,
    // so they are declared final like the other class-level constants.

    /** Schema of the heartbeat record key: a struct with a single string field. */
    private static final Schema KEY_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
            .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
            .build();

    /** Schema of the heartbeat record value: a struct with a single int64 timestamp field. */
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat"))
            .field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
            .build();

    private final String topicName;

    private final Duration heartbeatInterval;

    private final String key;

    /** Timer for the current interval; replaced with a fresh timer after every periodic beat. */
    private volatile Timer heartbeatTimeout;

    /**
     * Reads the heartbeat interval from the configuration and starts the first interval timer.
     *
     * @param configuration configuration providing the heartbeat interval
     * @param topicName topic the heartbeat records are addressed to
     * @param key value used to build the heartbeat record key
     */
    HeartbeatImpl(Configuration configuration, String topicName, String key) {
        this.topicName = topicName;
        this.key = key;
        heartbeatInterval = configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
        heartbeatTimeout = resetHeartbeat();
    }

    /**
     * Emits a heartbeat with the given offset if the interval timer has expired.
     * Delegates to the producer-based overload so the timing logic lives in one place.
     */
    @Override
    public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        heartbeat(partition, () -> offset, consumer);
    }

    /**
     * Emits a heartbeat if the interval timer has expired, then re-arms the timer.
     * The offset producer is only invoked when a beat is actually due.
     */
    @Override
    public void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (heartbeatTimeout.expired()) {
            forcedBeat(partition, offsetProducer.offset(), consumer);
            heartbeatTimeout = resetHeartbeat();
        }
    }

    /**
     * Hands a heartbeat record to the consumer regardless of the timer state.
     * Nothing is sent when the offset is {@code null} or empty.
     */
    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException {
        LOGGER.debug("Generating heartbeat event");
        if (offset == null || offset.isEmpty()) {
            // Do not send heartbeat message if no offset is available yet
            return;
        }
        consumer.accept(heartbeatRecord(partition, offset));
    }

    /**
     * @return always {@code true} for this implementation
     */
    @Override
    public boolean isEnabled() {
        return true;
    }

    /**
     * Builds the heartbeat record for the given source partition and offset.
     * The record is addressed to {@code topicName}, partition 0.
     */
    private SourceRecord heartbeatRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        final Integer partition = 0;

        return new SourceRecord(sourcePartition, sourceOffset,
                topicName, partition, KEY_SCHEMA, serverNameKey(key), VALUE_SCHEMA, messageValue());
    }

    /**
     * @return a new timer covering one heartbeat interval, started from the system clock
     */
    private Timer resetHeartbeat() {
        return Threads.timer(Clock.SYSTEM, heartbeatInterval);
    }

    //......

}

  • HeartbeatImpl实现了Heartbeat接口,其heartbeat方法在heartbeatTimeout.expired()时执行forcedBeat,然后执行resetHeartbeat重置定时器;其forcedBeat方法在offset为null或为空时直接返回、不发送心跳,否则执行consumer.accept(heartbeatRecord(partition, offset));其isEnabled返回true;heartbeatRecord方法创建SourceRecord并返回;resetHeartbeat方法返回Threads.timer(Clock.SYSTEM, heartbeatInterval)

DatabaseHeartbeatImpl

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/DatabaseHeartbeatImpl.java

/**
 * {@link HeartbeatImpl} variant that additionally runs a configured query against
 * the database through a {@link JdbcConnection} on every beat.
 */
public class DatabaseHeartbeatImpl extends HeartbeatImpl {

    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHeartbeatImpl.class);

    /** Name of the configuration property holding the heartbeat action query. */
    public static final String HEARTBEAT_ACTION_QUERY_PROPERTY_NAME = "heartbeat.action.query";

    /** Configuration field definition for the heartbeat action query. */
    public static final Field HEARTBEAT_ACTION_QUERY = Field.create(HEARTBEAT_ACTION_QUERY_PROPERTY_NAME)
            .withDisplayName("The query to execute with every heartbeat")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("The query executed with every heartbeat. Defaults to an empty string.");

    private final String heartBeatActionQuery;

    private final JdbcConnection jdbcConnection;

    /**
     * @param configuration configuration passed on to {@link HeartbeatImpl}
     * @param topicName topic the heartbeat records are addressed to
     * @param key value used to build the heartbeat record key
     * @param jdbcConnection connection on which the action query is executed
     * @param heartBeatActionQuery query to execute on every beat
     */
    DatabaseHeartbeatImpl(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery) {
        super(configuration, topicName, key);
        this.heartBeatActionQuery = heartBeatActionQuery;
        this.jdbcConnection = jdbcConnection;
    }

    /**
     * Executes the action query and then emits the regular heartbeat record.
     *
     * <p>The query is best-effort: a failure is logged and does not prevent the
     * heartbeat record from being handed to the consumer.
     */
    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        try {
            jdbcConnection.execute(heartBeatActionQuery);
            // Logged only on success, so the debug output never reports a query
            // as executed right after the error below was logged for it.
            LOGGER.debug("Executed heartbeat action query");
        }
        catch (Exception e) {
            LOGGER.error("Could not execute heartbeat action", e);
        }

        super.forcedBeat(partition, offset, consumer);
    }

}

  • DatabaseHeartbeatImpl继承了HeartbeatImpl,其forcedBeat方法先执行jdbcConnection.execute(heartBeatActionQuery),执行失败时只记录error日志而不抛出异常,然后再执行super.forcedBeat(partition, offset, consumer)

Heartbeat.create

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java

public interface Heartbeat {

    //......

    /**
     * Creates a heartbeat for the given configuration.
     *
     * @param configuration configuration providing the heartbeat interval
     * @param topicName topic the heartbeat records are addressed to
     * @param key value used to build the heartbeat record key
     * @return {@code NULL} when the configured interval is zero, otherwise a new {@link HeartbeatImpl}
     */
    public static Heartbeat create(Configuration configuration, String topicName, String key) {
        final boolean disabled = configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero();
        if (disabled) {
            return NULL;
        }
        return new HeartbeatImpl(configuration, topicName, key);
    }

    /**
     * Creates a heartbeat that may additionally run an action query on the given connection.
     *
     * @param configuration configuration providing the heartbeat interval and the optional action query
     * @param topicName topic the heartbeat records are addressed to
     * @param key value used to build the heartbeat record key
     * @param jdbcConnection connection used to execute the action query, if one is configured
     * @return {@code NULL} when the configured interval is zero; a {@link DatabaseHeartbeatImpl}
     *         when an action query is configured; otherwise a plain {@link HeartbeatImpl}
     */
    public static Heartbeat create(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection) {
        final boolean disabled = configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero();
        if (disabled) {
            return NULL;
        }

        // The action query is looked up only once heartbeats are known to be enabled.
        final String actionQuery = configuration.getString(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY);
        return actionQuery == null
                ? new HeartbeatImpl(configuration, topicName, key)
                : new DatabaseHeartbeatImpl(configuration, topicName, key, jdbcConnection, actionQuery);
    }

    //......

}

  • Heartbeat提供了两个create静态方法,二者在heartbeat interval为0时都返回NULL;否则一个用于创建HeartbeatImpl,另外一个在heartBeatActionQuery不为null时创建DatabaseHeartbeatImpl,为null时创建HeartbeatImpl

小结

HeartbeatImpl实现了Heartbeat接口,其heartbeat方法在heartbeatTimeout.expired()时执行forcedBeat,然后执行resetHeartbeat;其forcedBeat方法执行consumer.accept(heartbeatRecord(partition, offset));其isEnabled返回true;heartbeatRecord方法创建SourceRecord并返回;resetHeartbeat方法返回Threads.timer(Clock.SYSTEM, heartbeatInterval);DatabaseHeartbeatImpl继承了HeartbeatImpl,其forcedBeat方法执行jdbcConnection.execute(heartBeatActionQuery),然后再执行super.forcedBeat(partition, offset, consumer)

doc

  • Heartbeat

以上是 聊聊debezium的Heartbeat 的全部内容, 来源链接: utcz.com/z/516431.html

回到顶部