A Look at Debezium's BlockingReader

Reader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java

public interface Reader {

    public static enum State {
        /**
         * The reader is stopped and static.
         */
        STOPPED,

        /**
         * The reader is running and generated records.
         */
        RUNNING,

        /**
         * The reader has completed its work or been explicitly stopped, but not all of the generated records have been
         * consumed via {@link Reader#poll() polling}.
         */
        STOPPING;
    }

    public String name();

    public State state();

    public void uponCompletion(Runnable handler);

    public default void initialize() {
        // do nothing
    }

    public default void destroy() {
        // do nothing
    }

    public void start();

    public void stop();

    public List<SourceRecord> poll() throws InterruptedException;
}

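  • The Reader interface defines the name, state, uponCompletion, start, stop, and poll methods, plus the default no-op initialize and destroy methods. Its State enum has three values: STOPPED, RUNNING, and STOPPING.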
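To make the lifecycle concrete, here is a minimal sketch of how a caller might drive a reader of this shape: start it, then poll until it reports STOPPED. Everything in it is hypothetical and only for illustration: MiniReader is a trimmed-down stand-in for Reader that uses String records instead of SourceRecord, and CountingReader and ReaderDriver do not exist in Debezium. It is not a description of how Debezium itself schedules its readers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Reader interface: String records, no completion handler.
interface MiniReader {
    enum State { STOPPED, RUNNING, STOPPING }

    String name();
    State state();
    void start();
    void stop();
    List<String> poll() throws InterruptedException;
}

// Hypothetical reader that hands out a fixed number of one-record batches, then stops itself.
class CountingReader implements MiniReader {
    private final int batches;
    private int emitted = 0;
    private State state = State.STOPPED;

    CountingReader(int batches) { this.batches = batches; }

    @Override public String name() { return "counting"; }
    @Override public State state() { return state; }
    @Override public void start() { state = State.RUNNING; }
    @Override public void stop() { state = State.STOPPED; }

    @Override
    public List<String> poll() {
        if (state != State.RUNNING) {
            return null;            // nothing to hand out unless running
        }
        if (emitted == batches) {
            stop();                 // all work done
            return null;
        }
        emitted++;
        return List.of("record-" + emitted);
    }
}

class ReaderDriver {
    // Starts the reader and polls until it reports STOPPED, collecting every record.
    static List<String> drain(MiniReader reader) {
        List<String> all = new ArrayList<>();
        reader.start();
        try {
            while (reader.state() != MiniReader.State.STOPPED) {
                List<String> batch = reader.poll();
                if (batch != null) {
                    all.addAll(batch);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return all;
    }
}
```

Calling ReaderDriver.drain(new CountingReader(3)) collects three records. Note that a null return from poll means "no records this round", so the driver has to look at state() to know when to stop.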

BlockingReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java

public class BlockingReader implements Reader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final AtomicReference<State> state = new AtomicReference<>();
    private final Metronome metronome;
    private final String name;
    private final String runningLogMessage;

    public BlockingReader(String name, String runningLogMessage) {
        this.name = name;
        this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
        this.runningLogMessage = runningLogMessage;
    }

    /**
     * Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested.
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (state.get() == State.STOPPED) {
            return null;
        }
        metronome.pause();
        state.compareAndSet(State.RUNNING, State.STOPPING);
        return null;
    }

    @Override
    public State state() {
        return state.get();
    }

    @Override
    public void uponCompletion(Runnable handler) {
        assert this.uponCompletion.get() == null;
        this.uponCompletion.set(handler);
    }

    @Override
    public void start() {
        state.set(State.RUNNING);
        logger.info(runningLogMessage);
    }

    @Override
    public void stop() {
        try {
            state.set(State.STOPPED);
            // Cleanup Resources
            Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
            if (completionHandler != null) {
                completionHandler.run();
            }
        }
        finally {
            logger.info("Blocking Reader has completed.");
        }
    }

    @Override
    public String name() {
        return name;
    }
}

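  • BlockingReader implements the Reader interface. Its start method sets state to State.RUNNING. Its stop method sets state to State.STOPPED and runs the completion handler, if one was registered, exactly once. Its poll method returns null immediately when state is State.STOPPED; otherwise it calls metronome.pause(), then uses compareAndSet to move state from State.RUNNING to State.STOPPING (not STOPPED), and finally returns null. The reader therefore never produces records; only stop() brings it to State.STOPPED.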
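The state transitions are easy to misread, so here is a minimal, self-contained sketch that mimics them without any Debezium or Kafka Connect dependency. BlockingSketch and its demo method are hypothetical names; Thread.sleep stands in for metronome.pause(), and poll returns Object instead of List<SourceRecord>. Treat it as an illustration of the logic above, not as the library's code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical re-creation of the BlockingReader state handling.
class BlockingSketch {
    enum State { STOPPED, RUNNING, STOPPING }

    // As in the original, the state is unset (null) until start() is called.
    private final AtomicReference<State> state = new AtomicReference<>();
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final long pauseMillis; // assumption: stands in for the Metronome interval

    BlockingSketch(long pauseMillis) { this.pauseMillis = pauseMillis; }

    void uponCompletion(Runnable handler) { uponCompletion.set(handler); }

    State state() { return state.get(); }

    void start() { state.set(State.RUNNING); }

    Object poll() throws InterruptedException {
        if (state.get() == State.STOPPED) {
            return null;                                    // already stopped: return at once
        }
        Thread.sleep(pauseMillis);                          // stand-in for metronome.pause()
        state.compareAndSet(State.RUNNING, State.STOPPING); // only changes RUNNING, never STOPPED
        return null;                                        // never any records
    }

    void stop() {
        state.set(State.STOPPED);
        Runnable handler = uponCompletion.getAndSet(null);  // cleared so it runs only once
        if (handler != null) {
            handler.run();
        }
    }

    // Walks through start -> poll -> stop -> stop and reports what was observed.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        BlockingSketch reader = new BlockingSketch(1);
        reader.uponCompletion(calls::incrementAndGet);
        StringBuilder seen = new StringBuilder();
        try {
            reader.start();
            seen.append(reader.state());
            reader.poll();
            seen.append(" -> ").append(reader.state());
            reader.stop();
            reader.stop(); // second call must not run the handler again
            seen.append(" -> ").append(reader.state());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen + ", handler calls: " + calls.get();
    }
}
```

BlockingSketch.demo() returns "RUNNING -> STOPPING -> STOPPED, handler calls: 1": the first poll moves the reader to STOPPING, and the completion handler fires once even though stop() is called twice.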

TimedBlockingReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TimedBlockingReader.java

public class TimedBlockingReader extends BlockingReader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private final Duration timeout;
    private volatile Timer timer;

    /**
     * @param name Name of the reader
     * @param timeout Duration of time until this TimedBlockingReader should stop
     */
    public TimedBlockingReader(String name, Duration timeout) {
        super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding");
        this.timeout = timeout;
    }

    @Override
    public void start() {
        super.start();
        this.timer = Threads.timer(Clock.SYSTEM, timeout);
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        super.poll();

        // Stop when we've reached the timeout threshold
        if (timer != null && timer.expired()) {
            stop();
        }
        return null;
    }
}

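  • TimedBlockingReader extends BlockingReader. Its start method calls the parent's start and then creates a Timer via Threads.timer(Clock.SYSTEM, timeout). Its poll method first calls the parent's poll (ignoring the return value, which is always null), then calls stop() if timer.expired() is true, and finally returns null. Unlike its parent, it stops on its own once the timeout has elapsed.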
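The timeout behaviour can be sketched in isolation as follows. This is a rough approximation under stated assumptions: TimedSketch and pollsUntilStopped are hypothetical names, a System.nanoTime() deadline stands in for the Timer returned by Threads.timer, and Thread.sleep stands in for the parent's metronome.pause().

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical re-creation of the TimedBlockingReader timeout logic.
class TimedSketch {
    enum State { STOPPED, RUNNING, STOPPING }

    private final AtomicReference<State> state = new AtomicReference<>();
    private final Duration timeout;
    private final long pauseMillis;       // assumption: stands in for the Metronome interval
    private volatile long deadlineNanos;  // assumption: stands in for the Timer

    TimedSketch(Duration timeout, long pauseMillis) {
        this.timeout = timeout;
        this.pauseMillis = pauseMillis;
    }

    State state() { return state.get(); }

    void start() {
        state.set(State.RUNNING);
        deadlineNanos = System.nanoTime() + timeout.toNanos(); // the clock starts at start()
    }

    void stop() { state.set(State.STOPPED); }

    Object poll() throws InterruptedException {
        // Parent behaviour: pause, then RUNNING -> STOPPING.
        if (state.get() != State.STOPPED) {
            Thread.sleep(pauseMillis);
            state.compareAndSet(State.RUNNING, State.STOPPING);
        }
        // Subclass behaviour: stop once the deadline has passed.
        if (System.nanoTime() - deadlineNanos >= 0) {
            stop();
        }
        return null;
    }

    // Polls until the reader stops itself and returns how many polls that took.
    static int pollsUntilStopped(Duration timeout, long pauseMillis) {
        TimedSketch reader = new TimedSketch(timeout, pauseMillis);
        reader.start();
        int polls = 0;
        try {
            while (reader.state() != State.STOPPED) {
                reader.poll();
                polls++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return polls;
    }
}
```

For example, TimedSketch.pollsUntilStopped(Duration.ofMillis(50), 5) returns after roughly 50 ms. The exact number of polls depends on scheduling, which is why the sketch does not state one. Because expiry is only checked after each pause, the reader can overshoot the timeout by up to one pause interval.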

Summary

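BlockingReader implements the Reader interface but never produces records. Its start method sets state to State.RUNNING; its stop method sets state to State.STOPPED and runs the completion handler once. Its poll method returns null immediately when state is State.STOPPED; otherwise it calls metronome.pause(), moves state from State.RUNNING to State.STOPPING, and returns null. TimedBlockingReader adds a timeout on top of this: once its Timer has expired, poll calls stop().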
