聊聊rocketmqmysql的Replicator

编程

本文主要研究一下rocketmq-mysql的Replicator

Replicator

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java

public class Replicator {

private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);

private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");

private Config config;

private EventProcessor eventProcessor;

private RocketMQProducer rocketMQProducer;

private Object lock = new Object();

private BinlogPosition nextBinlogPosition;

private long nextQueueOffset;

private long xid;

public static void main(String[] args) {

Replicator replicator = new Replicator();

replicator.start();

}

public void start() {

try {

config = new Config();

config.load();

rocketMQProducer = new RocketMQProducer(config);

rocketMQProducer.start();

BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);

binlogPositionLogThread.start();

eventProcessor = new EventProcessor(this);

eventProcessor.start();

} catch (Exception e) {

LOGGER.error("Start error.", e);

System.exit(1);

}

}

public void commit(Transaction transaction, boolean isComplete) {

String json = transaction.toJson();

for (int i = 0; i < 3; i++) {

try {

if (isComplete) {

long offset = rocketMQProducer.push(json);

synchronized (lock) {

xid = transaction.getXid();

nextBinlogPosition = transaction.getNextBinlogPosition();

nextQueueOffset = offset;

}

} else {

rocketMQProducer.push(json);

}

break;

} catch (Exception e) {

LOGGER.error("Push error,retry:" + (i + 1) + ",", e);

}

}

}

public void logPosition() {

String binlogFilename = null;

long xid = 0L;

long nextPosition = 0L;

long nextOffset = 0L;

synchronized (lock) {

if (nextBinlogPosition != null) {

xid = this.xid;

binlogFilename = nextBinlogPosition.getBinlogFilename();

nextPosition = nextBinlogPosition.getPosition();

nextOffset = nextQueueOffset;

}

}

if (binlogFilename != null) {

POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}",

xid, binlogFilename, nextPosition, nextOffset);

}

}

public Config getConfig() {

return config;

}

public BinlogPosition getNextBinlogPosition() {

return nextBinlogPosition;

}

}

  • Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset

RocketMQProducer

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java

public class RocketMQProducer {

private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);

private DefaultMQProducer producer;

private Config config;

public RocketMQProducer(Config config) {

this.config = config;

}

public void start() throws MQClientException {

producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");

producer.setNamesrvAddr(config.mqNamesrvAddr);

producer.start();

}

public long push(String json) throws Exception {

LOGGER.debug(json);

Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));

SendResult sendResult = producer.send(message);

return sendResult.getQueueOffset();

}

}

  • RocketMQProducer的start方法创建DefaultMQProducer并执行其start方法;其push方法则通过producer.send(message)发送消息,并返回sendResult.getQueueOffset()

BinlogPositionLogThread

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java

public class BinlogPositionLogThread extends Thread {

private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);

private Replicator replicator;

public BinlogPositionLogThread(Replicator replicator) {

this.replicator = replicator;

setDaemon(true);

}

@Override

public void run() {

while (true) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

logger.error("Offset thread interrupted.", e);

}

replicator.logPosition();

}

}

}

  • BinlogPositionLogThread会定时执行replicator.logPosition()来打印position信息

小结

Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset

doc

  • Replicator

以上是 聊聊rocketmqmysql的Replicator 的全部内容, 来源链接: utcz.com/z/516788.html

回到顶部