A Look at Debezium's OffsetCommitPolicy


OffsetCommitPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

@Incubating
@FunctionalInterface
public interface OffsetCommitPolicy {

    boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);

    static OffsetCommitPolicy always() {
        return new AlwaysCommitOffsetPolicy();
    }

    static OffsetCommitPolicy periodic(Properties config) {
        return new PeriodicCommitOffsetPolicy(config);
    }
}

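  • OffsetCommitPolicy defines the performCommit method and provides two static factory methods. always() creates an AlwaysCommitOffsetPolicy, and periodic(Properties) creates a PeriodicCommitOffsetPolicy. Because the interface is annotated @FunctionalInterface, it can also be implemented with a lambda.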
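Because performCommit is the only abstract method, a custom policy can be a plain lambda. The sketch below declares a local copy of the interface, with the same signature as above, so it compiles without Debezium on the classpath. The helpers everyNRecords and either are hypothetical and are not part of the Debezium API.

```java
import java.time.Duration;

// Demo class comes first so the single-file source launcher finds main().
class CustomPolicies {
    // Hypothetical helper: commit once at least n records have been processed since the last commit.
    static OffsetCommitPolicy everyNRecords(long n) {
        return (count, elapsed) -> count >= n;
    }

    // Hypothetical helper: commit when either policy says so.
    static OffsetCommitPolicy either(OffsetCommitPolicy a, OffsetCommitPolicy b) {
        return (count, elapsed) -> a.performCommit(count, elapsed) || b.performCommit(count, elapsed);
    }

    public static void main(String[] args) {
        OffsetCommitPolicy byCount = everyNRecords(100);
        // Same rule as the periodic policy, with a fixed 60 s threshold.
        OffsetCommitPolicy byTime = (count, elapsed) -> elapsed.compareTo(Duration.ofSeconds(60)) >= 0;
        OffsetCommitPolicy combined = either(byCount, byTime);

        System.out.println(combined.performCommit(10, Duration.ofSeconds(5)));  // false
        System.out.println(combined.performCommit(150, Duration.ofSeconds(5))); // true
        System.out.println(combined.performCommit(10, Duration.ofSeconds(90))); // true
    }
}

// Local stand-in mirroring the interface shown above (not the Debezium artifact itself).
@FunctionalInterface
interface OffsetCommitPolicy {
    boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);
}
```

A lambda like this can be passed anywhere an OffsetCommitPolicy is expected.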

AlwaysCommitOffsetPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

    public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {

        @Override
        public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
            return true;
        }
    }

  • AlwaysCommitOffsetPolicy implements OffsetCommitPolicy. Its performCommit always returns true, so every check results in a commit.

PeriodicCommitOffsetPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

    public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {

        private final Duration minimumTime;

        public PeriodicCommitOffsetPolicy(Properties config) {
            minimumTime = Duration.ofMillis(Long.valueOf(config.getProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP)));
        }

        @Override
        public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
            return timeSinceLastCommit.compareTo(minimumTime) >= 0;
        }
    }

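  • PeriodicCommitOffsetPolicy implements OffsetCommitPolicy. Its constructor reads the DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP property into minimumTime. performCommit returns true when timeSinceLastCommit.compareTo(minimumTime) is greater than or equal to 0, that is, once at least minimumTime has elapsed since the last commit. The number of messages is ignored.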
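The sketch below re-implements the periodic rule outside Debezium to show two details of the code above. First, the boundary is inclusive. Second, the constructor does not fall back to a default, so Properties without the key make Long.valueOf(null) throw NumberFormatException. It assumes the value of DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP is "offset.flush.interval.ms". PeriodicPolicyDemo and PeriodicPolicy are illustrative names only.

```java
import java.time.Duration;
import java.util.Properties;

class PeriodicPolicyDemo {
    // Assumption: the string value of DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP.
    static final String OFFSET_FLUSH_INTERVAL_MS_PROP = "offset.flush.interval.ms";

    // Illustrative re-implementation of PeriodicCommitOffsetPolicy's logic.
    static final class PeriodicPolicy {
        private final Duration minimumTime;

        PeriodicPolicy(Properties config) {
            // Same parsing as the original: a missing key yields Long.valueOf(null) -> NumberFormatException.
            minimumTime = Duration.ofMillis(Long.valueOf(config.getProperty(OFFSET_FLUSH_INTERVAL_MS_PROP)));
        }

        boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
            // Only elapsed time matters; the comparison is inclusive (>= 0).
            return timeSinceLastCommit.compareTo(minimumTime) >= 0;
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(OFFSET_FLUSH_INTERVAL_MS_PROP, "60000");
        PeriodicPolicy policy = new PeriodicPolicy(config);

        System.out.println(policy.performCommit(1_000_000, Duration.ofSeconds(59))); // false: count is ignored
        System.out.println(policy.performCommit(1, Duration.ofSeconds(60)));         // true: exactly at the interval
        System.out.println(policy.performCommit(0, Duration.ofMinutes(5)));          // true

        try {
            new PeriodicPolicy(new Properties());
        } catch (NumberFormatException e) {
            System.out.println("missing " + OFFSET_FLUSH_INTERVAL_MS_PROP);
        }
    }
}
```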

RecordCommitter

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java

    public static interface RecordCommitter<R> {

        /**
         * Marks a single record as processed, must be called for each
         * record.
         *
         * @param record the record to commit
         */
        void markProcessed(R record) throws InterruptedException;

        /**
         * Marks a batch as finished, this may result in committing offsets/flushing
         * data.
         * <p>
         * Should be called when a batch of records is finished being processed.
         */
        void markBatchFinished();
    }

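  • The RecordCommitter interface defines two methods. markProcessed must be called once for every record. markBatchFinished is called when a batch has been processed and may result in committing offsets or flushing data.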
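The call order the Javadoc asks for can be sketched as below: mark each record as processed, then finish the batch. The interface is a local stand-in with the same two methods as DebeziumEngine.RecordCommitter. handleBatch and LoggingCommitter are hypothetical helpers used only to show the sequence, not Debezium classes.

```java
import java.util.ArrayList;
import java.util.List;

class BatchHandlerDemo {
    // Local stand-in with the same two methods as DebeziumEngine.RecordCommitter.
    interface RecordCommitter<R> {
        void markProcessed(R record) throws InterruptedException;

        void markBatchFinished();
    }

    // Hypothetical batch handler: process each record, mark it, then close the batch.
    static <R> void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException {
        for (R record : records) {
            System.out.println("processing " + record); // placeholder for real work, e.g. writing to a sink
            committer.markProcessed(record);            // must be called once per record
        }
        committer.markBatchFinished();                  // may trigger an offset commit
    }

    // Hypothetical committer that only records the sequence of calls.
    static final class LoggingCommitter<R> implements RecordCommitter<R> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void markProcessed(R record) {
            calls.add("processed:" + record);
        }

        @Override
        public void markBatchFinished() {
            calls.add("batchFinished");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LoggingCommitter<String> committer = new LoggingCommitter<>();
        handleBatch(List.of("r1", "r2"), committer);
        System.out.println(committer.calls); // [processed:r1, processed:r2, batchFinished]
    }
}
```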

EmbeddedEngine

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java

@ThreadSafe
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {

    //......

    protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter, SourceTask task, Duration commitTimeout) {
        return new RecordCommitter() {

            @Override
            public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
                task.commitRecord(record);
                recordsSinceLastCommit += 1;
                offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
            }

            @Override
            public synchronized void markBatchFinished() {
                maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeout, task);
            }
        };
    }

    protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout,
                              SourceTask task) {
        // Determine if we need to commit to offset storage ...
        long timeSinceLastCommitMillis = clock.currentTimeInMillis() - timeOfLastCommitMillis;
        if (policy.performCommit(recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))) {
            commitOffsets(offsetWriter, commitTimeout, task);
        }
    }

    protected void commitOffsets(OffsetStorageWriter offsetWriter, Duration commitTimeout, SourceTask task) {
        long started = clock.currentTimeInMillis();
        long timeout = started + commitTimeout.toMillis();
        if (!offsetWriter.beginFlush()) {
            return;
        }
        Future<Void> flush = offsetWriter.doFlush(this::completedFlush);
        if (flush == null) {
            return; // no offsets to commit ...
        }

        // Wait until the offsets are flushed ...
        try {
            flush.get(Math.max(timeout - clock.currentTimeInMillis(), 0), TimeUnit.MILLISECONDS);
            // if we've gotten this far, the offsets have been committed so notify the task
            task.commit();
            recordsSinceLastCommit = 0;
            timeOfLastCommitMillis = clock.currentTimeInMillis();
        }
        catch (InterruptedException e) {
            logger.warn("Flush of {} offsets interrupted, cancelling", this);
            offsetWriter.cancelFlush();
        }
        catch (ExecutionException e) {
            logger.error("Flush of {} offsets threw an unexpected exception: ", this, e);
            offsetWriter.cancelFlush();
        }
        catch (TimeoutException e) {
            logger.error("Timed out waiting to flush {} offsets to storage", this);
            offsetWriter.cancelFlush();
        }
    }

    //......
}

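  • EmbeddedEngine's buildRecordCommitter method creates an anonymous RecordCommitter. Its markProcessed calls task.commitRecord, increments recordsSinceLastCommit and passes the record's partition and offset to offsetWriter.offset. Its markBatchFinished calls maybeFlush, which uses policy.performCommit to decide whether to run commitOffsets. commitOffsets mainly calls offsetWriter.doFlush and waits for the flush up to commitTimeout. Only after a successful flush does it call task.commit() and reset recordsSinceLastCommit and timeOfLastCommitMillis. On interruption, an execution error or a timeout it cancels the flush instead, so the counters keep accumulating.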
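The bookkeeping in maybeFlush and commitOffsets can be made concrete with a small simulation. This is a simplified model, not EmbeddedEngine. The manual clock (nowMillis), the flushSucceeds supplier and the commits counter are hypothetical. The OffsetStorageWriter and Future handling is collapsed into a single success-or-failure outcome.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class FlushBookkeepingDemo {
    @FunctionalInterface
    interface OffsetCommitPolicy {
        boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);
    }

    // Simplified model of the counters that EmbeddedEngine keeps between batches.
    static final class Engine {
        long nowMillis;                 // hypothetical manual clock
        long recordsSinceLastCommit;
        long timeOfLastCommitMillis;
        int commits;                    // hypothetical counter of successful flushes
        private final OffsetCommitPolicy policy;
        private final BooleanSupplier flushSucceeds; // hypothetical stand-in for the flush outcome

        Engine(OffsetCommitPolicy policy, BooleanSupplier flushSucceeds) {
            this.policy = policy;
            this.flushSucceeds = flushSucceeds;
        }

        void markProcessed() {
            recordsSinceLastCommit += 1;
        }

        void markBatchFinished() {
            maybeFlush();
        }

        void maybeFlush() {
            long timeSinceLastCommitMillis = nowMillis - timeOfLastCommitMillis;
            if (policy.performCommit(recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))) {
                commitOffsets();
            }
        }

        void commitOffsets() {
            if (!flushSucceeds.getAsBoolean()) {
                return; // like the catch branches: counters are not reset, so the next batch checks again
            }
            commits++;
            recordsSinceLastCommit = 0;
            timeOfLastCommitMillis = nowMillis;
        }
    }

    public static void main(String[] args) {
        OffsetCommitPolicy periodic = (n, t) -> t.compareTo(Duration.ofMillis(1000)) >= 0;
        Engine engine = new Engine(periodic, () -> true);
        for (int batch = 0; batch < 5; batch++) {
            engine.nowMillis += 400; // each one-record batch takes 400 ms
            engine.markProcessed();
            engine.markBatchFinished();
        }
        System.out.println("commits=" + engine.commits + ", pending=" + engine.recordsSinceLastCommit);
        // prints "commits=1, pending=2"
    }
}
```

With 400 ms batches and a 1000 ms periodic rule, only the third batch (at 1200 ms) triggers a commit. The last two records are still pending when the loop ends.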

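Summary

OffsetCommitPolicy defines the performCommit method and provides two static factory methods: always() creates an AlwaysCommitOffsetPolicy, and periodic() creates a PeriodicCommitOffsetPolicy. In EmbeddedEngine, the policy is consulted in maybeFlush every time a batch is marked finished, and it decides whether commitOffsets runs.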

doc

  • OffsetCommitPolicy

Source: utcz.com/z/516662.html
