A look at Debezium's BinlogReader


Reader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java

public interface Reader {

    public static enum State {
        /**
         * The reader is stopped and static.
         */
        STOPPED,
        /**
         * The reader is running and generated records.
         */
        RUNNING,
        /**
         * The reader has completed its work or been explicitly stopped, but not all of the generated records have been
         * consumed via {@link Reader#poll() polling}.
         */
        STOPPING;
    }

    public String name();

    public State state();

    public void uponCompletion(Runnable handler);

    public default void initialize() {
        // do nothing
    }

    public default void destroy() {
        // do nothing
    }

    public void start();

    public void stop();

    public List<SourceRecord> poll() throws InterruptedException;
}

  • The Reader interface defines the name, state, uponCompletion, start, stop, and poll methods, plus no-op default initialize and destroy hooks; a minimal driver over this contract is sketched below.
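To make the lifecycle concrete, here is a minimal sketch of how a caller might drive a Reader: start it, keep polling until it reports STOPPED, and treat a null batch as completion. This is not Debezium's own task code; the ReaderDriver class and its logging are hypothetical, and only the Reader methods shown above are assumed.

import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.connector.mysql.Reader;

// Hypothetical driver exercising the Reader contract shown above.
public class ReaderDriver {

    public void drive(Reader reader) throws InterruptedException {
        reader.initialize();
        reader.uponCompletion(() -> System.out.println(reader.name() + " completed"));
        reader.start();
        try {
            // Keep polling until the reader reports STOPPED; a null batch means the
            // reader finished its work and its queue has been fully drained.
            while (reader.state() != Reader.State.STOPPED) {
                List<SourceRecord> batch = reader.poll();
                if (batch == null) {
                    break;
                }
                batch.forEach(record -> System.out.println("polled record with offset " + record.sourceOffset()));
            }
        }
        finally {
            reader.stop();
            reader.destroy();
        }
    }
}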

AbstractReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java

public abstract class AbstractReader implements Reader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final String name;
    protected final MySqlTaskContext context;
    protected final MySqlJdbcContext connectionContext;
    private final BlockingQueue<SourceRecord> records;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean success = new AtomicBoolean(false);
    private final AtomicReference<ConnectException> failure = new AtomicReference<>();
    private ConnectException failureException;
    private final int maxBatchSize;
    private final Metronome metronome;
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final Duration pollInterval;
    protected final ChangeEventQueueMetrics changeEventQueueMetrics;
    private final HaltingPredicate acceptAndContinue;

    public AbstractReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) {
        this.name = name;
        this.context = context;
        this.connectionContext = context.getConnectionContext();
        this.records = new LinkedBlockingDeque<>(context.getConnectorConfig().getMaxQueueSize());
        this.maxBatchSize = context.getConnectorConfig().getMaxBatchSize();
        this.pollInterval = context.getConnectorConfig().getPollInterval();
        this.metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
        this.acceptAndContinue = acceptAndContinue == null ? new AcceptAllPredicate() : acceptAndContinue;
        this.changeEventQueueMetrics = new ChangeEventQueueMetrics() {

            @Override
            public int totalCapacity() {
                return context.getConnectorConfig().getMaxQueueSize();
            }

            @Override
            public int remainingCapacity() {
                return records.remainingCapacity();
            }
        };
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void uponCompletion(Runnable handler) {
        assert this.uponCompletion.get() == null;
        this.uponCompletion.set(handler);
    }

    @Override
    public final void initialize() {
        doInitialize();
    }

    @Override
    public final void destroy() {
        doDestroy();
    }

    @Override
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.failure.set(null);
            this.success.set(false);
            doStart();
        }
    }

    @Override
    public void stop() {
        try {
            // Emptying the queue so to make sure that enqueue() won't block indefinitely when adding records after
            // poll() isn't called anymore but before the binlog reader is stopped; note there's still a tiny chance for
            // this to happen if enough records are added again between here and the call to disconnect(); protecting
            // against it seems not worth though it as shouldn't happen for any practical queue size
            List<SourceRecord> unsent = new ArrayList<>();
            records.drainTo(unsent);
            logger.info("Discarding {} unsent record(s) due to the connector shutting down", unsent.size());
            doStop();
            running.set(false);
        }
        finally {
            if (failure.get() != null) {
                // We had a failure and it was propagated via poll(), after which Kafka Connect will stop
                // the connector, which will stop the task that will then stop this reader via this method.
                // Since no more records will ever be polled again, we know we can clean up this reader's resources...
                doCleanup();
            }
        }
    }

    @Override
    public State state() {
        if (success.get() || failure.get() != null) {
            // We've either completed successfully or have failed, but either way no more records will be returned ...
            return State.STOPPED;
        }
        if (running.get()) {
            return State.RUNNING;
        }
        // Otherwise, we're in the process of stopping ...
        return State.STOPPING;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Before we do anything else, determine if there was a failure and throw that exception ...
        failureException = this.failure.get();
        if (failureException != null) {
            // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
            // will then explicitly stop the connector task. Most likely, however, the reader that threw
            // the exception will have already stopped itself and will generate no additional records.
            // Regardless, there may be records on the queue that will never be consumed.
            throw failureException;
        }

        // this reader has been stopped before it reached the success or failed end state, so clean up and abort
        if (!running.get()) {
            cleanupResources();
            throw new InterruptedException("Reader was stopped while polling");
        }

        logger.trace("Polling for next batch of records");
        List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
        final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
        while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
            // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
            metronome.pause();

            // Check for failure after waking up ...
            failureException = this.failure.get();
            if (failureException != null) {
                throw failureException;
            }

            if (timeout.expired()) {
                break;
            }
        }

        if (batch.isEmpty() && success.get() && records.isEmpty()) {
            // We found no records but the operation completed successfully, so we're done
            this.running.set(false);
            cleanupResources();
            return null;
        }
        pollComplete(batch);
        logger.trace("Completed batch of {} records", batch.size());
        return batch;
    }

    @Override
    public String toString() {
        return name;
    }

    //......
}

  • AbstractReader implements the Reader interface. Its poll method first rethrows any recorded failure, then repeatedly drains the internal queue via records.drainTo(batch, maxBatchSize), pausing with the metronome while nothing is available, and finally invokes pollComplete on the batch; the drain-or-wait pattern is sketched below.
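The core of poll() is a standard drain-or-wait loop over a BlockingQueue. The following standalone sketch is my own illustration, using plain JDK types in place of Debezium's Metronome and Timer helpers: drain up to maxBatchSize records, sleep for pollInterval while nothing arrives, and return control to the caller once an overall deadline passes.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Standalone illustration of AbstractReader.poll()'s drain-or-wait batching,
// using JDK primitives instead of Debezium's Metronome/Timer helpers.
public class DrainOrWaitDemo {

    private final BlockingQueue<String> records = new LinkedBlockingQueue<>(8192);
    private final int maxBatchSize = 2048;
    private final Duration pollInterval = Duration.ofMillis(500);
    private final Duration returnControlInterval = Duration.ofSeconds(5);

    public List<String> poll() throws InterruptedException {
        List<String> batch = new ArrayList<>(maxBatchSize);
        long deadline = System.nanoTime() + returnControlInterval.toNanos();
        // Drain whatever is queued; if nothing is there yet, pause briefly and retry
        // until either records show up or the deadline expires.
        while (records.drainTo(batch, maxBatchSize) == 0) {
            Thread.sleep(pollInterval.toMillis());
            if (System.nanoTime() >= deadline) {
                break;   // return an empty batch so the caller regains control
            }
        }
        return batch;
    }

    public void enqueue(String record) throws InterruptedException {
        records.put(record);   // blocks when the queue is full, providing back-pressure
    }
}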

BinlogReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

    private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5);
    private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1);

    private final boolean recordSchemaChangesInSourceRecords;
    private final RecordMakers recordMakers;
    private final SourceInfo source;
    private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
    private final BinaryLogClient client;

    //......

    public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue, long serverId) {
        super(name, context, acceptAndContinue);
        connectionContext = context.getConnectionContext();
        source = context.source();
        recordMakers = context.makeRecord();
        recordSchemaChangesInSourceRecords = context.includeSchemaChangeRecords();
        clock = context.getClock();
        eventDeserializationFailureHandlingMode = connectionContext.eventProcessingFailureHandlingMode();
        inconsistentSchemaHandlingMode = connectionContext.inconsistentSchemaHandlingMode();

        // Use exponential delay to log the progress frequently at first, but the quickly tapering off to once an hour...
        pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);

        // Set up the log reader ...
        client = new BinaryLogClient(connectionContext.hostname(), connectionContext.port(), connectionContext.username(), connectionContext.password());
        // BinaryLogClient will overwrite thread names later
        client.setThreadFactory(Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false));
        client.setServerId(serverId);
        client.setSSLMode(sslModeFor(connectionContext.sslMode()));
        if (connectionContext.sslModeEnabled()) {
            SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectionContext);
            if (sslSocketFactory != null) {
                client.setSslSocketFactory(sslSocketFactory);
            }
        }
        client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
        final long keepAliveInterval = context.config().getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
        client.setKeepAliveInterval(keepAliveInterval);
        // Considering heartbeatInterval should be less than keepAliveInterval, we use the heartbeatIntervalFactor
        // multiply by keepAliveInterval and set the result value to heartbeatInterval.The default value of heartbeatIntervalFactor
        // is 0.8, and we believe the left time (0.2 * keepAliveInterval) is enough to process the packet received from the MySQL server.
        client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor));
        client.registerEventListener(context.bufferSizeForBinlogReader() == 0
                ? this::handleEvent
                : (new EventBuffer(context.bufferSizeForBinlogReader(), this))::add);
        client.registerLifecycleListener(new ReaderThreadLifecycleListener());
        if (logger.isDebugEnabled()) {
            client.registerEventListener(this::logEvent);
        }

        //......

        client.setEventDeserializer(eventDeserializer);

        // Set up for JMX ...
        metrics = new BinlogReaderMetrics(client, context, name, changeEventQueueMetrics);
        heartbeat = Heartbeat.create(context.config(), context.topicSelector().getHeartbeatTopic(),
                context.getConnectorConfig().getLogicalName());
    }

    @Override
    protected void doStart() {
        context.dbSchema().assureNonEmptySchema();

        // Register our event handlers ...
        eventHandlers.put(EventType.STOP, this::handleServerStop);
        eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
        eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
        eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
        eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
        eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
        eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
        eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
        eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
        eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
        eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
        eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
        eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
        eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
        eventHandlers.put(EventType.XID, this::handleTransactionCompletion);

        // Conditionally register ROWS_QUERY handler to parse SQL statements.
        if (context.includeSqlQuery()) {
            eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
        }

        final boolean isGtidModeEnabled = connectionContext.isGtidModeEnabled();
        metrics.setIsGtidModeEnabled(isGtidModeEnabled);

        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.
        String availableServerGtidStr = connectionContext.knownGtidSet();
        if (isGtidModeEnabled) {
            // The server is using GTIDs, so enable the handler ...
            eventHandlers.put(EventType.GTID, this::handleGtidEvent);

            // Now look at the GTID set from the server and what we've previously seen ...
            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

            // also take into account purged GTID logs
            GtidSet purgedServerGtidSet = connectionContext.purgedGtidSet();
            logger.info("GTID set purged on server: {}", purgedServerGtidSet);

            GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
            if (filteredGtidSet != null) {
                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
                logger.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
                String filteredGtidSetStr = filteredGtidSet.toString();
                client.setGtidSet(filteredGtidSetStr);
                source.setCompletedGtidSet(filteredGtidSetStr);
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
            }
            else {
                // We've not yet seen any GTIDs, so that means we have to start reading the binlog from the beginning ...
                client.setBinlogFilename(source.binlogFilename());
                client.setBinlogPosition(source.binlogPosition());
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
            }
        }
        else {
            // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
            client.setBinlogFilename(source.binlogFilename());
            client.setBinlogPosition(source.binlogPosition());
        }

        // We may be restarting in the middle of a transaction, so see how far into the transaction we have already processed...
        initialEventsToSkip = source.eventsToSkipUponRestart();

        // Set the starting row number, which is the next row number to be read ...
        startingRowNumber = source.rowsToSkipUponRestart();

        // Only when we reach the first BEGIN event will we start to skip events ...
        skipEvent = false;

        // Initial our poll output delay logic ...
        pollOutputDelay.hasElapsed();
        previousOutputMillis = clock.currentTimeInMillis();

        // Start the log reader, which starts background threads ...
        if (isRunning()) {
            long timeout = context.getConnectorConfig().getConnectionTimeout().toMillis();
            long started = context.getClock().currentTimeInMillis();
            try {
                logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
                client.connect(timeout);
            }
            catch (TimeoutException e) {
                // If the client thread is interrupted *before* the client could connect, the client throws a timeout exception
                // The only way we can distinguish this is if we get the timeout exception before the specified timeout has
                // elapsed, so we simply check this (within 10%) ...
                long duration = context.getClock().currentTimeInMillis() - started;
                if (duration > (0.9 * timeout)) {
                    double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
                    throw new ConnectException("Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
                            connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
                }
                // Otherwise, we were told to shutdown, so we don't care about the timeout exception
            }
            catch (AuthenticationException e) {
                throw new ConnectException("Failed to authenticate to the MySQL database at " +
                        connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
            }
            catch (Throwable e) {
                throw new ConnectException("Unable to connect to the MySQL database at " +
                        connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "': " + e.getMessage(), e);
            }
        }
    }

    @Override
    protected void doStop() {
        try {
            if (client.isConnected()) {
                logger.debug("Stopping binlog reader '{}', last recorded offset: {}", this.name(), lastOffset);
                client.disconnect();
            }
            cleanupResources();
        }
        catch (IOException e) {
            logger.error("Unexpected error when disconnecting from the MySQL binary log reader '{}'", this.name(), e);
        }
    }

    @Override
    protected void pollComplete(List<SourceRecord> batch) {
        // Record a bit about this batch ...
        int batchSize = batch.size();
        recordCounter += batchSize;
        totalRecordCounter.addAndGet(batchSize);
        if (batchSize > 0) {
            SourceRecord lastRecord = batch.get(batchSize - 1);
            lastOffset = lastRecord.sourceOffset();
            if (pollOutputDelay.hasElapsed()) {
                // We want to record the status ...
                long millisSinceLastOutput = clock.currentTimeInMillis() - previousOutputMillis;
                try {
                    if (logger.isInfoEnabled()) {
                        context.temporaryLoggingContext("binlog", () -> {
                            logger.info("{} records sent during previous {}, last recorded offset: {}",
                                    recordCounter, Strings.duration(millisSinceLastOutput), lastOffset);
                        });
                    }
                }
                finally {
                    recordCounter = 0;
                    previousOutputMillis += millisSinceLastOutput;
                }
            }
        }
    }

    //......
}

  • BinlogReader extends AbstractReader. Its constructor creates a BinaryLogClient, registers handleEvent via registerEventListener, and sets an eventDeserializer. Its doStart method populates the eventHandlers map, configures either the GTID set or the binlog filename and position to resume from, and then calls client.connect(timeout); doStop calls client.disconnect(); and pollComplete mainly updates metrics such as recordCounter and totalRecordCounter. A standalone BinaryLogClient sketch follows.
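Stripped of the Debezium plumbing, the constructor and doStart boil down to configuring and connecting a BinaryLogClient from the mysql-binlog-connector-java library. Below is a minimal sketch, not Debezium code: the host, credentials, server id, binlog position, and intervals are placeholders, and the dispatch-by-EventType map just mirrors the spirit of BinlogReader's eventHandlers.

import java.util.EnumMap;
import java.util.function.Consumer;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;

// Minimal sketch of what BinlogReader's constructor/doStart set up: connection
// details, server id, resume position, keep-alive, an EventType-keyed dispatch
// table, and a timed connect.
public class BinlogClientSketch {

    public static void main(String[] args) throws Exception {
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "debezium", "dbz-pass");
        client.setServerId(85744L);                    // must not clash with other replication clients
        client.setBinlogFilename("mysql-bin.000003");  // resume position, as recorded in the source offset
        client.setBinlogPosition(4L);
        client.setKeepAlive(true);
        client.setKeepAliveInterval(60_000L);
        client.setHeartbeatInterval(48_000L);          // e.g. 0.8 * keepAliveInterval, as in BinlogReader

        // Dispatch table in the spirit of BinlogReader's eventHandlers EnumMap.
        EnumMap<EventType, Consumer<Event>> handlers = new EnumMap<>(EventType.class);
        handlers.put(EventType.TABLE_MAP, e -> System.out.println("table map: " + e));
        handlers.put(EventType.EXT_WRITE_ROWS, e -> System.out.println("insert: " + e));
        handlers.put(EventType.EXT_UPDATE_ROWS, e -> System.out.println("update: " + e));
        handlers.put(EventType.EXT_DELETE_ROWS, e -> System.out.println("delete: " + e));
        handlers.put(EventType.XID, e -> System.out.println("transaction committed: " + e));

        client.registerEventListener(event -> {
            Consumer<Event> handler = handlers.get(event.getHeader().getEventType());
            if (handler != null) {
                handler.accept(event);                 // event types without a handler are ignored
            }
        });

        // connect(timeout) returns once the connection is established; binlog events
        // are then delivered on the client's background thread.
        client.connect(30_000L);
        Thread.sleep(10_000L);                         // let some events stream in for this demo
        client.disconnect();
    }
}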

Summary

BinlogReader extends AbstractReader. Its constructor creates a BinaryLogClient, registers handleEvent via registerEventListener, and sets an eventDeserializer. Its doStart method populates the eventHandlers map, configures either the GTID set or the binlog filename and position to resume from, and then calls client.connect(timeout); doStop calls client.disconnect(); and pollComplete mainly updates metrics such as recordCounter and totalRecordCounter.

doc

  • BinlogReader
