聊聊maxwell的PositionStoreThread

编程

PositionStoreThread

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java

public class PositionStoreThread extends RunLoopProcess implements Runnable {

static final Logger LOGGER = LoggerFactory.getLogger(PositionStoreThread.class);

private Position position; // in memory position

private Position storedPosition; // position as flushed to storage

private final MysqlPositionStore store;

private MaxwellContext context;

private Exception exception;

private Thread thread;

private BinlogPosition lastHeartbeatSentFrom; // last position we sent a heartbeat from

private long lastHeartbeatSent;

public PositionStoreThread(MysqlPositionStore store, MaxwellContext context) {

this.store = store;

this.context = context;

lastHeartbeatSentFrom = null;

lastHeartbeatSent = 0L;

}

public void start() {

this.thread = new Thread(this, "Position Flush Thread");

this.thread.setDaemon(true);

thread.start();

}

@Override

public void run() {

try {

runLoop();

} catch ( Exception e ) {

this.exception = e;

context.terminate(e);

} finally {

this.taskState.stopped();

}

}

@Override

protected void beforeStop() {

if ( exception == null ) {

try {

storeFinalPosition();

} catch ( Exception e ) {

LOGGER.error("error storing final position: " + e);

}

}

}

void storeFinalPosition() throws SQLException, DuplicateProcessException {

if ( position != null && !position.equals(storedPosition) ) {

LOGGER.info("Storing final position: " + position);

store.set(position);

}

}

public void heartbeat() throws Exception {

store.heartbeat();

}

boolean shouldHeartbeat(Position currentPosition) {

if ( currentPosition == null )

return true;

if ( lastHeartbeatSentFrom == null )

return true;

BinlogPosition currentBinlog = currentPosition.getBinlogPosition();

if ( !lastHeartbeatSentFrom.getFile().equals(currentBinlog.getFile()) )

return true;

if ( currentBinlog.getOffset() - lastHeartbeatSentFrom.getOffset() > 1000 ) {

return true;

}

long secondsSinceHeartbeat = (System.currentTimeMillis() - lastHeartbeatSent) / 1000;

if ( secondsSinceHeartbeat >= 10 ) {

// during quiet times, heartbeat at least every 10s

return true;

}

return false;

}

public void work() throws Exception {

Position newPosition = position;

if ( newPosition != null && newPosition.newerThan(storedPosition) ) {

store.set(newPosition);

storedPosition = newPosition;

}

try { Thread.sleep(1000); } catch (InterruptedException e) { }

if ( shouldHeartbeat(newPosition) ) {

lastHeartbeatSent = store.heartbeat();

if (newPosition != null) {

lastHeartbeatSentFrom = newPosition.getBinlogPosition();

}

}

}

public synchronized void setPosition(Position p) {

if ( position == null || p.newerThan(position) ) {

position = p;

if (storedPosition == null) {

storedPosition = p;

}

}

}

public synchronized Position getPosition() throws SQLException {

if ( position != null )

return position;

position = store.get();

return position;

}

}

  • PositionStoreThread继承了RunLoopProcess,实现了Runnable,其run方法执行runLoop;其beforeStop方法执行storeFinalPosition方法;该方法在判断position与storedPosition不等时更新position到store;其work方法会更新newPosition到store,然后判断是否需要hearbeat,需要的话执行store.heartbeat();shouldHeartbeat方法在currentBinlog.getOffset()与lastHeartbeatSentFrom.getOffset()差值大于1000时返回true,在secondsSinceHeartbeat大于等于10的时候返回true

MaxwellContext

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/MaxwellContext.java

public class MaxwellContext {

//......

public PositionStoreThread getPositionStoreThread() {

if ( this.positionStoreThread == null ) {

this.positionStoreThread = new PositionStoreThread(this.positionStore, this);

this.positionStoreThread.start();

addTask(positionStoreThread);

}

return this.positionStoreThread;

}

public void addTask(StoppableTask task) {

this.taskManager.add(task);

}

private void shutdown(AtomicBoolean complete) {

try {

taskManager.stop(this.error);

this.replicationConnectionPool.release();

this.maxwellConnectionPool.release();

this.rawMaxwellConnectionPool.release();

complete.set(true);

} catch (Exception e) {

LOGGER.error("Exception occurred during shutdown:", e);

}

}

//......

}

  • MaxwellContext提供了getPositionStoreThread方法,在positionStoreThread为null的时候会创建一个positionStoreThread并执行其start方法,同时执行addTask方法;addTask将StoppableTask添加到taskManager中;其shutdown方法会执行taskManager.stop

TaskManager

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/TaskManager.java

public class TaskManager {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class);

private final ArrayList<StoppableTask> tasks;

private volatile RunState state;

public TaskManager() {

this.tasks = new ArrayList<>();

this.state = RunState.RUNNING;

}

// Can be invoked multiple times, will only return `true`

// for the first invocation.

public synchronized boolean requestStop() {

if (state == RunState.RUNNING) {

state = RunState.REQUEST_STOP;

return true;

} else {

return false;

}

}

public synchronized void stop(Exception error) throws Exception {

if (this.state == RunState.STOPPED) {

LOGGER.debug("Stop() called multiple times");

return;

}

this.state = RunState.REQUEST_STOP;

LOGGER.info("Stopping " + tasks.size() + " tasks");

if (error != null) {

LOGGER.error("cause: ", error);

}

// tell everything to stop

for (StoppableTask task: this.tasks) {

LOGGER.info("Stopping: " + task);

task.requestStop();

}

// then wait for everything to stop

Long timeout = 1000L;

for (StoppableTask task: this.tasks) {

LOGGER.debug("Awaiting stop of: " + task);

task.awaitStop(timeout);

}

this.state = RunState.STOPPED;

LOGGER.info("Stopped all tasks");

}

public synchronized void add(StoppableTask task) {

synchronized (tasks) {

tasks.add(task);

}

}

}

  • TaskManager的add方法会将task添加到tasks中;其requestStop方法会设置state为RunState.REQUEST_STOP;其stop方法先设置state为RunState.REQUEST_STOP,然后遍历tasks挨个执行task.requestStop(),之后在执行task.awaitStop(timeout),最后更新state为RunState.STOPPED

小结

PositionStoreThread继承了RunLoopProcess,实现了Runnable,其run方法执行runLoop;其beforeStop方法执行storeFinalPosition方法;该方法在判断position与storedPosition不等时更新position到store;其work方法会更新newPosition到store,然后判断是否需要hearbeat,需要的话执行store.heartbeat();shouldHeartbeat方法在currentBinlog.getOffset()与lastHeartbeatSentFrom.getOffset()差值大于1000时返回true,在secondsSinceHeartbeat大于等于10的时候返回true

doc

  • PositionStoreThread

以上是 聊聊maxwell的PositionStoreThread 的全部内容, 来源链接: utcz.com/z/516159.html

回到顶部