聊聊maxwell的Recovery

编程

本文主要研究一下maxwell的Recovery

Recovery

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/recovery/Recovery.java

/**
 * Attempts to recover a replication position after a master change by scanning
 * the binlog files of the replication server for the heartbeat value held in
 * the supplied {@link RecoveryInfo}.
 */
public class Recovery {

    static final Logger LOGGER = LoggerFactory.getLogger(Recovery.class);

    private final ConnectionPool replicationConnectionPool;
    private final RecoveryInfo recoveryInfo;
    private final MaxwellMysqlConfig replicationConfig;
    private final String maxwellDatabaseName;
    private final RecoverySchemaStore schemaStore;

    /**
     * Stores the collaborators and builds the {@link RecoverySchemaStore} used while scanning.
     *
     * @param replicationConfig         configuration handed to each replicator created by {@link #recover()}
     * @param maxwellDatabaseName       name of the maxwell database; also used to build the recovery filter
     * @param replicationConnectionPool pool used to list the binary logs and to build the schema store
     * @param caseSensitivity           case-sensitivity setting passed to the schema store
     * @param recoveryInfo              old server id, position, heartbeat and client id to recover from
     */
    public Recovery(MaxwellMysqlConfig replicationConfig,
                    String maxwellDatabaseName,
                    ConnectionPool replicationConnectionPool,
                    CaseSensitivity caseSensitivity,
                    RecoveryInfo recoveryInfo) {
        this.replicationConfig = replicationConfig;
        this.replicationConnectionPool = replicationConnectionPool;
        this.recoveryInfo = recoveryInfo;
        this.schemaStore = new RecoverySchemaStore(replicationConnectionPool, maxwellDatabaseName, caseSensitivity);
        this.maxwellDatabaseName = maxwellDatabaseName;
    }

    /**
     * Scans the binlog files, from the last entry of {@link #getBinlogInfo()} to the first,
     * looking for the heartbeat recorded in the recovery info.
     *
     * @return the matching heartbeat row, or {@code null} if no binlog file contained it
     * @throws Exception if listing the binlogs or reading from a replicator fails
     */
    public HeartbeatRowMap recover() throws Exception {
        String recoveryMsg = String.format(
            "old-server-id: %d, position: %s",
            recoveryInfo.serverID,
            recoveryInfo.position
        );

        LOGGER.warn("attempting to recover from master-change: {}", recoveryMsg);

        List<BinlogPosition> list = getBinlogInfo();

        // Walk the list backwards: the last listed binlog file is tried first.
        for ( int i = list.size() - 1; i >= 0; i-- ) {
            BinlogPosition binlogPosition = list.get(i);
            Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat());
            Metrics metrics = new NoOpMetrics();

            // Parameterized so the position is only rendered when debug logging is enabled.
            LOGGER.debug("scanning binlog: {}", binlogPosition);

            Replicator replicator = new BinlogConnectorReplicator(
                this.schemaStore,
                null,
                null,
                replicationConfig,
                0L, // server-id of 0 activates "mysqlbinlog" behavior where the server will stop after each binlog
                maxwellDatabaseName,
                metrics,
                position,
                true,
                recoveryInfo.clientID,
                new HeartbeatNotifier(),
                null,
                new RecoveryFilter(this.maxwellDatabaseName),
                new MaxwellOutputConfig(),
                0.25f // Default memory usage size, not used
            );

            HeartbeatRowMap h = findHeartbeat(replicator);
            if ( h != null ) {
                LOGGER.warn("recovered new master position: {}", h.getNextPosition());
                return h;
            }
        }

        LOGGER.error("Could not recover from master-change: {}", recoveryMsg);
        return null;
    }

    /**
     * try to find a given heartbeat value from the replicator.
     *
     * @param r replicator to start and read rows from until it returns {@code null}
     * @return the {@link HeartbeatRowMap} whose last-read heartbeat equals the one in the
     *         recovery info, or null if none was found.
     * @throws Exception if starting the replicator or fetching a row fails
     */
    private HeartbeatRowMap findHeartbeat(Replicator r) throws Exception {
        r.startReplicator();

        for ( RowMap row = r.getRow(); row != null; row = r.getRow() ) {
            if ( !(row instanceof HeartbeatRowMap) ) {
                continue;
            }

            HeartbeatRowMap heartbeatRow = (HeartbeatRowMap) row;
            // NOTE(review): == assumes both getters return a primitive — confirm they are not boxed Longs.
            if ( heartbeatRow.getPosition().getLastHeartbeatRead() == recoveryInfo.getHeartbeat() ) {
                return heartbeatRow;
            }
        }

        return null;
    }

    /**
     * fetch a list of binlog positions representing the start of each binlog file
     *
     * @return a list of binlog positions to attempt recovery at, in the order
     *         reported by {@code SHOW BINARY LOGS}
     * @throws SQLException if obtaining the connection or running the query fails
     */
    private List<BinlogPosition> getBinlogInfo() throws SQLException {
        List<BinlogPosition> list = new ArrayList<>();

        // The statement and result set are declared as resources so they are closed
        // together with the connection. java.sql.Statement is fully qualified so the
        // class does not depend on an additional import.
        try ( Connection c = replicationConnectionPool.getConnection();
              java.sql.Statement stmt = c.createStatement();
              ResultSet rs = stmt.executeQuery("SHOW BINARY LOGS") ) {
            while ( rs.next() ) {
                list.add(BinlogPosition.at(4, rs.getString("Log_name")));
            }
        }

        return list;
    }
}

  • Recovery提供了recover方法：它先通过getBinlogInfo方法(执行SHOW BINARY LOGS)获取各binlog文件起始位置的BinlogPosition列表，然后从后往前遍历该列表，为每个BinlogPosition构建BinlogConnectorReplicator，再通过findHeartbeat方法查找heartbeatRow.getPosition().getLastHeartbeatRead()等于recoveryInfo.getHeartbeat()的HeartbeatRowMap；找到则直接返回，遍历完所有binlog仍未找到则返回null

Maxwell

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/Maxwell.java

/**
 * Excerpt of the Maxwell main class; members omitted from the listing are
 * marked with {@code //......}.
 */
public class Maxwell implements Runnable {

    static final Logger LOGGER = LoggerFactory.getLogger(Maxwell.class);

    protected MaxwellConfig config;
    protected MaxwellContext context;
    protected Replicator replicator;

    /**
     * Wraps the configuration in a new {@link MaxwellContext} and delegates to the
     * context-based constructor.
     */
    public Maxwell(MaxwellConfig config) throws SQLException, URISyntaxException {
        this(new MaxwellContext(config));
    }

    /**
     * Keeps the context and its configuration, then probes the context's connections.
     */
    protected Maxwell(MaxwellContext context) throws SQLException, URISyntaxException {
        this.context = context;
        this.config = context.getConfig();
        context.probeConnections();
    }

    //......

    /**
     * Tries to resume from a position recovered after a master change.
     *
     * @return the position following the recovered heartbeat, or {@code null} when the
     *         position store has no recovery info or the heartbeat could not be found
     * @throws Exception if the recovery scan or the schema store operations fail
     */
    private Position attemptMasterRecovery() throws Exception {
        RecoveryInfo info = this.context.getPositionStore().getRecoveryInfo(config);
        if ( info == null ) {
            return null;
        }

        Recovery recovery = new Recovery(
            config.replicationMysql,
            config.databaseName,
            this.context.getReplicationConnectionPool(),
            this.context.getCaseSensitivity(),
            info
        );

        HeartbeatRowMap heartbeat = recovery.recover();
        if ( heartbeat == null ) {
            return null;
        }

        // load up the schema from the recovery position and chain it into the
        // new server_id
        MysqlSchemaStore previousSchemaStore = new MysqlSchemaStore(
            context.getMaxwellConnectionPool(),
            context.getReplicationConnectionPool(),
            context.getSchemaConnectionPool(),
            info.serverID,
            info.position,
            context.getCaseSensitivity(),
            config.filter,
            false
        );

        // Note we associate this schema to the start position of the heartbeat event, so that
        // we pick it up when resuming at the event after the heartbeat.
        previousSchemaStore.clone(context.getServerID(), heartbeat.getPosition());

        return heartbeat.getNextPosition();
    }

    //......

}

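  • Maxwell的attemptMasterRecovery方法通过positionStore.getRecoveryInfo(config)获取recoveryInfo，若recoveryInfo不为null则创建masterRecovery，执行masterRecovery.recover()获取recoveredHeartbeat；若recoveredHeartbeat不为null，则先用recoveryInfo.serverID及recoveryInfo.position创建oldServerSchemaStore，并通过clone(context.getServerID(), recoveredHeartbeat.getPosition())将其关联到新的server_id，最后返回recoveredHeartbeat.getNextPosition()；其余情况返回null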

小结

Recovery提供了recover方法：它先通过getBinlogInfo方法获取BinlogPosition列表，然后从后往前遍历该列表，为每个BinlogPosition构建BinlogConnectorReplicator，再通过findHeartbeat方法查找heartbeatRow.getPosition().getLastHeartbeatRead()等于recoveryInfo.getHeartbeat()的HeartbeatRowMap，找到则直接返回，否则返回null；Maxwell的attemptMasterRecovery方法在recoveryInfo不为null时调用该recover方法，并在恢复成功后返回recoveredHeartbeat.getNextPosition()

doc

  • Recovery

以上是 聊聊maxwell的Recovery 的全部内容, 来源链接: utcz.com/z/516298.html

回到顶部