聊聊rocketmqmysql的EventProcessor

编程

EventProcessor

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java

public class EventProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);

private Replicator replicator;

private Config config;

private DataSource dataSource;

private BinlogPositionManager binlogPositionManager;

private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100);

private BinaryLogClient binaryLogClient;

private EventListener eventListener;

private Schema schema;

private Map<Long, Table> tableMap = new HashMap<>();

private Transaction transaction;

public EventProcessor(Replicator replicator) {

this.replicator = replicator;

this.config = replicator.getConfig();

}

public void start() throws Exception {

initDataSource();

binlogPositionManager = new BinlogPositionManager(config, dataSource);

binlogPositionManager.initBeginPosition();

schema = new Schema(dataSource);

schema.load();

eventListener = new EventListener(queue);

binaryLogClient = new BinaryLogClient(config.mysqlAddr,

config.mysqlPort,

config.mysqlUsername,

config.mysqlPassword);

binaryLogClient.setBlocking(true);

binaryLogClient.setServerId(1001);

EventDeserializer eventDeserializer = new EventDeserializer();

eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,

EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);

binaryLogClient.setEventDeserializer(eventDeserializer);

binaryLogClient.registerEventListener(eventListener);

binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename());

binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition());

binaryLogClient.connect(3000);

LOGGER.info("Started.");

doProcess();

}

//......

}

  • EventProcessor提供了start方法,该方法首先执行initDataSource;之后创建BinlogPositionManager并执行binlogPositionManager.initBeginPosition();然后创建EventListener及BinaryLogClient并执行binaryLogClient.connect(3000);最后执行doProcess方法

initDataSource

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java

public class EventProcessor {

//......

private void initDataSource() throws Exception {

Map<String, String> map = new HashMap<>();

map.put("driverClassName", "com.mysql.jdbc.Driver");

map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + config.mysqlPort + "?useSSL=true&verifyServerCertificate=false");

map.put("username", config.mysqlUsername);

map.put("password", config.mysqlPassword);

map.put("initialSize", "2");

map.put("maxActive", "2");

map.put("maxWait", "60000");

map.put("timeBetweenEvictionRunsMillis", "60000");

map.put("minEvictableIdleTimeMillis", "300000");

map.put("validationQuery", "SELECT 1 FROM DUAL");

map.put("testWhileIdle", "true");

dataSource = DruidDataSourceFactory.createDataSource(map);

}

//......

}

  • initDataSource主要是通过DruidDataSourceFactory来创建dataSource

doProcess

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java

public class EventProcessor {

//......

private void doProcess() {

while (true) {

try {

Event event = queue.poll(1000, TimeUnit.MILLISECONDS);

if (event == null) {

checkConnection();

continue;

}

switch (event.getHeader().getEventType()) {

case TABLE_MAP:

processTableMapEvent(event);

break;

case WRITE_ROWS:

case EXT_WRITE_ROWS:

processWriteEvent(event);

break;

case UPDATE_ROWS:

case EXT_UPDATE_ROWS:

processUpdateEvent(event);

break;

case DELETE_ROWS:

case EXT_DELETE_ROWS:

processDeleteEvent(event);

break;

case QUERY:

processQueryEvent(event);

break;

case XID:

processXidEvent(event);

break;

}

} catch (Exception e) {

LOGGER.error("Binlog process error.", e);

}

}

}

//......

}

  • doProcess方法会执行queue.poll(1000, TimeUnit.MILLISECONDS)拉取event,如果event为null,则会执行checkConnection;之后根据event.getHeader().getEventType()来做不同处理;主要有processTableMapEvent、processWriteEvent、processUpdateEvent、processDeleteEvent、processQueryEvent、processXidEvent这几种

processEvent

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java

public class EventProcessor {

//......

private void processTableMapEvent(Event event) {

TableMapEventData data = event.getData();

String dbName = data.getDatabase();

String tableName = data.getTable();

Long tableId = data.getTableId();

Table table = schema.getTable(dbName, tableName);

tableMap.put(tableId, table);

}

private void processWriteEvent(Event event) {

WriteRowsEventData data = event.getData();

Long tableId = data.getTableId();

List<Serializable[]> list = data.getRows();

for (Serializable[] row : list) {

addRow("WRITE", tableId, row);

}

}

private void processUpdateEvent(Event event) {

UpdateRowsEventData data = event.getData();

Long tableId = data.getTableId();

List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows();

for (Map.Entry<Serializable[], Serializable[]> entry : list) {

addRow("UPDATE", tableId, entry.getValue());

}

}

private void processDeleteEvent(Event event) {

DeleteRowsEventData data = event.getData();

Long tableId = data.getTableId();

List<Serializable[]> list = data.getRows();

for (Serializable[] row : list) {

addRow("DELETE", tableId, row);

}

}

private void processQueryEvent(Event event) {

QueryEventData data = event.getData();

String sql = data.getSql();

if (createTablePattern.matcher(sql).find()) {

schema.reset();

}

}

private void processXidEvent(Event event) {

EventHeaderV4 header = event.getHeader();

XidEventData data = event.getData();

String binlogFilename = binaryLogClient.getBinlogFilename();

Long position = header.getNextPosition();

Long xid = data.getXid();

BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position);

transaction.setNextBinlogPosition(binlogPosition);

transaction.setXid(xid);

replicator.commit(transaction, true);

transaction = new Transaction(config);

}

private void addRow(String type, Long tableId, Serializable[] row) {

if (transaction == null) {

transaction = new Transaction(config);

}

Table t = tableMap.get(tableId);

if (t != null) {

while (true) {

if (transaction.addRow(type, t, row)) {

break;

} else {

transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());

replicator.commit(transaction, false);

transaction = new Transaction(config);

}

}

}

}

//......

}

  • processWriteEvent、processUpdateEvent、processDeleteEvent都会执行addRow方法,它会执行transaction.addRow(type, t, row),如果返回false则会执行transaction.setNextBinlogPosition以及replicator.commit;processXidEvent会执行binaryLogClient.getBinlogFilename(),更新transaction的xid及binlogPosition,然后执行replicator.commit(transaction, true),并重置transaction

小结

EventProcessor提供了start方法,该方法首先执行initDataSource;之后创建BinlogPositionManager并执行binlogPositionManager.initBeginPosition();然后创建EventListener及BinaryLogClient并执行binaryLogClient.connect(3000);最后执行doProcess方法

doc

  • EventProcessor

以上是 聊聊rocketmqmysql的EventProcessor 的全部内容, 来源链接: utcz.com/z/516732.html

回到顶部