聊聊maxwell的BootstrapController

编程

BootstrapController

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/bootstrap/BootstrapController.java

/**
 * Polls the {@code bootstrap} table for incomplete tasks owned by this client and runs each one
 * through a {@link SynchronousBootstrapper}.
 *
 * <p>Coordination with the replication thread (see {@link #shouldSkip}):
 * <ul>
 *   <li>sync mode: {@code bootstrapMutex} is held by {@code doWork()} for the whole batch of
 *       tasks, so {@code shouldSkip} blocks until that batch is done;</li>
 *   <li>async mode: rows matching the currently active task are buffered in
 *       {@code skippedRows} and pushed to the producer after that task's bootstrap returns.</li>
 * </ul>
 */
public class BootstrapController extends RunLoopProcess {
	static final Logger LOGGER = LoggerFactory.getLogger(BootstrapController.class);

	// Capacity handed to the skipped-row buffer.
	// NOTE(review): presumably the in-memory element limit before RowMapBuffer spills to disk - confirm.
	private static final long MAX_TX_ELEMENTS = 10000;

	private final ConnectionPool maxwellConnectionPool;
	private final SynchronousBootstrapper bootstrapper;
	private final AbstractProducer producer;
	private final String clientID;
	private final boolean syncMode;

	// Read/written only through the synchronized getCurrentSchemaID / setCurrentSchemaID.
	private Long currentSchemaID;

	// This mutex is used to block rows from being produced while a "synchronous" bootstrap is
	// run: doWork() holds it for a whole batch of tasks and shouldSkip() acquires it in sync mode.
	private final Object bootstrapMutex = new Object();

	// This one is used to protect against races in an async producer; it guards activeTask and
	// skippedRows.
	private final Object completionMutex = new Object();

	// Task currently being bootstrapped, or null when none; guarded by completionMutex.
	private BootstrapTask activeTask;

	// Rows withheld by shouldSkip() in async mode; guarded by completionMutex.
	private final RowMapBuffer skippedRows = new RowMapBuffer(MAX_TX_ELEMENTS);

	public BootstrapController(
		ConnectionPool maxwellConnectionPool,
		AbstractProducer producer,
		SynchronousBootstrapper bootstrapper,
		String clientID,
		boolean syncMode,
		Long currentSchemaID
	) {
		this.maxwellConnectionPool = maxwellConnectionPool;
		this.producer = producer;
		this.bootstrapper = bootstrapper;
		this.clientID = clientID;
		this.syncMode = syncMode;
		this.currentSchemaID = currentSchemaID;
	}

	/**
	 * One iteration of the run loop (presumably invoked repeatedly by RunLoopProcess).
	 * SQL errors are logged and swallowed so the next iteration can retry; any other exception
	 * propagates to the caller.
	 */
	@Override
	protected void work() throws Exception {
		try {
			doWork();
		} catch ( InterruptedException e ) {
			// Deliberately ignored: the interrupt just ends this iteration.
			// NOTE(review): presumably RunLoopProcess interrupts this thread when asked to stop and
			// then leaves its loop; the interrupt flag is intentionally not restored - confirm
			// before changing, since re-interrupting would make the next sleep fail immediately.
		} catch ( SQLException e ) {
			LOGGER.error("got SQLException trying to bootstrap", e);
		}
	}

	/**
	 * Loads all incomplete tasks for this client, runs them in id order while holding
	 * bootstrapMutex, replays rows skipped during each task, then sleeps for one second.
	 */
	private void doWork() throws Exception {
		List<BootstrapTask> tasks = getIncompleteTasks();

		synchronized(bootstrapMutex) {
			for ( BootstrapTask task : tasks ) {
				LOGGER.debug("starting bootstrap task: {}", task.logString());

				synchronized(completionMutex) {
					activeTask = task;
				}

				// NOTE(review): if this throws, activeTask stays set, so async-mode rows for this
				// table keep being buffered until a later iteration finishes the task - confirm intended.
				bootstrapper.startBootstrap(task, producer, getCurrentSchemaID());

				// Replay under completionMutex so shouldSkip() cannot interleave new rows with the
				// replay; clearing activeTask afterwards lets rows flow straight through again.
				synchronized(completionMutex) {
					pushSkippedRows();
					activeTask = null;
				}
			}
		}

		Thread.sleep(1000);
	}

	private synchronized Long getCurrentSchemaID() {
		return this.currentSchemaID;
	}

	public synchronized void setCurrentSchemaID(long schemaID) {
		this.currentSchemaID = schemaID;
	}

	/**
	 * Returns every row of the {@code bootstrap} table with {@code is_complete = 0} for this
	 * client id, ordered by id. The statement and result set are closed before returning.
	 *
	 * @throws SQLException if the query or task decoding fails
	 */
	private List<BootstrapTask> getIncompleteTasks() throws SQLException {
		List<BootstrapTask> list = new ArrayList<>();
		String sql = "select * from bootstrap where is_complete = 0 and client_id = ? order by id";

		try ( Connection cx = maxwellConnectionPool.getConnection();
		      PreparedStatement s = cx.prepareStatement(sql) ) {
			s.setString(1, this.clientID);
			try ( ResultSet rs = s.executeQuery() ) {
				while ( rs.next() ) {
					list.add(BootstrapTask.valueOf(rs));
				}
			}
		}

		return list;
	}

	/**
	 * Decides whether the replication thread should withhold {@code row}.
	 *
	 * <p>Sync mode: never skips, but blocks while doWork() holds bootstrapMutex, i.e. until the
	 * current batch of bootstrap tasks has finished.
	 *
	 * <p>Async mode: rows matching the currently active task are appended to skippedRows and
	 * reported as skipped; doWork() replays them once that task's bootstrap returns. Rows of
	 * tasks that are merely queued are NOT skipped.
	 *
	 * @return true if the row was buffered and must not be produced by the caller
	 * @throws IOException if buffering the row fails
	 */
	public boolean shouldSkip(RowMap row) throws IOException {
		if ( syncMode ) {
			// Acquire-and-release: the only purpose is to wait for a running bootstrap batch.
			synchronized(bootstrapMutex) {
				return false;
			}
		}

		synchronized(completionMutex) {
			if ( activeTask == null || !activeTask.matches(row) ) {
				return false;
			}
			skippedRows.add(row);
			return true;
		}
	}

	/**
	 * Pushes every buffered row to the producer in removeFirst() order, emptying the buffer.
	 * Callers must hold completionMutex.
	 */
	private void pushSkippedRows() throws Exception {
		// NOTE(review): presumably finalizes any rows spilled to disk before they are read back - confirm.
		skippedRows.flushToDisk();

		while ( skippedRows.size() > 0 ) {
			RowMap row = skippedRows.removeFirst();
			producer.push(row);
		}
	}
}

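  • BootstrapController继承了RunLoopProcess，其work方法执行doWork方法；doWork方法通过getIncompleteTasks获取tasks，然后遍历tasks，对每个task依次执行bootstrapper.startBootstrap(task, producer, getCurrentSchemaID())及pushSkippedRows方法；getIncompleteTasks从数据库的bootstrap表中查询is_complete为0且client_id为当前clientID的记录（按id排序）。在bootstrap期间，复制线程通过shouldSkip与之协调：同步模式下shouldSkip会阻塞在bootstrapMutex上，直到本轮所有task执行完毕；异步模式下，与当前activeTask匹配的行会被暂存到skippedRows中。pushSkippedRows方法先执行skippedRows.flushToDisk()，然后逐条移除这些暂存行并执行producer.push(row)，从而保证它们在该task的bootstrap数据之后输出。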

SynchronousBootstrapper

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/bootstrap/SynchronousBootstrapper.java

/**
 * Executes one bootstrap task on the calling thread: pushes a start event, streams the rows
 * selected by the task to the producer (recording progress via setBootstrapRowToStarted,
 * updateInsertedRowsColumn and setBootstrapRowToCompleted), then pushes a
 * "bootstrap-complete" event.
 */
public class SynchronousBootstrapper {
	static final Logger LOGGER = LoggerFactory.getLogger(SynchronousBootstrapper.class);

	// NOTE(review): presumably the minimum interval between progress writes made by
	// updateInsertedRowsColumn (defined in the elided part of this class) - confirm.
	private static final long INSERTED_ROWS_UPDATE_PERIOD_MILLIS = 250;

	private final MaxwellContext context;

	// Reset to 0 at the start of every performBootstrap run.
	private long lastInsertedRowsUpdateTimeMillis = 0;

	public SynchronousBootstrapper(MaxwellContext context) {
		this.context = context;
	}

	/**
	 * Runs {@link #performBootstrap} and then {@link #completeBootstrap} for {@code task}.
	 * The completion event is pushed even when performBootstrap logged an abort.
	 */
	public void startBootstrap(BootstrapTask task, AbstractProducer producer, Long currentSchemaID) throws Exception {
		performBootstrap(task, producer, currentSchemaID);
		completeBootstrap(task, producer);
	}

	/**
	 * Captures the schema of only the task's database/table, using the server's case
	 * sensitivity setting. The connection is closed before returning.
	 */
	private Schema captureSchemaForBootstrap(BootstrapTask task) throws SQLException {
		try ( Connection cx = getConnection(task.database) ) {
			CaseSensitivity s = MaxwellMysqlStatus.captureCaseSensitivity(cx);
			SchemaCapturer c = new SchemaCapturer(cx, s, task.database, task.table);
			return c.capture();
		}
	}

	/**
	 * Pushes a start event for the task's table, then streams every row returned by getAllRows
	 * (honouring the task's where clause) to {@code producer} as "bootstrap-insert" rows stamped
	 * with {@code currentSchemaID}, running the configured scripting hook on each row if present.
	 *
	 * <p>A NoSuchElementException raised inside the streaming section is logged as an abort and
	 * swallowed; exceptions from schema capture or table lookup propagate.
	 */
	public void performBootstrap(BootstrapTask task, AbstractProducer producer, Long currentSchemaID) throws Exception {
		LOGGER.debug("bootstrapping requested for {}", task.logString());

		Schema schema = captureSchemaForBootstrap(task);
		Database database = findDatabase(schema, task.database);
		Table table = findTable(task.table, database);

		producer.push(bootstrapStartRowMap(task, table));
		LOGGER.info("bootstrapping started for {}.{}", task.database, task.table);

		try ( Connection streamingConnection = getStreamingConnection(task.database) ) {
			setBootstrapRowToStarted(task.id);
			ResultSet resultSet = getAllRows(task.database, task.table, table, task.whereClause, streamingConnection);

			// The scripting hook does not change per row; resolve it once instead of per iteration.
			Scripting scripting = context.getConfig().scripting;

			int insertedRows = 0;
			lastInsertedRowsUpdateTimeMillis = 0; // ensure updateInsertedRowsColumn is called at least once

			while ( resultSet.next() ) {
				RowMap row = bootstrapEventRowMap("bootstrap-insert", table.database, table.name, table.getPKList(), task.comment);
				setRowValues(row, resultSet, table);
				row.setSchemaId(currentSchemaID);

				if ( scripting != null ) {
					scripting.invoke(row);
				}

				if ( LOGGER.isDebugEnabled() ) {
					LOGGER.debug("bootstrapping row : {}", row.toJSON());
				}

				producer.push(row);

				// NOTE(review): sleeps at least 1ms per row, capping throughput near 1000 rows/s;
				// presumably deliberate throttling - confirm before removing.
				Thread.sleep(1);

				++insertedRows;
				updateInsertedRowsColumn(insertedRows, task.id);
			}

			setBootstrapRowToCompleted(insertedRows, task.id);
		} catch ( NoSuchElementException e ) {
			// NOTE(review): startBootstrap() still calls completeBootstrap() afterwards, so a
			// "bootstrap-complete" event is emitted even for an aborted bootstrap.
			LOGGER.info("bootstrapping aborted for {}", task.logString());
		}
	}

	/** Pushes the "bootstrap-complete" event (with an empty primary-key list) for the task. */
	public void completeBootstrap(BootstrapTask task, AbstractProducer producer) throws Exception {
		producer.push(bootstrapEventRowMap("bootstrap-complete", task.database, task.table, new ArrayList<>(), task.comment));
		LOGGER.info("bootstrapping ended for {}", task.logString());
	}

	//...... (helpers such as getConnection, findDatabase, findTable, getAllRows and setRowValues are elided in this excerpt)
}

  • SynchronousBootstrapper的startBootstrap方法依次执行performBootstrap及completeBootstrap方法。其中performBootstrap方法先执行producer.push(bootstrapStartRowMap(task, table))，之后根据task指定的数据库、表及where条件查询记录，然后遍历结果集，逐行执行producer.push(row)；completeBootstrap方法执行producer.push(bootstrapEventRowMap("bootstrap-complete", task.database, task.table, new ArrayList<>(), task.comment))。

小结

BootstrapController继承了RunLoopProcess，其work方法执行doWork方法；doWork方法通过getIncompleteTasks获取tasks，然后遍历tasks，对每个task依次执行bootstrapper.startBootstrap(task, producer, getCurrentSchemaID())及pushSkippedRows方法；getIncompleteTasks从数据库的bootstrap表中查询is_complete为0且client_id为当前clientID的记录；pushSkippedRows方法先执行skippedRows.flushToDisk()，然后逐条移除暂存行并执行producer.push(row)。SynchronousBootstrapper的startBootstrap方法则依次执行performBootstrap（推送起始事件，并逐行推送表数据）及completeBootstrap（推送bootstrap-complete事件）。

doc

  • BootstrapController

以上是 聊聊maxwell的BootstrapController 的全部内容，来源链接：utcz.com/z/516233.html

回到顶部