聊聊BinlogConnectorReplicator的work

编程

StoppableTask

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/StoppableTask.java

public interface StoppableTask {

void requestStop() throws Exception;

void awaitStop(Long timeout) throws TimeoutException;

}

  • StoppableTask接口定义了requestStop、awaitStop方法

RunLoopProcess

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/RunLoopProcess.java

abstract public class RunLoopProcess implements StoppableTask {

protected volatile StoppableTaskState taskState;

private Thread thread;

public RunLoopProcess() {

this.taskState = new StoppableTaskState(this.getClass().getName());

}

public void requestStop() {

this.taskState.requestStop();

interrupt();

}

public void interrupt() {

if ( this.thread != null )

this.thread.interrupt();

}

public void awaitStop(Long timeout) throws TimeoutException {

this.taskState.awaitStop(thread, timeout);

}

public void runLoop() throws Exception {

this.thread = Thread.currentThread();

this.beforeStart();

try {

while (this.taskState.isRunning()) {

work();

}

} finally {

this.beforeStop();

this.taskState.stopped();

}

}

protected abstract void work() throws Exception;

protected void beforeStart() throws Exception { }

protected void beforeStop() throws Exception { }

}

  • RunLoopProcess实现了StoppableTask接口,其runLoop方法会通过while循环不断执行work()方法,直到taskState.isRunning()为false

BinlogConnectorReplicator

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java

public class BinlogConnectorReplicator extends RunLoopProcess implements Replicator {

//......

public void work() throws Exception {

RowMap row = null;

try {

row = getRow();

} catch ( InterruptedException e ) {

}

if ( row == null )

return;

rowCounter.inc();

rowMeter.mark();

if ( scripting != null && !isMaxwellRow(row))

scripting.invoke(row);

processRow(row);

}

protected void processRow(RowMap row) throws Exception {

if ( row instanceof HeartbeatRowMap) {

producer.push(row);

if (stopAtHeartbeat != null) {

long thisHeartbeat = row.getPosition().getLastHeartbeatRead();

if (thisHeartbeat >= stopAtHeartbeat) {

LOGGER.info("received final heartbeat " + thisHeartbeat + "; stopping replicator");

// terminate runLoop

this.taskState.stopped();

}

}

} else if ( !shouldSkipRow(row) )

producer.push(row);

}

//......

}

  • BinlogConnectorReplicator实现了RunLoopProcess接口,其work方法通过getRow()获取RowMap,若不为null则执行processRow(row)方法;processRow方法则执行producer.push(row)

AbstractProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/AbstractProducer.java

public abstract class AbstractProducer {

protected final MaxwellContext context;

protected final MaxwellOutputConfig outputConfig;

protected final Counter succeededMessageCount;

protected final Meter succeededMessageMeter;

protected final Counter failedMessageCount;

protected final Meter failedMessageMeter;

protected final Timer messagePublishTimer;

protected final Timer messageLatencyTimer;

protected final Counter messageLatencySloViolationCount;

public AbstractProducer(MaxwellContext context) {

this.context = context;

this.outputConfig = context.getConfig().outputConfig;

Metrics metrics = context.getMetrics();

MetricRegistry metricRegistry = metrics.getRegistry();

this.succeededMessageCount = metricRegistry.counter(metrics.metricName("messages", "succeeded"));

this.succeededMessageMeter = metricRegistry.meter(metrics.metricName("messages", "succeeded", "meter"));

this.failedMessageCount = metricRegistry.counter(metrics.metricName("messages", "failed"));

this.failedMessageMeter = metricRegistry.meter(metrics.metricName("messages", "failed", "meter"));

this.messagePublishTimer = metricRegistry.timer(metrics.metricName("message", "publish", "time"));

this.messageLatencyTimer = metricRegistry.timer(metrics.metricName("message", "publish", "age"));

this.messageLatencySloViolationCount = metricRegistry.counter(metrics.metricName("message", "publish", "age", "slo_violation"));

}

abstract public void push(RowMap r) throws Exception;

public StoppableTask getStoppableTask() {

return null;

}

public Meter getFailedMessageMeter() {

return this.failedMessageMeter;

}

public MaxwellDiagnostic getDiagnostic() {

return null;

}

}

  • AbstractProducer定义了push抽象方法供子类实现

StdoutProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/StdoutProducer.java

public class StdoutProducer extends AbstractProducer {

public StdoutProducer(MaxwellContext context) {

super(context);

}

@Override

public void push(RowMap r) throws Exception {

String output = r.toJSON(outputConfig);

if ( output != null && r.shouldOutput(outputConfig) )

System.out.println(output);

this.context.setPosition(r);

}

}

  • StdoutProducer继承了AbstractProducer,其push方法则执行System.out.println(output)以及context.setPosition(r)

小结

StoppableTask接口定义了requestStop、awaitStop方法;RunLoopProcess实现了StoppableTask接口,其runLoop方法会通过while循环不断执行work()方法,直到taskState.isRunning()为false;BinlogConnectorReplicator实现了RunLoopProcess接口,其work方法通过getRow()获取RowMap,若不为null则执行processRow(row)方法;processRow方法则执行producer.push(row)

doc

  • BinlogConnectorReplicator

以上是 聊聊BinlogConnectorReplicator的work 的全部内容, 来源链接: utcz.com/z/516082.html

回到顶部