聊聊BinlogConnectorReplicator的getTransactionRows

编程

BinlogConnectorReplicator

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java

public class BinlogConnectorReplicator extends RunLoopProcess implements Replicator {

//......

/**
 * Consumes binlog events following the given BEGIN event until a commit
 * boundary is reached, buffering every row of the transaction into a
 * RowMapBuffer (which spills to disk beyond MAX_TX_ELEMENTS / the configured
 * memory budget).
 *
 * @param beginEvent the BEGIN event that opened the transaction; its header
 *                   timestamp is used to measure transaction execution time
 * @return a buffer holding all rows of the transaction, with the last row
 *         flagged as the transaction commit and (for XID commits) the buffer
 *         stamped with the transaction's XID
 * @throws Exception if polling the binlog or buffering a row fails
 */
private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws Exception {
	BinlogConnectorEvent event;
	RowMapBuffer buffer = new RowMapBuffer(MAX_TX_ELEMENTS, this.bufferMemoryUsage);

	// SQL text captured from the most recent ROWS_QUERY event; attached to the
	// row maps generated from the next row event, then cleared.
	String currentQuery = null;

	while ( true ) {
		event = pollEvent();

		if (event == null) {
			// Poll returned nothing: verify the binlog reader thread is still
			// alive before retrying.
			ensureReplicatorThread();
			continue;
		}

		EventType eventType = event.getEvent().getHeader().getEventType();

		if (event.isCommitEvent()) {
			if (!buffer.isEmpty()) {
				// Mark the final row so downstream consumers know where the
				// transaction ends, and record transaction-level metrics.
				buffer.getLast().setTXCommit();
				long timeSpent = buffer.getLast().getTimestampMillis() - beginEvent.getEvent().getHeader().getTimestamp();
				transactionExecutionTime.update(timeSpent);
				transactionRowCount.update(buffer.size());
			}
			if(eventType == EventType.XID) {
				buffer.setXid(event.xidData().getXid());
			}
			return buffer;
		}

		switch(eventType) {
			case WRITE_ROWS:
			case UPDATE_ROWS:
			case DELETE_ROWS:
			case EXT_WRITE_ROWS:
			case EXT_UPDATE_ROWS:
			case EXT_DELETE_ROWS:
				// Row events: resolve the table via the cache. A null table means
				// the preceding TABLE_MAP did not cache it (e.g. filtered out).
				Table table = tableCache.getTable(event.getTableID());

				if ( table != null && shouldOutputEvent(table.getDatabase(), table.getName(), filter, table.getColumnNames()) ) {
					// Per-row filtering on top of the table-level check above.
					for ( RowMap r : event.jsonMaps(table, getLastHeartbeatRead(), currentQuery) )
						if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) {
							buffer.add(r);
						}
				}
				// A ROWS_QUERY applies only to the row event that follows it.
				currentQuery = null;
				break;
			case TABLE_MAP:
				// Map the binlog table id to a schema Table for subsequent row events.
				TableMapEventData data = event.tableMapData();
				tableCache.processEvent(getSchema(), this.filter, data.getTableId(), data.getDatabase(), data.getTable());
				break;
			case ROWS_QUERY:
				// Original SQL of the upcoming row event — presumably only present
				// when the server logs ROWS_QUERY events; TODO confirm server config.
				RowsQueryEventData rqed = event.getEvent().getData();
				currentQuery = rqed.getQuery();
				break;
			case QUERY:
				// Statement-form events inside a transaction: most are ignored;
				// only CREATE TABLE variants are forwarded to schema processing.
				QueryEventData qe = event.queryData();
				String sql = qe.getSql();
				String upperCaseSql = sql.toUpperCase();

				if ( upperCaseSql.startsWith(BinlogConnectorEvent.SAVEPOINT)) {
					LOGGER.debug("Ignoring SAVEPOINT in transaction: " + qe);
				} else if ( createTablePattern.matcher(sql).find() ) {
					// CREATE TABLE `foo` SELECT * FROM `bar` will put a CREATE TABLE
					// inside a transaction. Note that this could, in rare cases, lead
					// to us starting on a WRITE_ROWS event -- we sync the schema position somewhere
					// kinda unsafe.
					processQueryEvent(event);
				} else if (upperCaseSql.startsWith("INSERT INTO MYSQL.RDS_") || upperCaseSql.startsWith("DELETE FROM MYSQL.RDS_")) {
					// RDS heartbeat events take the following form:
					// INSERT INTO mysql.rds_heartbeat2(id, value) values (1,1483041015005) ON DUPLICATE KEY UPDATE value = 1483041015005
					// Other RDS internal events like below:
					// INSERT INTO mysql.rds_sysinfo(name, value) values ("innodb_txn_key","Thu Nov 15 10:30:07 UTC 2018")
					// DELETE FROM mysql.rds_sysinfo where name = "innodb_txn_key"
					// We don't need to process them, just ignore
				} else if (upperCaseSql.startsWith("DROP TEMPORARY TABLE")) {
					// Ignore temporary table drop statements inside transactions
				} else {
					LOGGER.warn("Unhandled QueryEvent @ {} inside transaction: {}", event.getPosition().fullPosition(), qe);
				}
				break;
		}
	}
}

//......

}

  • BinlogConnectorReplicator的getTransactionRows方法首先创建RowMapBuffer,然后while循环执行pollEvent:在event.isCommitEvent()时为最后一行设置TXCommit标记、记录事务指标并返回RowMapBuffer;否则根据eventType做不同处理——对于各类ROWS事件,在shouldOutputEvent及shouldOutputRowMap均通过时将RowMap放入RowMapBuffer;对于TABLE_MAP通过tableCache.processEvent处理;对于ROWS_QUERY记录当前SQL;对于QUERY则忽略SAVEPOINT、RDS内部语句及DROP TEMPORARY TABLE,仅对匹配createTablePattern的语句执行processQueryEvent

ListWithDiskBuffer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/ListWithDiskBuffer.java

/**
 * A FIFO buffer of serializable elements that keeps at most
 * {@code maxInMemoryElements} (by default) in memory and spills older elements
 * to a lazily-created temp file via Java serialization. Elements on disk are
 * always older than elements in memory, so reads drain the file first.
 *
 * NOTE(review): not thread-safe as written — no synchronization around the
 * list or the streams; confirm callers use it from a single thread.
 */
public class ListWithDiskBuffer<T> {
	static final Logger LOGGER = LoggerFactory.getLogger(ListWithDiskBuffer.class);

	// In-memory element budget used by the default shouldBuffer() policy,
	// and the cadence at which the output stream's object cache is reset.
	private final long maxInMemoryElements;
	// In-memory tail of the buffer; eviction removes from the front (oldest first).
	private final LinkedList<T> list;
	// Count of elements currently sitting in the spill file (the FIFO head).
	private long elementsInFile = 0;
	// Temp spill file; null until the first eviction.
	private File file;
	private ObjectInputStream is;   // lazily opened on first read-back from disk
	private ObjectOutputStream os;  // opened when the spill file is created

	public ListWithDiskBuffer(long maxInMemoryElements) {
		this.maxInMemoryElements = maxInMemoryElements;
		list = new LinkedList<>();
	}

	/**
	 * Appends an element, then evicts oldest in-memory elements to disk while
	 * shouldBuffer() reports the in-memory portion is over budget.
	 */
	public void add(T element) throws IOException {
		list.add(element);
		while ( shouldBuffer() )
			evict();
	}

	// Overridable spill policy: by default, spill when the in-memory count
	// exceeds maxInMemoryElements. Subclasses may use a different measure
	// (e.g. RowMapBuffer uses approximate byte size).
	protected boolean shouldBuffer() {
		return this.list.size() > maxInMemoryElements;
	}

	// Clears ObjectOutputStream's internal handle table so previously written
	// objects become eligible for garbage collection.
	protected void resetOutputStreamCaches() throws IOException {
		os.reset();
	}

	/** Flushes any buffered serialized bytes to the spill file, if one exists. */
	public void flushToDisk() throws IOException {
		if ( os != null )
			os.flush();
	}

	public boolean isEmpty() {
		return this.size() == 0;
	}

	// Returns the newest element. NOTE(review): assumes the newest element is
	// always in memory (eviction removes from the front); throws if the
	// in-memory list is empty — confirm callers check isEmpty() first.
	public T getLast() {
		return list.getLast();
	}

	/**
	 * Removes and returns the oldest element. Disk-resident elements are older
	 * than anything in memory, so they are drained first; the input stream is
	 * opened lazily on the first read-back, after flushing pending writes.
	 *
	 * @param clazz runtime class used to cast the deserialized object
	 * @throws ClassNotFoundException if deserialization cannot resolve a class
	 */
	public T removeFirst(Class<T> clazz) throws IOException, ClassNotFoundException {
		if ( elementsInFile > 0 ) {
			if ( is == null ) {
				// First read-back: make everything written so far visible to the reader.
				os.flush();
				is = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
			}

			Object object = is.readObject();
			T element = clazz.cast(object);
			elementsInFile--;
			return element;
		} else {
			return list.removeFirst();
		}
	}

	/** Total element count: in-memory plus spilled-to-disk. */
	public Long size() {
		return list.size() + elementsInFile;
	}

	public Long inMemorySize() {
		return Long.valueOf(list.size());
	}

	// Best-effort cleanup of the spill file. NOTE(review): finalize() is
	// deprecated on modern JDKs; deleteOnExit() in evict() is the other
	// cleanup path. Kept byte-identical here.
	@Override
	protected void finalize() throws Throwable {
		try {
			if ( file != null )
				file.delete();
		} finally {
			super.finalize();
		}
	}

	/**
	 * Moves the oldest in-memory element to the spill file, creating the file
	 * and output stream on first use, and returns the evicted element.
	 */
	protected T evict() throws IOException {
		if ( file == null ) {
			file = File.createTempFile("maxwell", "events");
			file.deleteOnExit();
			os = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
		}

		if ( elementsInFile == 0 ) {
			LOGGER.info("Overflowed in-memory buffer, spilling over into " + file);
		}

		T evicted = this.list.removeFirst();
		os.writeObject(evicted);
		elementsInFile++;

		// Periodically reset the stream's object cache to bound memory held by
		// the serialization machinery.
		if ( elementsInFile % maxInMemoryElements == 0 )
			resetOutputStreamCaches();

		return evicted;
	}
}

  • ListWithDiskBuffer在add的时候会判断list大小是否大于maxInMemoryElements,大于则执行evict方法;evict方法会执行list.removeFirst(),然后通过os.writeObject(evicted)将其写入到文件,递增elementsInFile,在elementsInFile对maxInMemoryElements取余为0时执行resetOutputStreamCaches;其removeFirst方法在elementsInFile大于0时从文件中读取element,否则直接通过list.removeFirst()返回

RowMapBuffer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/row/RowMapBuffer.java

/**
 * A {@link ListWithDiskBuffer} of {@link RowMap}s that spills to disk based on
 * the approximate byte size of buffered rows rather than element count: once
 * the accumulated approximate size exceeds {@code maxMemory}, the superclass
 * eviction machinery moves the oldest rows to the temp file.
 *
 * Also carries transaction-level metadata (xid, server id, thread id,
 * schema id) that is stamped onto every row as it is removed from the buffer.
 */
public class RowMapBuffer extends ListWithDiskBuffer<RowMap> {
	// ObjectOutputStream retains a reference to every object it has written
	// (for back-references); force a reset() once roughly this many bytes of
	// rows have been spilled so those references can be garbage collected.
	// Was a mutable non-final field; made a proper constant.
	private static final long FLUSH_OUTPUT_STREAM_BYTES = 10000000;

	private Long xid;
	private Long xoffset = 0L;  // per-transaction row offset; incremented on each removeFirst()
	private Long serverId;
	private Long threadId;
	private Long schemaId;
	private long memorySize = 0;             // approximate bytes of rows currently in memory
	private long outputStreamCacheSize = 0;  // approximate bytes spilled since the last stream reset
	private final long maxMemory;

	/** Buffer bounded at 25% of the JVM's max heap. */
	public RowMapBuffer(long maxInMemoryElements) {
		super(maxInMemoryElements);
		this.maxMemory = (long) (Runtime.getRuntime().maxMemory() * 0.25);
	}

	/** Buffer bounded at an explicit byte budget. */
	public RowMapBuffer(long maxInMemoryElements, long maxMemory) {
		super(maxInMemoryElements);
		this.maxMemory = maxMemory;
	}

	/** Buffer bounded at the given fraction of the JVM's max heap. */
	public RowMapBuffer(long maxInMemoryElements, float bufferMemoryUsage) {
		super(maxInMemoryElements);
		this.maxMemory = (long) (Runtime.getRuntime().maxMemory() * bufferMemoryUsage);
	}

	@Override
	public void add(RowMap rowMap) throws IOException {
		// Track approximate in-memory size before delegating; super.add() may
		// evict immediately based on shouldBuffer() below.
		this.memorySize += rowMap.getApproximateSize();
		super.add(rowMap);
	}

	/** Spill when the approximate in-memory byte size exceeds the budget. */
	@Override
	protected boolean shouldBuffer() {
		return memorySize > maxMemory;
	}

	@Override
	protected RowMap evict() throws IOException {
		RowMap r = super.evict();
		this.memorySize -= r.getApproximateSize();

		/* For performance reasons, the output stream will hold on to cached objects.
		 * There's probably a smarter thing to do (write our own serdes, maybe?), but
		 * for now we forcibly flush its cache when it gets too big. */
		this.outputStreamCacheSize += r.getApproximateSize();
		if ( this.outputStreamCacheSize > FLUSH_OUTPUT_STREAM_BYTES ) {
			resetOutputStreamCaches();
			this.outputStreamCacheSize = 0;
		}

		return r;
	}

	/**
	 * Removes and returns the oldest row, stamping it with this buffer's
	 * transaction metadata and the next sequential xoffset.
	 */
	public RowMap removeFirst() throws IOException, ClassNotFoundException {
		RowMap r = super.removeFirst(RowMap.class);
		r.setXid(this.xid);
		r.setXoffset(this.xoffset++);
		r.setServerId(this.serverId);
		r.setThreadId(this.threadId);
		r.setSchemaId(this.schemaId);
		return r;
	}

	public void setXid(Long xid) {
		this.xid = xid;
	}

	public void setServerId(Long serverId) {
		this.serverId = serverId;
	}

	public void setThreadId(Long threadId) {
		this.threadId = threadId;
	}

	public void setSchemaId(Long schemaId) {
		this.schemaId = schemaId;
	}
}

  • RowMapBuffer继承了ListWithDiskBuffer,其add及evict方法都会维护memorySize;shouldBuffer方法则判断memorySize是否大于maxMemory

TableCache

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/TableCache.java

public class TableCache {

private final String maxwellDB;

public TableCache(String maxwellDB) {

this.maxwellDB = maxwellDB;

}

private final HashMap<Long, Table> tableMapCache = new HashMap<>();

public void processEvent(Schema schema, Filter filter, Long tableId, String dbName, String tblName) {

if ( !tableMapCache.containsKey(tableId)) {

if ( filter.isTableBlacklisted(dbName, tblName) ) {

return;

}

Database db = schema.findDatabase(dbName);

if ( db == null )

throw new RuntimeException("Couldn"t find database " + dbName);

else {

Table tbl = db.findTable(tblName);

if (tbl == null)

throw new RuntimeException("Couldn"t find table " + tblName + " in database " + dbName);

else

tableMapCache.put(tableId, tbl);

}

}

}

public Table getTable(Long tableId) {

return tableMapCache.get(tableId);

}

public void clear() {

tableMapCache.clear();

}

}

  • TableCache维护了tableId及Table的tableMapCache;其processEvent方法在tableMapCache不包含tableId的时候,通过Schema找Database,再通过Database找Table,然后放入到tableMapCache中

小结

BinlogConnectorReplicator的getTransactionRows方法首先创建RowMapBuffer,然后while循环执行pollEvent:在event.isCommitEvent()时为最后一行设置TXCommit标记并返回RowMapBuffer;否则根据eventType做不同处理——对于各类ROWS事件,在shouldOutputEvent及shouldOutputRowMap均通过时将RowMap放入RowMapBuffer;对于TABLE_MAP通过tableCache.processEvent处理;对于QUERY则忽略SAVEPOINT、RDS内部语句等,仅对匹配createTablePattern的语句执行processQueryEvent

doc

  • BinlogConnectorReplicator

以上是 聊聊BinlogConnectorReplicator的getTransactionRows 的全部内容, 来源链接: utcz.com/z/516059.html

回到顶部