聊聊canal的DirectLogFetcher

编程

本文主要研究一下canal的DirectLogFetcher

DirectLogFetcher

canal-1.1.4/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcher.java

public final class DirectLogFetcher extends LogFetcher {

/** Logger used by this fetcher for connection, protocol and I/O diagnostics. */

protected static final Log logger = LogFactory.getLog(DirectLogFetcher.class);

/** Command byte that {@code sendBinlogDump} writes as the first payload byte of the dump request. */

public static final byte COM_BINLOG_DUMP = 18;

/**
 * Size in bytes of the header that precedes every packet: {@code fetch()} reads this many bytes first,
 * then takes the payload length and the sequence number out of them.
 */

public static final int NET_HEADER_SIZE = 4;

/** Number of characters read as the SQLSTATE when {@code fetch()} decodes an error packet. */

public static final int SQLSTATE_LENGTH = 5;

/** Offset of the 24-bit payload length inside the packet header. */

public static final int PACKET_LEN_OFFSET = 0;

/** Offset of the 8-bit packet sequence number inside the packet header. */

public static final int PACKET_SEQ_OFFSET = 3;

/**
 * Largest payload length a single packet can announce (2^24 - 1). {@code fetch()} treats a packet of
 * exactly this length as continued by the packet that follows it.
 */

public static final int MAX_PACKET_LENGTH = (256 * 256 * 256 - 1);

/** BINLOG_DUMP flag that {@code sendBinlogDump} sets when the caller asks for non-blocking mode. */

public static final int BINLOG_DUMP_NON_BLOCK = 1;

/** BINLOG_DUMP flag that {@code sendBinlogDump} always sets in the request. */

public static final int BINLOG_SEND_ANNOTATE_ROWS_EVENT = 2;

/** JDBC connection handed to {@code open}; closed and cleared by {@code close()}. */

private Connection conn;

/** Raw output stream of the connection, obtained reflectively in {@code open}. */

private OutputStream mysqlOutput;

/** Raw input stream of the connection, obtained reflectively in {@code open}. */

private InputStream mysqlInput;

//......

/**
 * Connects to the MySQL master and starts a blocking binlog dump.
 *
 * <p>Convenience overload: forwards to the five-argument {@code open} with {@code nonBlocking}
 * set to {@code false}.
 *
 * @param conn JDBC connection whose underlying streams are used for the dump
 * @param fileName name of the binlog file to read
 * @param filePosition offset inside the binlog file to start from
 * @param serverId server id announced to the master
 * @throws IOException if the dump request cannot be set up or sent
 */
public void open(Connection conn, String fileName, final long filePosition, final int serverId) throws IOException {
    final boolean blockingDump = false;
    this.open(conn, fileName, filePosition, serverId, blockingDump);
}

/**
 * Connects to the MySQL master through an existing JDBC connection and requests a binlog dump.
 *
 * <p>The connection is unwrapped to {@code com.mysql.jdbc.ConnectionImpl}, its {@code io} field and that
 * object's {@code mysqlOutput} / {@code mysqlInput} streams are read reflectively, and a COM_BINLOG_DUMP
 * request is written to the output stream. When setup fails with an {@link IOException} or the driver
 * class cannot be loaded, {@code close()} is called before the exception propagates.
 *
 * @param conn JDBC connection to take the raw streams from; it is owned by this fetcher afterwards
 * @param fileName name of the binlog file to read
 * @param filePosition offset to start from; {@code 0} is replaced by {@code BIN_LOG_HEADER_SIZE}
 * @param serverId server id announced to the master
 * @param nonBlocking whether to request a non-blocking dump
 * @throws IOException if the connection cannot be unwrapped, a required field is missing, the driver
 *         class is not on the classpath, or sending the request fails
 */
public void open(Connection conn, String fileName, long filePosition, final int serverId, boolean nonBlocking)
        throws IOException {
    try {
        this.conn = conn;
        Class<?> connClazz = Class.forName("com.mysql.jdbc.ConnectionImpl");
        Object unwrapConn = unwrapConnection(conn, connClazz);
        if (unwrapConn == null) {
            throw new IOException("Unable to unwrap " + conn.getClass().getName()
                                  + " to com.mysql.jdbc.ConnectionImpl");
        }

        // Get underlying IO streams for network communications.
        Object connIo = getDeclaredField(unwrapConn, connClazz, "io");
        if (connIo == null) {
            throw new IOException("Get null field:" + conn.getClass().getName() + "#io");
        }
        mysqlOutput = (OutputStream) getDeclaredField(connIo, connIo.getClass(), "mysqlOutput");
        mysqlInput = (InputStream) getDeclaredField(connIo, connIo.getClass(), "mysqlInput");
        // Mirror the connIo check: a missing stream must surface as an IOException (which runs the
        // cleanup below) rather than as a NullPointerException on the first read or write.
        if (mysqlOutput == null) {
            throw new IOException("Get null field:" + connIo.getClass().getName() + "#mysqlOutput");
        }
        if (mysqlInput == null) {
            throw new IOException("Get null field:" + connIo.getClass().getName() + "#mysqlInput");
        }

        // Position 0 is replaced by the binlog header size before the request is sent.
        if (filePosition == 0) {
            filePosition = BIN_LOG_HEADER_SIZE;
        }
        sendBinlogDump(fileName, filePosition, serverId, nonBlocking);
        position = 0;
    } catch (IOException e) {
        close(); /* Do cleanup */
        // Log the cause as well, consistent with the logging done in fetch().
        logger.error("Error on COM_BINLOG_DUMP: file = " + fileName + ", position = " + filePosition, e);
        throw e;
    } catch (ClassNotFoundException e) {
        close(); /* Do cleanup */
        throw new IOException("Unable to load com.mysql.jdbc.ConnectionImpl", e);
    }
}

/**
 * Reads the next packet from the master into the buffer and prepares the buffer for decoding.
 *
 * <p>A packet is a {@code NET_HEADER_SIZE}-byte header (24-bit payload length, 8-bit sequence number)
 * followed by the payload. The first payload byte is a status marker: {@code 0} means the payload can be
 * decoded, {@code 255} is an error packet, {@code 254} is treated as end of stream, and any other value
 * is rejected. A payload of exactly {@code MAX_PACKET_LENGTH} bytes is continued by the following
 * packet(s), whose payloads are appended to the buffer.
 *
 * <p>On success {@code origin} and {@code position} point just past the header and the marker byte, and
 * {@code limit} is reduced by that amount.
 *
 * @return {@code true} if a packet was read and is ready for decoding; {@code false} if the input stream
 *         ended or the marker byte was {@code 254}
 * @throws IOException if the master sent an error packet or an unexpected marker, or if reading failed;
 *         for every exception caught here {@code close()} is called before it is rethrown
 */

public boolean fetch() throws IOException {

try {

// Read the fixed-size packet header first.

if (!fetch0(0, NET_HEADER_SIZE)) {

logger.warn("Reached end of input stream while fetching header");

return false;

}

// Read the payload of the first packet (it may be the first part of a multi-packet).

int netlen = getUint24(PACKET_LEN_OFFSET);

int netnum = getUint8(PACKET_SEQ_OFFSET);

if (!fetch0(NET_HEADER_SIZE, netlen)) {

logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);

return false;

}

// The first payload byte is the status marker; anything but 0 is handled here.

final int mark = getUint8(NET_HEADER_SIZE);

if (mark != 0) {

if (mark == 255) // error from master

{

// Indicates an error, for example trying to fetch from a wrong binlog position.
// Decode the fields that follow the marker byte: errno, one skipped byte,
// the SQLSTATE, then the message up to the end of the data read so far.

position = NET_HEADER_SIZE + 1;

final int errno = getInt16();

String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);

String errmsg = getFixString(limit - position);

throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate

+ " errmsg = " + errmsg);

} else if (mark == 254) {

// Indicates end of stream. It's not clear when this would be sent.

logger.warn("Received EOF packet from server, apparent" + " master disconnected.");

return false;

} else {

// Should not happen.

throw new IOException("Unexpected response " + mark + " while fetching binlog: packet #" + netnum

+ ", len = " + netlen);

}

}

// A maximum-length payload means more packets follow: read each next header over the start of
// the buffer (the previous header is no longer needed) and append its payload at the current limit.

while (netlen == MAX_PACKET_LENGTH) {

if (!fetch0(0, NET_HEADER_SIZE)) {

logger.warn("Reached end of input stream while fetching header");

return false;

}

netlen = getUint24(PACKET_LEN_OFFSET);

netnum = getUint8(PACKET_SEQ_OFFSET);

if (!fetch0(limit, netlen)) {

logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);

return false;

}

}

// Prepare the buffer variables for decoding: skip the header and the marker byte.

origin = NET_HEADER_SIZE + 1;

position = origin;

limit -= origin;

return true;

} catch (SocketTimeoutException e) {

close(); /* Do cleanup */

logger.error("Socket timeout expired, closing connection", e);

throw e;

} catch (InterruptedIOException e) {

close(); /* Do cleanup */

logger.warn("I/O interrupted while reading from client socket", e);

throw e;

} catch (IOException e) {

close(); /* Do cleanup */

logger.error("I/O error while reading from client socket", e);

throw e;

}

}

/**
 * Reads exactly {@code len} bytes from the MySQL input stream into the buffer, starting at {@code off}.
 *
 * <p>The buffer is grown to hold {@code off + len} bytes before reading, and {@code limit} is raised to
 * {@code off + len} when it was smaller; it is never lowered here.
 *
 * @param off buffer offset at which the first byte is stored
 * @param len number of bytes to read
 * @return {@code true} once all bytes were read; {@code false} if the stream ended first
 * @throws IOException if the underlying read fails
 */
private final boolean fetch0(final int off, final int len) throws IOException {
    final int end = off + len;
    ensureCapacity(end);

    int filled = 0;
    while (filled < len) {
        final int got = mysqlInput.read(buffer, off + filled, len - filled);
        if (got < 0) {
            // Reached end of input stream
            return false;
        }
        filled += got;
    }

    if (limit < end) {
        limit = end;
    }
    return true;
}

/**
 * Closes the JDBC connection and drops the references to it and to its raw streams.
 *
 * <p>A {@link SQLException} raised while closing is logged as a warning and not rethrown. The
 * references are cleared in a {@code finally} block, so the fetcher never keeps a connection or
 * streams whose close attempt failed.
 *
 * @throws IOException declared for callers; the code in this method does not throw it itself
 */
public void close() throws IOException {
    try {
        if (conn != null) {
            conn.close();
        }
    } catch (SQLException e) {
        logger.warn("Unable to close connection", e);
    } finally {
        // Always forget the connection and its streams, even when closing failed.
        conn = null;
        mysqlInput = null;
        mysqlOutput = null;
    }
}

//......

}

  • DirectLogFetcher继承了LogFetcher;其open方法先unwrapConnection,然后通过反射获取mysqlOutput、mysqlInput,再执行sendBinlogDump;其close方法关闭connection;其fetch方法通过fetch0执行mysqlInput.read,把一个完整的packet(先读4字节的header,再按header中的长度读取内容)读取到buffer中:读取成功返回true;输入流结束或收到EOF包(标记字节为254)时返回false;收到error包(标记字节为255)时抛出IOException

sendBinlogDump

canal-1.1.4/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcher.java

public final class DirectLogFetcher extends LogFetcher {

//......

/**
 * Builds a COM_BINLOG_DUMP request in the buffer and sends it to the master.
 *
 * <p>The payload is written after the {@code NET_HEADER_SIZE}-byte header in this order: command byte,
 * start position, flags, server id, binlog file name. The header is filled in afterwards with the
 * payload length (three bytes, least significant first) and sequence number 0.
 *
 * @param fileName name of the binlog file to read
 * @param filePosition offset inside the binlog file to start from
 * @param serverId server id announced to the master
 * @param nonBlocking whether to add {@code BINLOG_DUMP_NON_BLOCK} to the flags
 * @throws IOException if writing to or flushing the output stream fails
 */
protected final void sendBinlogDump(String fileName, final long filePosition, final int serverId,
                                    boolean nonBlocking) throws IOException {
    // Leave room for the packet header; it is filled in once the payload length is known.
    position = NET_HEADER_SIZE;

    putByte(COM_BINLOG_DUMP);
    putInt32(filePosition);
    int binlog_flags = nonBlocking ? BINLOG_DUMP_NON_BLOCK : 0;
    binlog_flags |= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
    putInt16(binlog_flags); // binlog_flags
    putInt32(serverId); // slave's server-id
    putString(fileName);

    // Take the array reference only now, after all put calls have written the payload.
    final byte[] buf = buffer;
    final int len = position - NET_HEADER_SIZE;
    buf[PACKET_LEN_OFFSET] = (byte) (len & 0xff);
    buf[PACKET_LEN_OFFSET + 1] = (byte) (len >>> 8);
    buf[PACKET_LEN_OFFSET + 2] = (byte) (len >>> 16);
    // A command packet starts a new sequence, so the sequence byte must be 0. The buffer may still
    // hold the sequence number of a packet header read by an earlier fetch().
    buf[PACKET_SEQ_OFFSET] = 0;

    mysqlOutput.write(buf, 0, position);
    mysqlOutput.flush();
}

//......

}

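  • sendBinlogDump方法发送COM_BINLOG_DUMP命令,依次写入起始position、binlog_flags、serverId以及要读取的binlog文件名;其中binlog_flags在nonBlocking为true时包含BINLOG_DUMP_NON_BLOCK,并且始终会或上BINLOG_SEND_ANNOTATE_ROWS_EVENT;最后回填header中3字节的包长度,再写入mysqlOutput并flush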

DirectLogFetcherTest

canal-1.1.4/dbsync/src/test/java/com/taobao/tddl/dbsync/binlog/DirectLogFetcherTest.java

public class DirectLogFetcherTest extends BaseLogFetcherTest {

@Test

public void testSimple() {

DirectLogFetcher fecther = new DirectLogFetcher();

try {

Class.forName("com.mysql.jdbc.Driver");

Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306", "root", "hello");

Statement statement = connection.createStatement();

statement.execute("SET @master_binlog_checksum="@@global.binlog_checksum"");

statement.execute("SET @mariadb_slave_capability="" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + """);

fecther.open(connection, "mysql-bin.000007", 89797036L, 2);

LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);

LogContext context = new LogContext();

while (fecther.fetch()) {

LogEvent event = decoder.decode(fecther, context);

int eventType = event.getHeader().getType();

switch (eventType) {

case LogEvent.ROTATE_EVENT:

binlogFileName = ((RotateLogEvent) event).getFilename();

break;

case LogEvent.WRITE_ROWS_EVENT_V1:

case LogEvent.WRITE_ROWS_EVENT:

parseRowsEvent((WriteRowsLogEvent) event);

break;

case LogEvent.UPDATE_ROWS_EVENT_V1:

case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:

case LogEvent.UPDATE_ROWS_EVENT:

parseRowsEvent((UpdateRowsLogEvent) event);

break;

case LogEvent.DELETE_ROWS_EVENT_V1:

case LogEvent.DELETE_ROWS_EVENT:

parseRowsEvent((DeleteRowsLogEvent) event);

break;

case LogEvent.QUERY_EVENT:

parseQueryEvent((QueryLogEvent) event);

break;

case LogEvent.ROWS_QUERY_LOG_EVENT:

parseRowsQueryEvent((RowsQueryLogEvent) event);

break;

case LogEvent.ANNOTATE_ROWS_EVENT:

parseAnnotateRowsEvent((AnnotateRowsEvent) event);

break;

case LogEvent.XID_EVENT:

parseXidEvent((XidLogEvent) event);

break;

default:

break;

}

}

} catch (Exception e) {

e.printStackTrace();

Assert.fail(e.getMessage());

} finally {

try {

fecther.close();

} catch (IOException e) {

Assert.fail(e.getMessage());

}

}

}

}

  • DirectLogFetcherTest先建立connection并设置会话变量,然后执行DirectLogFetcher的open方法,之后循环调用fetch();每次fetch返回true时,就通过decoder.decode把buffer中的binlog数据解析为LogEvent,并按事件类型分别处理,直到fetch返回false为止

小结

DirectLogFetcher继承了LogFetcher;其open方法先unwrapConnection,然后通过反射获取mysqlOutput、mysqlInput,再执行sendBinlogDump;其close方法关闭connection;其fetch方法通过fetch0执行mysqlInput.read,把一个完整的packet(先读4字节的header,再按header中的长度读取内容)读取到buffer中:读取成功返回true;输入流结束或收到EOF包(标记字节为254)时返回false;收到error包(标记字节为255)时抛出IOException

doc

  • DirectLogFetcher

以上是 聊聊canal的DirectLogFetcher 的全部内容, 来源链接: utcz.com/z/515770.html

回到顶部