A look at debezium's BlockingReader
Reader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java
public interface Reader {

public static enum State {
/**
* The reader is stopped and static.
*/
STOPPED,
/**
* The reader is running and generated records.
*/
RUNNING,
/**
* The reader has completed its work or been explicitly stopped, but not all of the generated records have been
* consumed via {@link Reader#poll() polling}.
*/
STOPPING;
}
public String name();
public State state();
public void uponCompletion(Runnable handler);
public default void initialize() {
// do nothing
}
public default void destroy() {
// do nothing
}
public void start();
public void stop();
public List<SourceRecord> poll() throws InterruptedException;
}
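- The Reader interface defines the name, state, uponCompletion, start, stop, and poll methods, plus initialize and destroy, which are default methods that do nothing. It also declares the State enum with the values STOPPED, RUNNING, and STOPPING.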
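To make the lifecycle implied by this interface concrete, here is a minimal sketch with no Debezium dependency. `MiniReader`, `CountingReader`, and `ReaderDriver` are hypothetical names invented for this illustration, and records are plain strings rather than `SourceRecord`. It only shows the start / poll / stop / uponCompletion contract and does not reproduce how the connector actually drives its readers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down stand-in for the Reader interface (records are plain Strings).
interface MiniReader {
    enum State { STOPPED, RUNNING, STOPPING }

    String name();
    State state();
    void uponCompletion(Runnable handler);
    void start();
    void stop();
    List<String> poll() throws InterruptedException;
}

// Toy reader: emits a fixed number of records, one per poll, then stops itself.
class CountingReader implements MiniReader {
    private final int total;
    private int emitted = 0;
    private State state = State.STOPPED;
    private Runnable handler;

    CountingReader(int total) { this.total = total; }

    public String name() { return "counting"; }
    public State state() { return state; }
    public void uponCompletion(Runnable handler) { this.handler = handler; }
    public void start() { state = State.RUNNING; }

    public void stop() {
        state = State.STOPPED;
        Runnable h = handler;
        handler = null;          // clear first so the handler runs at most once
        if (h != null) h.run();
    }

    public List<String> poll() {
        if (state != State.RUNNING) return null;
        if (emitted == total) {  // nothing left to emit: finish
            stop();
            return null;
        }
        emitted++;
        List<String> batch = new ArrayList<>();
        batch.add("record-" + emitted);
        return batch;
    }
}

class ReaderDriver {
    // Starts the reader and polls until it reports STOPPED, collecting every record.
    static List<String> drain(MiniReader reader) {
        List<String> all = new ArrayList<>();
        reader.start();
        try {
            while (reader.state() != MiniReader.State.STOPPED) {
                List<String> batch = reader.poll();
                if (batch != null) all.addAll(batch); // null means "nothing this round"
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();       // preserve the interrupt flag
        }
        return all;
    }
}
```

Calling `ReaderDriver.drain(new CountingReader(3))` collects three records and leaves the reader in `STOPPED`. A `null` return from `poll` is treated as "no records this time", which is the only value BlockingReader ever returns.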
BlockingReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java
public class BlockingReader implements Reader {

protected final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final AtomicReference<State> state = new AtomicReference<>();
private final Metronome metronome;
private final String name;
private final String runningLogMessage;
public BlockingReader(String name, String runningLogMessage) {
this.name = name;
this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
this.runningLogMessage = runningLogMessage;
}
/**
* Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested.
*/
@Override
public List<SourceRecord> poll() throws InterruptedException {
if (state.get() == State.STOPPED) {
return null;
}
metronome.pause();
state.compareAndSet(State.RUNNING, State.STOPPING);
return null;
}
@Override
public State state() {
return state.get();
}
@Override
public void uponCompletion(Runnable handler) {
assert this.uponCompletion.get() == null;
this.uponCompletion.set(handler);
}
@Override
public void start() {
state.set(State.RUNNING);
logger.info(runningLogMessage);
}
@Override
public void stop() {
try {
state.set(State.STOPPED);
// Cleanup Resources
Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
if (completionHandler != null) {
completionHandler.run();
}
}
finally {
logger.info("Blocking Reader has completed.");
}
}
@Override
public String name() {
return name;
}
}
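- BlockingReader implements the Reader interface. Its start method sets state to State.RUNNING; its stop method sets state to State.STOPPED and runs the completionHandler once, if one was registered. Its poll method returns null immediately when state is State.STOPPED; otherwise it calls metronome.pause(), then uses compareAndSet to move state from State.RUNNING to State.STOPPING (not STOPPED), and finally returns null, so it never produces any records.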
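The state transitions above can be checked with a small stand-alone sketch. `BlockingReaderSketch` is a hypothetical class written for this post; it replaces the `Metronome` with a plain `Thread.sleep` and drops logging and `SourceRecord`, so it mirrors only the state handling of the code shown above.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in that mirrors only the state handling of BlockingReader.
class BlockingReaderSketch {
    enum State { STOPPED, RUNNING, STOPPING }

    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final AtomicReference<State> state = new AtomicReference<>();
    private final long pauseMillis; // assumption: replaces the Metronome interval

    BlockingReaderSketch(long pauseMillis) {
        this.pauseMillis = pauseMillis;
    }

    State state() { return state.get(); }

    void uponCompletion(Runnable handler) { uponCompletion.set(handler); }

    void start() { state.set(State.RUNNING); }

    // Never returns records; it only pauses and then flips RUNNING to STOPPING.
    Object poll() throws InterruptedException {
        if (state.get() == State.STOPPED) {
            return null;
        }
        Thread.sleep(pauseMillis);
        // compareAndSet changes the state only if it is still RUNNING,
        // so a concurrent stop() (STOPPED) is never overwritten.
        state.compareAndSet(State.RUNNING, State.STOPPING);
        return null;
    }

    void stop() {
        state.set(State.STOPPED);
        // getAndSet(null) guarantees the handler runs at most once.
        Runnable handler = uponCompletion.getAndSet(null);
        if (handler != null) {
            handler.run();
        }
    }
}
```

After `start()` the state is `RUNNING`, one `poll()` moves it to `STOPPING`, and only `stop()` moves it to `STOPPED`. Calling `stop()` twice still runs the completion handler once.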
TimedBlockingReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TimedBlockingReader.java
public class TimedBlockingReader extends BlockingReader {

protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Duration timeout;
private volatile Timer timer;
/**
* @param name Name of the reader
* @param timeout Duration of time until this TimedBlockingReader should stop
*/
public TimedBlockingReader(String name, Duration timeout) {
super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding");
this.timeout = timeout;
}
@Override
public void start() {
super.start();
this.timer = Threads.timer(Clock.SYSTEM, timeout);
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
super.poll();
// Stop when we've reached the timeout threshold
if (timer != null && timer.expired()) {
stop();
}
return null;
}
}
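- TimedBlockingReader extends BlockingReader. Its start method calls super.start() and then creates a Timer via Threads.timer(Clock.SYSTEM, timeout). Its poll method first calls the parent's poll method, then calls stop() if the timer exists and timer.expired() returns true, and finally returns null.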
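The "stop once the timeout has elapsed" check can be sketched in isolation. `TimedStopSketch` is a hypothetical class for illustration only: it does not use Debezium's `Threads.timer` or `Timer`, it takes an injectable nanosecond clock so the behaviour can be checked deterministically, and it leaves out the pause performed by the parent's poll.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical sketch of the timeout check only; not Debezium's Timer.
class TimedStopSketch {
    private final Duration timeout;
    private final LongSupplier nanoClock;   // assumption: injectable clock, e.g. System::nanoTime
    private volatile long startNanos;
    private volatile boolean timerStarted = false;
    private volatile boolean stopped = false;

    TimedStopSketch(Duration timeout, LongSupplier nanoClock) {
        this.timeout = timeout;
        this.nanoClock = nanoClock;
    }

    // The timer begins counting at start(), not at construction.
    void start() {
        startNanos = nanoClock.getAsLong();
        timerStarted = true;
    }

    boolean timerExpired() {
        return nanoClock.getAsLong() - startNanos >= timeout.toNanos();
    }

    // Each poll re-checks the timer; the first poll after expiry stops the reader.
    void poll() {
        if (timerStarted && timerExpired()) {
            stopped = true;
        }
    }

    boolean isStopped() { return stopped; }
}
```

With a real clock this would be constructed as `new TimedStopSketch(Duration.ofSeconds(5), System::nanoTime)`. Because expiry is only checked inside `poll`, the stop happens on the first poll after the timeout rather than at the exact moment it elapses.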
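Summary
BlockingReader implements the Reader interface. Its start method sets state to State.RUNNING; its stop method sets state to State.STOPPED and runs the completionHandler once, if one was registered. Its poll method returns null immediately when state is State.STOPPED; otherwise it calls metronome.pause(), then uses compareAndSet to move state from State.RUNNING to State.STOPPING, and finally returns null.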
doc
- BlockingReader