A Look at Debezium's ChangeEventQueue


ChangeEventQueueMetrics

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java

public interface ChangeEventQueueMetrics {

    int totalCapacity();

    int remainingCapacity();
}

  • The ChangeEventQueueMetrics interface defines two methods: totalCapacity and remainingCapacity.
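As an illustration of what the two methods are good for, the sketch below derives a queue utilization figure from them. It does not use Debezium itself: `QueueMetrics`, `metricsOf` and `utilization` are hypothetical names introduced here, and `QueueMetrics` is only a local mirror of the interface shown above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

final class QueueMetricsSketch {

    // Hypothetical local mirror of ChangeEventQueueMetrics, so the example compiles without Debezium.
    interface QueueMetrics {
        int totalCapacity();

        int remainingCapacity();
    }

    // Adapts a bounded BlockingQueue to the two-method metrics view.
    static QueueMetrics metricsOf(BlockingQueue<?> queue, int maxQueueSize) {
        return new QueueMetrics() {
            @Override
            public int totalCapacity() {
                return maxQueueSize;
            }

            @Override
            public int remainingCapacity() {
                return queue.remainingCapacity();
            }
        };
    }

    // Fraction of the queue currently occupied, between 0.0 and 1.0.
    static double utilization(QueueMetrics metrics) {
        int total = metrics.totalCapacity();
        if (total <= 0) {
            return 0.0;
        }
        return (total - metrics.remainingCapacity()) / (double) total;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(4);
        queue.add("event-1");
        System.out.println(utilization(metricsOf(queue, 4))); // prints 0.25
    }
}
```

A value that stays close to 1.0 would suggest that the consumer is not keeping up with the producer.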

ChangeEventQueue

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java

public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class);

    private final Duration pollInterval;
    private final int maxBatchSize;
    private final int maxQueueSize;
    private final BlockingQueue<T> queue;
    private final Metronome metronome;
    private final Supplier<PreviousContext> loggingContextSupplier;

    private volatile RuntimeException producerException;

    private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
        this.pollInterval = pollInterval;
        this.maxBatchSize = maxBatchSize;
        this.maxQueueSize = maxQueueSize;
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
        this.loggingContextSupplier = loggingContextSupplier;
    }

    public static class Builder<T> {

        private Duration pollInterval;
        private int maxQueueSize;
        private int maxBatchSize;
        private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;

        public Builder<T> pollInterval(Duration pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        public Builder<T> maxQueueSize(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
            return this;
        }

        public Builder<T> maxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
            this.loggingContextSupplier = loggingContextSupplier;
            return this;
        }

        public ChangeEventQueue<T> build() {
            return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier);
        }
    }

    /**
     * Enqueues a record so that it can be obtained via {@link #poll()}. This method
     * will block if the queue is full.
     *
     * @param record
     *            the record to be enqueued
     * @throws InterruptedException
     *             if this thread has been interrupted
     */
    public void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }

        // The calling thread has been interrupted, let's abort
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        // this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue
        queue.put(record);
    }

    /**
     * Returns the next batch of elements from this queue. May be empty in case no
     * elements have arrived in the maximum waiting time.
     *
     * @throws InterruptedException
     *             if this thread has been interrupted while waiting for more
     *             elements to arrive
     */
    public List<T> poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            List<T> records = new ArrayList<>();
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
                throwProducerExceptionIfPresent();

                LOGGER.debug("no records available yet, sleeping a bit...");
                // no records yet, so wait a bit
                metronome.pause();
                LOGGER.debug("checking for more records...");
            }
            return records;
        }
        finally {
            previousContext.restore();
        }
    }

    public void producerException(final RuntimeException producerException) {
        this.producerException = producerException;
    }

    private void throwProducerExceptionIfPresent() {
        if (producerException != null) {
            throw producerException;
        }
    }

    @Override
    public int totalCapacity() {
        return maxQueueSize;
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }
}

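  • ChangeEventQueue implements the ChangeEventQueueMetrics interface. Its constructor creates the BlockingQueue and the Metronome and stores the loggingContextSupplier. enqueue ignores null records and otherwise calls queue.put(record), which blocks while the queue is full. poll first obtains previousContext via loggingContextSupplier.get(), then creates a timeout and loops: each pass calls queue.drainTo(records, maxBatchSize) and, if nothing was drained, rethrows any recorded producerException and calls metronome.pause(). The loop ends when timeout.expired() returns true or drainTo moves at least one record into the batch; finally poll calls previousContext.restore(). totalCapacity returns maxQueueSize, and remainingCapacity returns queue.remainingCapacity().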
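The enqueue/poll behaviour described above can be tried out with a stripped-down stand-in. This is a minimal sketch, not Debezium's class: `BatchingQueueSketch`, its `poll(Duration maxWait)` signature and the `drainInBatches` helper are hypothetical, the timeout is a plain `System.nanoTime()` deadline, and the pause is a simple `Thread.sleep` rather than a Metronome. Logging context handling and producer exceptions are left out.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

final class BatchingQueueSketch<T> {

    private final BlockingQueue<T> queue;
    private final int maxBatchSize;
    private final Duration pollInterval;

    BatchingQueueSketch(int maxQueueSize, int maxBatchSize, Duration pollInterval) {
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.maxBatchSize = maxBatchSize;
        this.pollInterval = pollInterval;
    }

    // Producer side: ignores null, blocks while the queue is full.
    void enqueue(T record) throws InterruptedException {
        if (record != null) {
            queue.put(record);
        }
    }

    // Consumer side: returns up to maxBatchSize records, or an empty list once maxWait has passed.
    List<T> poll(Duration maxWait) throws InterruptedException {
        List<T> records = new ArrayList<>();
        long deadline = System.nanoTime() + maxWait.toNanos();
        while (System.nanoTime() - deadline < 0 && queue.drainTo(records, maxBatchSize) == 0) {
            Thread.sleep(pollInterval.toMillis()); // nothing yet, wait one interval and retry
        }
        return records;
    }

    // Enqueues five records, then polls four times with a batch size of two.
    static List<List<Integer>> drainInBatches() {
        BatchingQueueSketch<Integer> q = new BatchingQueueSketch<>(8, 2, Duration.ofMillis(5));
        List<List<Integer>> batches = new ArrayList<>();
        try {
            for (int i = 1; i <= 5; i++) {
                q.enqueue(i);
            }
            for (int i = 0; i < 4; i++) {
                batches.add(q.poll(Duration.ofMillis(50)));
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(drainInBatches()); // prints [[1, 2], [3, 4], [5], []]
    }
}
```

The last poll returns an empty list only after the wait has run out, which matches the "may be empty" note in the Javadoc of the original poll method.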

Threads

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Threads.java

public class Threads {

    //......

    public static interface TimeSince {

        /**
         * Reset the elapsed time to 0.
         */
        void reset();

        /**
         * Get the time that has elapsed since the last call to {@link #reset() reset}.
         *
         * @return the number of milliseconds
         */
        long elapsedTime();
    }

    public static interface Timer {

        /**
         * @return true if current time is greater than start time plus requested time period
         */
        boolean expired();

        Duration remaining();
    }

    public static Timer timer(Clock clock, Duration time) {
        final TimeSince start = timeSince(clock);
        start.reset();

        return new Timer() {

            @Override
            public boolean expired() {
                return start.elapsedTime() > time.toMillis();
            }

            @Override
            public Duration remaining() {
                return time.minus(start.elapsedTime(), ChronoUnit.MILLIS);
            }
        };
    }

    public static TimeSince timeSince(Clock clock) {
        return new TimeSince() {

            private long lastTimeInMillis;

            @Override
            public void reset() {
                lastTimeInMillis = clock.currentTimeInMillis();
            }

            @Override
            public long elapsedTime() {
                long elapsed = clock.currentTimeInMillis() - lastTimeInMillis;
                return elapsed <= 0L ? 0L : elapsed;
            }
        };
    }

    //......

}

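  • Threads defines the Timer interface with the expired and remaining methods. The timer method first creates a TimeSince via timeSince, resets it, and then returns an anonymous Timer whose expired() becomes true once more than the requested duration has elapsed.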
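To make the boundary behaviour of such a timer visible, here is a small sketch driven by a hand-set clock. The names are hypothetical: `TimerSketch` is not part of Debezium, and a `LongSupplier` returning milliseconds stands in for Debezium's Clock. The logic follows the excerpt above: elapsed time is clamped at zero, and expired() uses a strict "greater than" comparison.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class TimerSketch {

    interface Timer {
        boolean expired();

        Duration remaining();
    }

    // clockMillis is an assumed stand-in for a clock that reports the current time in milliseconds.
    static Timer timer(LongSupplier clockMillis, Duration time) {
        final long start = clockMillis.getAsLong();
        return new Timer() {

            // Elapsed milliseconds since creation, never negative even if the clock jumps backwards.
            private long elapsed() {
                return Math.max(0L, clockMillis.getAsLong() - start);
            }

            @Override
            public boolean expired() {
                return elapsed() > time.toMillis();
            }

            @Override
            public Duration remaining() {
                return time.minusMillis(elapsed());
            }
        };
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong(1_000L); // hand-driven clock, in milliseconds
        Timer t = timer(now::get, Duration.ofMillis(500));

        now.set(1_500L);
        System.out.println(t.expired()); // prints false: exactly 500 ms elapsed is not "greater than"

        now.set(1_501L);
        System.out.println(t.expired()); // prints true
    }
}
```

Note that remaining() can become negative once the timer has expired, because nothing in this logic clamps it.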

LoggingContext

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/LoggingContext.java

public class LoggingContext {

    /**
     * The key for the connector type MDC property.
     */
    public static final String CONNECTOR_TYPE = "dbz.connectorType";

    /**
     * The key for the connector logical name MDC property.
     */
    public static final String CONNECTOR_NAME = "dbz.connectorName";

    /**
     * The key for the connector context name MDC property.
     */
    public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";

    private LoggingContext() {
    }

    /**
     * A snapshot of an MDC context that can be {@link #restore()}.
     */
    public static final class PreviousContext {

        private static final Map<String, String> EMPTY_CONTEXT = Collections.emptyMap();
        private final Map<String, String> context;

        protected PreviousContext() {
            Map<String, String> context = MDC.getCopyOfContextMap();
            this.context = context != null ? context : EMPTY_CONTEXT;
        }

        /**
         * Restore this logging context.
         */
        public void restore() {
            MDC.setContextMap(context);
        }
    }

    //......

}

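  • LoggingContext defines PreviousContext. Its constructor copies the current MDC with MDC.getCopyOfContextMap(), falling back to an empty map when the copy is null, and its restore method writes that copy back into the MDC.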
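The snapshot-then-restore pattern can be shown without a logging library. In the sketch below a `ThreadLocal` map is a hypothetical stand-in for SLF4J's MDC, and `Snapshot` plays the role of PreviousContext. That the code between the snapshot and the restore installs connector-specific values is an assumption made for the demonstration; the excerpt above elides that part of LoggingContext.

```java
import java.util.HashMap;
import java.util.Map;

final class ContextSnapshotSketch {

    // Hypothetical stand-in for SLF4J's MDC: one mutable key/value map per thread.
    private static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static String get(String key) {
        return CONTEXT.get().get(key);
    }

    // Plays the role of PreviousContext: copies the context on creation, writes it back on restore().
    static final class Snapshot {

        private final Map<String, String> saved = new HashMap<>(CONTEXT.get());

        void restore() {
            CONTEXT.set(new HashMap<>(saved));
        }
    }

    // Temporarily switches the connector name, then restores whatever was there before.
    static String runWithConnectorName(String connectorName) {
        Snapshot previous = new Snapshot();
        try {
            put("dbz.connectorName", connectorName);
            return get("dbz.connectorName"); // what a log statement inside the block would see
        }
        finally {
            previous.restore();
        }
    }

    public static void main(String[] args) {
        put("dbz.connectorName", "outer");
        System.out.println(runWithConnectorName("inner")); // prints inner
        System.out.println(get("dbz.connectorName"));      // prints outer
    }
}
```

Restoring in a finally block, as poll does, keeps the caller's context intact even when the body throws.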

Metronome

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Metronome.java

@FunctionalInterface
public interface Metronome {

    public void pause() throws InterruptedException;

    public static Metronome sleeper(Duration period, Clock timeSystem) {
        long periodInMillis = period.toMillis();
        return new Metronome() {

            private long next = timeSystem.currentTimeInMillis() + periodInMillis;

            @Override
            public void pause() throws InterruptedException {
                for (;;) {
                    final long now = timeSystem.currentTimeInMillis();
                    if (next <= now) {
                        break;
                    }
                    Thread.sleep(next - now);
                }
                next = next + periodInMillis;
            }

            @Override
            public String toString() {
                return "Metronome (sleep for " + periodInMillis + " ms)";
            }
        };
    }

    //......

}

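  • The Metronome interface defines the pause method. Its static sleeper method creates an anonymous Metronome whose pause implementation calls Thread.sleep in a loop until the next scheduled tick has been reached, and then advances the schedule by one period.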
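Because the next tick is computed as `next + period` rather than `now + period`, the pauses follow a fixed schedule instead of drifting with the time the caller spends between them. The sketch below reproduces that logic with standard-library types only; `MetronomeSketch` and `timeTicks` are hypothetical names, and a `LongSupplier` again stands in for Debezium's Clock.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

final class MetronomeSketch {

    interface Metronome {
        void pause() throws InterruptedException;
    }

    // clockMillis is an assumed stand-in for a clock that reports the current time in milliseconds.
    static Metronome sleeper(Duration period, LongSupplier clockMillis) {
        final long periodInMillis = period.toMillis();
        return new Metronome() {

            private long next = clockMillis.getAsLong() + periodInMillis;

            @Override
            public void pause() throws InterruptedException {
                for (;;) {
                    long now = clockMillis.getAsLong();
                    if (next <= now) {
                        break; // the scheduled tick has been reached (or was already missed)
                    }
                    Thread.sleep(next - now);
                }
                next += periodInMillis; // schedule relative to the previous tick, not to "now"
            }
        };
    }

    // Runs the given number of ticks and returns the wall-clock time they took, in milliseconds.
    static long timeTicks(int ticks, Duration period) {
        long start = System.currentTimeMillis();
        Metronome metronome = sleeper(period, System::currentTimeMillis);
        try {
            for (int i = 0; i < ticks; i++) {
                metronome.pause();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        // Three ticks of 20 ms each take at least 60 ms in total.
        System.out.println(timeTicks(3, Duration.ofMillis(20)) + " ms");
    }
}
```

If the caller falls behind by more than one period, pause returns immediately until the schedule has caught up.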

Summary

ChangeEventQueue is a bounded buffer between the thread that enqueues records and the thread that polls them. enqueue blocks on queue.put(record) while the queue is full. poll drains up to maxBatchSize records per call, pausing on the Metronome between empty attempts until the timeout expires, and restores the previous logging context before it returns. Through ChangeEventQueueMetrics it reports totalCapacity (maxQueueSize) and remainingCapacity (queue.remainingCapacity()).

