聊聊DebeziumEngine

编程

DebeziumEngine

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java

@Incubating

public interface DebeziumEngine<R> extends Runnable, Closeable {

//......

public static <T> Builder<T> create(Class<? extends ChangeEventFormat<T>> eventFormat) {

final ServiceLoader<Builder> loader = ServiceLoader.load(Builder.class);

final Iterator<Builder> iterator = loader.iterator();

if (!iterator.hasNext()) {

throw new DebeziumException("No implementation of Debezium engine builder was found");

}

final Builder builder = iterator.next();

if (iterator.hasNext()) {

LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass());

}

return builder;

}

//......

}

  • DebeziumEngine提供了create方法,它接收eventFormat参数,然后使用ServiceLoader.load(Builder.class)加载Builder.class,然后返回第一个builder

DebeziumEngine.Builder

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java

    public static interface Builder<R> {

Builder<R> notifying(Consumer<R> consumer);

Builder<R> notifying(ChangeConsumer<R> handler);

Builder<R> using(Properties config);

Builder<R> using(ClassLoader classLoader);

Builder<R> using(Clock clock);

Builder<R> using(CompletionCallback completionCallback);

Builder<R> using(ConnectorCallback connectorCallback);

Builder<R> using(OffsetCommitPolicy policy);

DebeziumEngine<R> build();

}

  • Builder接口定义了notifying、using、build方法

EmbeddedEngine.BuilderImpl

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java

    public static final class BuilderImpl implements Builder {

private Configuration config;

private DebeziumEngine.ChangeConsumer<SourceRecord> handler;

private ClassLoader classLoader;

private Clock clock;

private DebeziumEngine.CompletionCallback completionCallback;

private DebeziumEngine.ConnectorCallback connectorCallback;

private OffsetCommitPolicy offsetCommitPolicy = null;

@Override

public Builder using(Configuration config) {

this.config = config;

return this;

}

@Override

public Builder using(Properties config) {

this.config = Configuration.from(config);

return this;

}

@Override

public Builder using(ClassLoader classLoader) {

this.classLoader = classLoader;

return this;

}

@Override

public Builder using(Clock clock) {

this.clock = clock;

return this;

}

@Override

public Builder using(DebeziumEngine.CompletionCallback completionCallback) {

this.completionCallback = completionCallback;

return this;

}

@Override

public Builder using(DebeziumEngine.ConnectorCallback connectorCallback) {

this.connectorCallback = connectorCallback;

return this;

}

@Override

public Builder using(OffsetCommitPolicy offsetCommitPolicy) {

this.offsetCommitPolicy = offsetCommitPolicy;

return this;

}

@Override

public Builder notifying(Consumer<SourceRecord> consumer) {

this.handler = buildDefaultChangeConsumer(consumer);

return this;

}

@Override

public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler) {

this.handler = handler;

return this;

}

@Override

public Builder using(java.time.Clock clock) {

return using(new Clock() {

@Override

public long currentTimeInMillis() {

return clock.millis();

}

});

}

@Override

public EmbeddedEngine build() {

if (classLoader == null) {

classLoader = getClass().getClassLoader();

}

if (clock == null) {

clock = Clock.system();

}

Objects.requireNonNull(config, "A connector configuration must be specified.");

Objects.requireNonNull(handler, "A connector consumer or changeHandler must be specified.");

return new EmbeddedEngine(config, classLoader, clock,

handler, completionCallback, connectorCallback, offsetCommitPolicy);

}

// backward compatibility methods

@Override

public Builder using(CompletionCallback completionCallback) {

return using((DebeziumEngine.CompletionCallback) completionCallback);

}

@Override

public Builder using(ConnectorCallback connectorCallback) {

return using((DebeziumEngine.ConnectorCallback) connectorCallback);

}

}

  • BuilderImpl实现了Builder接口,其build方法创建的是EmbeddedEngine

EmbeddedEngine

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java

@ThreadSafe

public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {

private final Logger logger = LoggerFactory.getLogger(getClass());

private final Configuration config;

private final Clock clock;

private final ClassLoader classLoader;

private final DebeziumEngine.ChangeConsumer<SourceRecord> handler;

private final DebeziumEngine.CompletionCallback completionCallback;

private final DebeziumEngine.ConnectorCallback connectorCallback;

private final AtomicReference<Thread> runningThread = new AtomicReference<>();

private final VariableLatch latch = new VariableLatch(0);

private final Converter keyConverter;

private final Converter valueConverter;

private final WorkerConfig workerConfig;

private final CompletionResult completionResult;

private long recordsSinceLastCommit = 0;

private long timeOfLastCommitMillis = 0;

private OffsetCommitPolicy offsetCommitPolicy;

private SourceTask task;

private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> handler,

DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback,

OffsetCommitPolicy offsetCommitPolicy) {

this.config = config;

this.handler = handler;

this.classLoader = classLoader;

this.clock = clock;

this.completionCallback = completionCallback != null ? completionCallback : (success, msg, error) -> {

if (!success) {

logger.error(msg, error);

}

};

this.connectorCallback = connectorCallback;

this.completionResult = new CompletionResult();

this.offsetCommitPolicy = offsetCommitPolicy;

assert this.config != null;

assert this.handler != null;

assert this.classLoader != null;

assert this.clock != null;

keyConverter = config.getInstance(INTERNAL_KEY_CONVERTER_CLASS, Converter.class, () -> this.classLoader);

keyConverter.configure(config.subset(INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);

valueConverter = config.getInstance(INTERNAL_VALUE_CONVERTER_CLASS, Converter.class, () -> this.classLoader);

Configuration valueConverterConfig = config;

if (valueConverter instanceof JsonConverter) {

// Make sure that the JSON converter is configured to NOT enable schemas ...

valueConverterConfig = config.edit().with(INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false).build();

}

valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);

// Create the worker config, adding extra fields that are required for validation of a worker config

// but that are not used within the embedded engine (since the source records are never serialized) ...

Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);

embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());

embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());

workerConfig = new EmbeddedConfig(embeddedConfig);

}

public void run() {

if (runningThread.compareAndSet(null, Thread.currentThread())) {

final String engineName = config.getString(ENGINE_NAME);

final String connectorClassName = config.getString(CONNECTOR_CLASS);

final Optional<DebeziumEngine.ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);

// Only one thread can be in this part of the method at a time ...

latch.countUp();

try {

if (!config.validateAndRecord(CONNECTOR_FIELDS, logger::error)) {

fail("Failed to start connector with invalid configuration (see logs for actual errors)");

return;

}

// Instantiate the connector ...

SourceConnector connector = null;

try {

@SuppressWarnings("unchecked")

Class<? extends SourceConnector> connectorClass = (Class<SourceConnector>) classLoader.loadClass(connectorClassName);

connector = connectorClass.getDeclaredConstructor().newInstance();

}

catch (Throwable t) {

fail("Unable to instantiate connector class "" + connectorClassName + """, t);

return;

}

// Instantiate the offset store ...

final String offsetStoreClassName = config.getString(OFFSET_STORAGE);

OffsetBackingStore offsetStore = null;

try {

@SuppressWarnings("unchecked")

Class<? extends OffsetBackingStore> offsetStoreClass = (Class<OffsetBackingStore>) classLoader.loadClass(offsetStoreClassName);

offsetStore = offsetStoreClass.getDeclaredConstructor().newInstance();

}

catch (Throwable t) {

fail("Unable to instantiate OffsetBackingStore class "" + offsetStoreClassName + """, t);

return;

}

// Initialize the offset store ...

try {

offsetStore.configure(workerConfig);

offsetStore.start();

}

catch (Throwable t) {

fail("Unable to configure and start the "" + offsetStoreClassName + "" offset backing store", t);

return;

}

// Set up the offset commit policy ...

if (offsetCommitPolicy == null) {

offsetCommitPolicy = config.getInstance(EmbeddedEngine.OFFSET_COMMIT_POLICY, OffsetCommitPolicy.class, config);

}

// Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ...

ConnectorContext context = new ConnectorContext() {

@Override

public void requestTaskReconfiguration() {

// Do nothing ...

}

@Override

public void raiseError(Exception e) {

fail(e.getMessage(), e);

}

};

connector.initialize(context);

OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName,

keyConverter, valueConverter);

OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName,

keyConverter, valueConverter);

Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS));

try {

// Start the connector with the given properties and get the task configurations ...

connector.start(config.asMap());

connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);

List<Map<String, String>> taskConfigs = connector.taskConfigs(1);

Class<? extends Task> taskClass = connector.taskClass();

task = null;

try {

task = (SourceTask) taskClass.getDeclaredConstructor().newInstance();

}

catch (IllegalAccessException | InstantiationException t) {

fail("Unable to instantiate connector"s task class "" + taskClass.getName() + """, t);

return;

}

try {

SourceTaskContext taskContext = new SourceTaskContext() {

@Override

public OffsetStorageReader offsetStorageReader() {

return offsetReader;

}

public Map<String, String> configs() {

// TODO Auto-generated method stub

return null;

}

};

task.initialize(taskContext);

task.start(taskConfigs.get(0));

connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);

}

catch (Throwable t) {

// Mask the passwords ...

Configuration config = Configuration.from(taskConfigs.get(0)).withMaskedPasswords();

String msg = "Unable to initialize and start connector"s task class "" + taskClass.getName() + "" with config: "

+ config;

fail(msg, t);

return;

}

recordsSinceLastCommit = 0;

Throwable handlerError = null;

try {

timeOfLastCommitMillis = clock.currentTimeInMillis();

RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout);

while (runningThread.get() != null) {

List<SourceRecord> changeRecords = null;

try {

logger.debug("Embedded engine is polling task for records on thread {}", runningThread.get());

changeRecords = task.poll(); // blocks until there are values ...

logger.debug("Embedded engine returned from polling task for records");

}

catch (InterruptedException e) {

// Interrupted while polling ...

logger.debug("Embedded engine interrupted on thread {} while polling the task for records", runningThread.get());

if (this.runningThread.get() == Thread.currentThread()) {

// this thread is still set as the running thread -> we were not interrupted

// due the stop() call -> probably someone else called the interrupt on us ->

// -> we should raise the interrupt flag

Thread.currentThread().interrupt();

}

break;

}

catch (RetriableException e) {

logger.info("Retrieable exception thrown, connector will be restarted", e);

// Retriable exception should be ignored by the engine

// and no change records delivered.

// The retry is handled in io.debezium.connector.common.BaseSourceTask.poll()

}

try {

if (changeRecords != null && !changeRecords.isEmpty()) {

logger.debug("Received {} records from the task", changeRecords.size());

try {

handler.handleBatch(changeRecords, committer);

}

catch (StopConnectorException e) {

break;

}

}

else {

logger.debug("Received no records from the task");

}

}

catch (Throwable t) {

// There was some sort of unexpected exception, so we should stop work

handlerError = t;

break;

}

}

}

finally {

if (handlerError != null) {

// There was an error in the handler so make sure it"s always captured...

fail("Stopping connector after error in the application"s handler method: " + handlerError.getMessage(),

handlerError);

}

try {

// First stop the task ...

logger.debug("Stopping the task and engine");

task.stop();

connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStopped);

// Always commit offsets that were captured from the source records we actually processed ...

commitOffsets(offsetWriter, commitTimeout, task);

if (handlerError == null) {

// We stopped normally ...

succeed("Connector "" + connectorClassName + "" completed normally.");

}

}

catch (Throwable t) {

fail("Error while trying to stop the task and commit the offsets", t);

}

}

}

catch (Throwable t) {

fail("Error while trying to run connector class "" + connectorClassName + """, t);

}

finally {

// Close the offset storage and finally the connector ...

try {

offsetStore.stop();

}

catch (Throwable t) {

fail("Error while trying to stop the offset store", t);

}

finally {

try {

connector.stop();

connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStopped);

}

catch (Throwable t) {

fail("Error while trying to stop connector class "" + connectorClassName + """, t);

}

}

}

}

finally {

latch.countDown();

runningThread.set(null);

// after we"ve "shut down" the engine, fire the completion callback based on the results we collected

completionCallback.handle(completionResult.success(), completionResult.message(), completionResult.error());

}

}

}

public boolean stop() {

logger.debug("Stopping the embedded engine");

// Signal that the run() method should stop ...

Thread thread = this.runningThread.getAndSet(null);

if (thread != null) {

try {

latch.await(

Long.valueOf(

System.getProperty(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP, Long.toString(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT.toMillis()))),

TimeUnit.MILLISECONDS);

}

catch (InterruptedException e) {

}

logger.debug("Interrupting the embedded engine"s thread {} (already interrupted: {})", thread, thread.isInterrupted());

// Interrupt the thread in case it is blocked while polling the task for records ...

thread.interrupt();

return true;

}

return false;

}

@Override

public void close() throws IOException {

stop();

}

}

  • EmbeddedEngine实现了DebeziumEngine接口,其run方法主要是实例化SourceConnector,OffsetBackingStore,执行offsetStore.configure(workerConfig)及offsetStore.start()以及connector.initialize(context)、connector.start(config.asMap()),然后创创建connector task,执行task.initialize(taskContext)及task.start(taskConfigs.get(0));之后通过while循环执行task.poll()获取changeRecords,然后通过handler.handleBatch(changeRecords, committer)进行回调,最后在finally的时候执行task.stop()以及commitOffsets

小结

DebeziumEngine提供了create方法,它接收eventFormat参数,然后使用ServiceLoader.load(Builder.class)加载Builder.class,然后返回第一个builder;BuilderImpl实现了Builder接口,其build方法创建的是EmbeddedEngine

doc

  • DebeziumEngine

以上是 聊聊DebeziumEngine 的全部内容, 来源链接: utcz.com/z/516453.html

回到顶部