聊聊canal的MysqlConnection

编程

本文主要研究一下canal的MysqlConnection

ErosaConnection

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

/**
 * Contract for a connection to a binlog source: lifecycle control
 * ({@code connect}/{@code reconnect}/{@code disconnect}), the {@code seek} and
 * {@code dump} entry points, plus {@code fork} and {@code queryServerId}.
 */
public interface ErosaConnection {

    void connect() throws IOException;

    void reconnect() throws IOException;

    void disconnect() throws IOException;

    /**
     * Used for quick data lookup. Unlike {@code dump}, {@code seek} only
     * yields part of the data.
     */
    void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException;

    // ------------- dump variants that hand events to a SinkFunction

    void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    void dump(long timestamp, SinkFunction func) throws IOException;

    /**
     * Synchronizes the binlog by GTID.
     */
    void dump(GTIDSet gtidSet, SinkFunction func) throws IOException;

    // ------------- dump variants that hand data to a MultiStageCoprocessor

    void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException;

    void dump(long timestamp, MultiStageCoprocessor coprocessor) throws IOException;

    void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException;

    // ------------- misc

    ErosaConnection fork();

    long queryServerId() throws IOException;
}

  • ErosaConnection接口定义了connect、reconnect、disconnect、seek、dump、fork、queryServerId方法

MysqlConnection

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

private MysqlConnector connector;

private long slaveId;

private Charset charset = Charset.forName("UTF-8");

private BinlogFormat binlogFormat;

private BinlogImage binlogImage;

// tsdb releated

private AuthenticationInfo authInfo;

protected int connTimeout = 5 * 1000; // 5秒

protected int soTimeout = 60 * 60 * 1000; // 1小时

private int binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;

// dump binlog bytes, 暂不包括meta与TSDB

private AtomicLong receivedBinlogBytes;

public MysqlConnection(){

}

public MysqlConnection(InetSocketAddress address, String username, String password){

authInfo = new AuthenticationInfo();

authInfo.setAddress(address);

authInfo.setUsername(username);

authInfo.setPassword(password);

connector = new MysqlConnector(address, username, password);

// 将connection里面的参数透传下

connector.setSoTimeout(soTimeout);

connector.setConnTimeout(connTimeout);

}

public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber,

String defaultSchema){

authInfo = new AuthenticationInfo();

authInfo.setAddress(address);

authInfo.setUsername(username);

authInfo.setPassword(password);

authInfo.setDefaultDatabaseName(defaultSchema);

connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);

// 将connection里面的参数透传下

connector.setSoTimeout(soTimeout);

connector.setConnTimeout(connTimeout);

}

public void connect() throws IOException {

connector.connect();

}

public void reconnect() throws IOException {

connector.reconnect();

}

public void disconnect() throws IOException {

connector.disconnect();

}

public boolean isConnected() {

return connector.isConnected();

}

public MysqlConnection fork() {

MysqlConnection connection = new MysqlConnection();

connection.setCharset(getCharset());

connection.setSlaveId(getSlaveId());

connection.setConnector(connector.fork());

// set authInfo

connection.setAuthInfo(authInfo);

return connection;

}

@Override

public long queryServerId() throws IOException {

ResultSetPacket resultSetPacket = query("show variables like "server_id"");

List<String> fieldValues = resultSetPacket.getFieldValues();

if (fieldValues == null || fieldValues.size() != 2) {

return 0;

}

return NumberUtils.toLong(fieldValues.get(1));

}

public ResultSetPacket query(String cmd) throws IOException {

MysqlQueryExecutor exector = new MysqlQueryExecutor(connector);

return exector.query(cmd);

}

//......

}

  • MysqlConnection实现了ErosaConnection接口,其构造器会构建AuthenticationInfo及MysqlConnector;其connect、reconnect、disconnect方法均直接委托给MysqlConnector;其fork方法会使用connector.fork()重新创建一个MysqlConnection;其queryServerId方法则使用show variables like 'server_id'查询

seek

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

//......

/**
 * Starts a binlog dump at the given file/position and feeds decoded events
 * to {@code func} until it returns {@code false} or the fetcher has no more
 * data.
 *
 * <p>Only ROTATE, FORMAT_DESCRIPTION, QUERY and XID events (plus GTID_LOG
 * events when {@code gtid} is non-empty) are registered with the decoder,
 * whereas the dump methods register the full UNKNOWN_EVENT..ENUM_END_EVENT
 * range.
 *
 * @param binlogfilename binlog file to start from
 * @param binlogPosition position inside that file
 * @param gtid           optional GTID set text; when non-empty it is parsed
 *                       and stored in the decode context
 * @param func           sink receiving each decoded event; returning
 *                       {@code false} stops the loop
 * @throws IOException if sending the dump command, fetching or closing fails
 * @throws CanalParseException if the decoder returns {@code null} for a fetched buffer
 */
public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException {
    updateSettings();
    loadBinlogChecksum();
    sendBinlogDump(binlogfilename, binlogPosition);
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    // Close the fetcher on every exit path, matching the GTID dump variants.
    try {
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder();
        decoder.handle(LogEvent.ROTATE_EVENT);
        decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
        decoder.handle(LogEvent.QUERY_EVENT);
        decoder.handle(LogEvent.XID_EVENT);
        LogContext context = new LogContext();
        // If the entry position carries a gtid, use the passed-in gtid as the
        // base gtidSet to append to. Otherwise the gtid is lost when gtid and
        // tsdb are both enabled, and once the source database has purged gtids
        // an error similar to the following is reported:
        // "errno = 1236, sqlstate = HY000 errmsg = The slave is connecting
        // using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...
        if (StringUtils.isNotEmpty(gtid)) {
            decoder.handle(LogEvent.GTID_LOG_EVENT);
            context.setGtidSet(MysqlGTIDSet.parse(gtid));
        }
        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
        while (fetcher.fetch()) {
            accumulateReceivedBytes(fetcher.limit());
            LogEvent event = decoder.decode(fetcher, context);
            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }
        }
    } finally {
        fetcher.close();
    }
}

private void updateSettings() throws IOException {

try {

update("set wait_timeout=9999999");

} catch (Exception e) {

logger.warn("update wait_timeout failed", e);

}

try {

update("set net_write_timeout=1800");

} catch (Exception e) {

logger.warn("update net_write_timeout failed", e);

}

try {

update("set net_read_timeout=1800");

} catch (Exception e) {

logger.warn("update net_read_timeout failed", e);

}

try {

// 设置服务端返回结果时不做编码转化,直接按照数据库的二进制编码进行发送,由客户端自己根据需求进行编码转化

update("set names "binary"");

} catch (Exception e) {

logger.warn("update names failed", e);

}

try {

// mysql5.6针对checksum支持需要设置session变量

// 如果不设置会出现错误: Slave can not handle replication events with the

// checksum that master is configured to log

// 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码

// "@@global.binlog_checksum"需要去掉单引号,在mysql 5.6.29下导致master退出

update("set @master_binlog_checksum= @@global.binlog_checksum");

} catch (Exception e) {

if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {

logger.warn("update master_binlog_checksum failed", e);

}

}

try {

// 参考:https://github.com/alibaba/canal/issues/284

// mysql5.6需要设置slave_uuid避免被server kill链接

update("set @slave_uuid=uuid()");

} catch (Exception e) {

if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {

logger.warn("update slave_uuid failed", e);

}

}

try {

// mariadb针对特殊的类型,需要设置session变量

update("SET @mariadb_slave_capability="" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + """);

} catch (Exception e) {

if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {

logger.warn("update mariadb_slave_capability failed", e);

}

}

/**

* MASTER_HEARTBEAT_PERIOD sets the interval in seconds between

* replication heartbeats. Whenever the master"s binary log is updated

* with an event, the waiting period for the next heartbeat is reset.

* interval is a decimal value having the range 0 to 4294967 seconds and

* a resolution in milliseconds; the smallest nonzero value is 0.001.

* Heartbeats are sent by the master only if there are no unsent events

* in the binary log file for a period longer than interval.

*/

try {

long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);

update("SET @master_heartbeat_period=" + periodNano);

} catch (Exception e) {

logger.warn("update master_heartbeat_period failed", e);

}

}

/**
 * Reads {@code @@global.binlog_checksum} from the server and stores the
 * matching algorithm in {@code binlogChecksum}: CRC32 when the first
 * returned value equals "CRC32" ignoring case, OFF in every other case,
 * including when the query throws.
 */
private void loadBinlogChecksum() {
    int detected = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
    try {
        List<String> values = query("select @@global.binlog_checksum").getFieldValues();
        boolean hasValue = values != null && values.size() >= 1 && values.get(0) != null;
        if (hasValue && "CRC32".equalsIgnoreCase(values.get(0))) {
            detected = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
        }
    } catch (Throwable ignored) {
        // Deliberately silent: any failure simply falls back to OFF.
        detected = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
    }
    binlogChecksum = detected;
}

/**
 * Builds a {@link BinlogDumpCommandPacket} for the given file/position and
 * this connection's slave id, writes it to the connector's channel with
 * packet sequence number 0, and marks the connector as dumping.
 *
 * @param binlogfilename binlog file to dump from
 * @param binlogPosition position inside that file
 * @throws IOException if writing the packet fails
 */
private void sendBinlogDump(String binlogfilename, Long binlogPosition) throws IOException {
    BinlogDumpCommandPacket dumpCommand = new BinlogDumpCommandPacket();
    dumpCommand.binlogFileName = binlogfilename;
    dumpCommand.binlogPosition = binlogPosition;
    dumpCommand.slaveServerId = this.slaveId;

    byte[] payload = dumpCommand.toBytes();
    logger.info("COM_BINLOG_DUMP with position:{}", dumpCommand);

    HeaderPacket header = new HeaderPacket();
    header.setPacketBodyLength(payload.length);
    header.setPacketSequenceNumber((byte) 0x00);

    PacketManager.writePkg(connector.getChannel(), header.toBytes(), payload);
    connector.setDumping(true);
}

//......

}

  • seek方法先执行updateSettings、loadBinlogChecksum、sendBinlogDump方法,然后创建DirectLogFetcher来fetch数据;updateSettings方法会设置wait_timeout、net_write_timeout、net_read_timeout、master_heartbeat_period等参数;loadBinlogChecksum方法会查询master的@@global.binlog_checksum,并据此将binlogChecksum设置为CRC32或OFF;sendBinlogDump方法则是发送BinlogDumpCommandPacket

dump

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

//......

/**
 * Registers as a slave, starts a binlog dump at the given file/position and
 * feeds every decoded event to {@code func} until it returns {@code false}
 * or the fetcher has no more data. For events whose semi-sync flag is 1 a
 * semi-sync ack is sent with the context's current log position.
 *
 * @param binlogfilename binlog file to start from
 * @param binlogPosition position inside that file
 * @param func           sink receiving each decoded event; returning
 *                       {@code false} stops the loop
 * @throws IOException if registering, sending the dump command, fetching or closing fails
 * @throws CanalParseException if the decoder returns {@code null} for a fetched buffer
 */
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();
    loadBinlogChecksum();
    sendRegisterSlave();
    sendBinlogDump(binlogfilename, binlogPosition);
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    // Close the fetcher on every exit path, matching the other dump overloads.
    try {
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();
        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
        while (fetcher.fetch()) {
            accumulateReceivedBytes(fetcher.limit());
            LogEvent event = decoder.decode(fetcher, context);
            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }

            if (event.getSemival() == 1) {
                sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
            }
        }
    } finally {
        fetcher.close();
    }
}

/**
 * Starts a GTID-based binlog dump and feeds every decoded event to
 * {@code func} until it returns {@code false} or the fetcher has no more
 * data. The fetcher is closed on exit.
 *
 * @param gtidSet GTID set sent with the dump command and stored in the decode context
 * @param func    sink receiving each decoded event; returning {@code false} stops the loop
 * @throws IOException if sending the dump command, fetching or closing fails
 * @throws CanalParseException if the decoder returns {@code null} for a fetched buffer
 */
public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
    updateSettings();
    loadBinlogChecksum();
    sendBinlogDumpGTID(gtidSet);

    DirectLogFetcher logFetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    try {
        logFetcher.start(connector.getChannel());
        LogDecoder logDecoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext logContext = new LogContext();
        logContext.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
        // fix bug: #890 pass the gtid set into the context so decode can use it
        logContext.setGtidSet(gtidSet);

        boolean keepGoing = true;
        while (keepGoing && logFetcher.fetch()) {
            accumulateReceivedBytes(logFetcher.limit());
            LogEvent decoded = logDecoder.decode(logFetcher, logContext);
            if (decoded == null) {
                throw new CanalParseException("parse failed");
            }
            keepGoing = func.sink(decoded);
        }
    } finally {
        logFetcher.close();
    }
}

/**
 * Registers as a slave, starts a binlog dump at the given file/position and
 * publishes each fetched buffer to {@code coprocessor} until it returns
 * {@code false} or the fetcher has no more data. The fetcher is closed on exit.
 *
 * @param binlogfilename binlog file to start from
 * @param binlogPosition position inside that file
 * @param coprocessor    must be a {@link MysqlMultiStageCoprocessor}; it is
 *                       given this connection and the detected binlog checksum
 * @throws IOException if registering, sending the dump command, fetching or closing fails
 */
public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
    updateSettings();
    loadBinlogChecksum();
    sendRegisterSlave();
    sendBinlogDump(binlogfilename, binlogPosition);

    MysqlMultiStageCoprocessor mysqlCoprocessor = (MysqlMultiStageCoprocessor) coprocessor;
    mysqlCoprocessor.setConnection(this);
    mysqlCoprocessor.setBinlogChecksum(binlogChecksum);

    DirectLogFetcher logFetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    try {
        logFetcher.start(connector.getChannel());
        boolean keepGoing = true;
        while (keepGoing && logFetcher.fetch()) {
            accumulateReceivedBytes(logFetcher.limit());
            // Duplicate first, then mark the whole fetched range as consumed.
            LogBuffer snapshot = logFetcher.duplicate();
            logFetcher.consume(logFetcher.limit());
            keepGoing = coprocessor.publish(snapshot);
        }
    } finally {
        logFetcher.close();
    }
}

/**
 * Starts a GTID-based binlog dump and publishes each fetched buffer to
 * {@code coprocessor} until it returns {@code false} or the fetcher has no
 * more data. The fetcher is closed on exit.
 *
 * @param gtidSet     GTID set sent with the dump command
 * @param coprocessor must be a {@link MysqlMultiStageCoprocessor}; it is
 *                    given this connection and the detected binlog checksum
 * @throws IOException if sending the dump command, fetching or closing fails
 */
public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException {
    updateSettings();
    loadBinlogChecksum();
    sendBinlogDumpGTID(gtidSet);

    MysqlMultiStageCoprocessor mysqlCoprocessor = (MysqlMultiStageCoprocessor) coprocessor;
    mysqlCoprocessor.setConnection(this);
    mysqlCoprocessor.setBinlogChecksum(binlogChecksum);

    DirectLogFetcher logFetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    try {
        logFetcher.start(connector.getChannel());
        boolean keepGoing = true;
        while (keepGoing && logFetcher.fetch()) {
            accumulateReceivedBytes(logFetcher.limit());
            // Duplicate first, then mark the whole fetched range as consumed.
            LogBuffer snapshot = logFetcher.duplicate();
            logFetcher.consume(logFetcher.limit());
            keepGoing = coprocessor.publish(snapshot);
        }
    } finally {
        logFetcher.close();
    }
}

/**
 * Sends a {@link RegisterSlaveCommandPacket} describing this client (local
 * host/port, credentials from the auth info, slave id) and checks the
 * server's reply.
 *
 * <p>If the channel's local address is not an {@link InetSocketAddress}
 * (including {@code null}), the method returns without sending anything.
 *
 * @throws IOException if writing or reading fails, if the reply is missing
 *                     or empty, or if the reply's first byte is negative
 *                     (-1 is parsed as an {@link ErrorPacket})
 */
private void sendRegisterSlave() throws IOException {
    SocketAddress socketAddress = connector.getChannel().getLocalSocketAddress();
    // instanceof is false for null, so this also covers a missing local address.
    if (!(socketAddress instanceof InetSocketAddress)) {
        return;
    }
    InetSocketAddress address = (InetSocketAddress) socketAddress;

    RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
    cmd.reportHost = address.getHostString();
    cmd.reportPort = address.getPort();
    cmd.reportPasswd = authInfo.getPassword();
    cmd.reportUser = authInfo.getUsername();
    cmd.serverId = this.slaveId;
    byte[] cmdBody = cmd.toBytes();

    // NOTE(review): cmd carries reportPasswd; confirm that
    // RegisterSlaveCommandPacket.toString() does not print it.
    logger.info("Register slave {}", cmd);

    HeaderPacket header = new HeaderPacket();
    header.setPacketBodyLength(cmdBody.length);
    header.setPacketSequenceNumber((byte) 0x00);
    PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);

    header = PacketManager.readHeader(connector.getChannel(), 4);
    byte[] body = PacketManager.readBytes(connector.getChannel(), header.getPacketBodyLength());
    // An assert is a no-op unless the JVM runs with -ea, so check explicitly
    // and also guard the body[0] access against an empty reply.
    if (body == null || body.length == 0) {
        throw new IOException("Error When doing Register slave: empty response body");
    }
    if (body[0] < 0) {
        if (body[0] == -1) {
            ErrorPacket err = new ErrorPacket();
            err.fromBytes(body);
            throw new IOException("Error When doing Register slave:" + err.toString());
        } else {
            throw new IOException("unpexpected packet with field_count=" + body[0]);
        }
    }
}

/**
 * Builds a {@link BinlogDumpGTIDCommandPacket} for this connection's slave id
 * and the given GTID set, writes it to the connector's channel with packet
 * sequence number 0, and marks the connector as dumping.
 *
 * @param gtidSet GTID set placed into the command packet
 * @throws IOException if writing the packet fails
 */
private void sendBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
    BinlogDumpGTIDCommandPacket dumpCommand = new BinlogDumpGTIDCommandPacket();
    dumpCommand.slaveServerId = this.slaveId;
    dumpCommand.gtidSet = gtidSet;

    byte[] payload = dumpCommand.toBytes();
    logger.info("COM_BINLOG_DUMP_GTID:{}", dumpCommand);

    HeaderPacket header = new HeaderPacket();
    header.setPacketBodyLength(payload.length);
    header.setPacketSequenceNumber((byte) 0x00);

    PacketManager.writePkg(connector.getChannel(), header.toBytes(), payload);
    connector.setDumping(true);
}

//......

}

  • MysqlConnection提供了多个dump方法,主要分为根据binlog文件名的,根据gtidSet的,以及使用SinkFunction或者MultiStageCoprocessor的
  • 根据binlogfilename的dump方法先执行updateSettings、loadBinlogChecksum、sendRegisterSlave、sendBinlogDump,然后创建DirectLogFetcher拉取数据,触发SinkFunction或者MultiStageCoprocessor;根据gtidSet的dump方法先执行updateSettings、loadBinlogChecksum、sendBinlogDumpGTID,然后创建DirectLogFetcher拉取数据,触发SinkFunction或者MultiStageCoprocessor
  • sendRegisterSlave方法构造并发送RegisterSlaveCommandPacket;sendBinlogDumpGTID方法则构造并发送BinlogDumpGTIDCommandPacket

小结

MysqlConnection实现了ErosaConnection接口,其构造器会构建AuthenticationInfo及MysqlConnector;其connect、reconnect、disconnect方法均直接委托给MysqlConnector;其fork方法会使用connector.fork()重新创建一个MysqlConnection;其queryServerId方法则使用show variables like 'server_id'查询;它提供了多个dump方法,主要分为根据binlog文件名的,根据gtidSet的,以及使用SinkFunction或者MultiStageCoprocessor的

doc

  • MysqlConnection

以上是 聊聊canal的MysqlConnection 的全部内容, 来源链接: utcz.com/z/515858.html

回到顶部