聊聊canal的MysqlDetectingTimeTask

编程

本文主要研究一下canal的MysqlDetectingTimeTask

MysqlDetectingTimeTask

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

    class MysqlDetectingTimeTask extends TimerTask {

private boolean reconnect = false;

private MysqlConnection mysqlConnection;

public MysqlDetectingTimeTask(MysqlConnection mysqlConnection){

this.mysqlConnection = mysqlConnection;

}

public void run() {

try {

if (reconnect) {

reconnect = false;

mysqlConnection.reconnect();

} else if (!mysqlConnection.isConnected()) {

mysqlConnection.connect();

}

Long startTime = System.currentTimeMillis();

// 可能心跳sql为select 1

if (StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "select")

|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "show")

|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "explain")

|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "desc")) {

mysqlConnection.query(detectingSQL);

} else {

mysqlConnection.update(detectingSQL);

}

Long costTime = System.currentTimeMillis() - startTime;

if (haController != null && haController instanceof HeartBeatCallback) {

((HeartBeatCallback) haController).onSuccess(costTime);

}

} catch (SocketTimeoutException e) {

if (haController != null && haController instanceof HeartBeatCallback) {

((HeartBeatCallback) haController).onFailed(e);

}

reconnect = true;

logger.warn("connect failed by ", e);

} catch (IOException e) {

if (haController != null && haController instanceof HeartBeatCallback) {

((HeartBeatCallback) haController).onFailed(e);

}

reconnect = true;

logger.warn("connect failed by ", e);

} catch (Throwable e) {

if (haController != null && haController instanceof HeartBeatCallback) {

((HeartBeatCallback) haController).onFailed(e);

}

reconnect = true;

logger.warn("connect failed by ", e);

}

}

public MysqlConnection getMysqlConnection() {

return mysqlConnection;

}

}

  • MysqlDetectingTimeTask继承了TimerTask,其run方法在reconnect为true时,更新reconnect为false,然后执行mysqlConnection.reconnect();若reconnect为false且mysqlConnection.isConnected()为false则执行mysqlConnection.connect();之后执行detectingSQL语句,然后记录costTime,最后执行((HeartBeatCallback) haController).onSuccess(costTime);依次捕获SocketTimeoutException、IOException、Throwable,然后则执行((HeartBeatCallback) haController).onFailed(e),更新reconnect为true

HeartBeatCallback

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/HeartBeatCallback.java

public interface HeartBeatCallback {

/**

* 心跳发送成功

*/

public void onSuccess(long costTime);

/**

* 心跳发送失败

*/

public void onFailed(Throwable e);

}

  • HeartBeatCallback接口定义了onSuccess、onFailed方法

HeartBeatHAController

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/ha/HeartBeatHAController.java

public class HeartBeatHAController extends AbstractCanalLifeCycle implements CanalHAController, HeartBeatCallback {

private static final Logger logger = LoggerFactory.getLogger(HeartBeatHAController.class);

// default 3 times

private int detectingRetryTimes = 3;

private int failedTimes = 0;

private boolean switchEnable = false;

private CanalHASwitchable eventParser;

public HeartBeatHAController(){

}

public void onSuccess(long costTime) {

failedTimes = 0;

}

public void onFailed(Throwable e) {

failedTimes++;

// 检查一下是否超过失败次数

synchronized (this) {

if (failedTimes > detectingRetryTimes) {

if (switchEnable) {

eventParser.doSwitch();// 通知执行一次切换

failedTimes = 0;

} else {

logger.warn("HeartBeat failed Times:{} , should auto switch ?", failedTimes);

}

}

}

}

// ============================= setter / getter

// ============================

public void setCanalHASwitchable(CanalHASwitchable canalHASwitchable) {

this.eventParser = canalHASwitchable;

}

public void setDetectingRetryTimes(int detectingRetryTimes) {

this.detectingRetryTimes = detectingRetryTimes;

}

public void setSwitchEnable(boolean switchEnable) {

this.switchEnable = switchEnable;

}

}

  • HeartBeatHAController实现了HeartBeatCallback接口,其onSuccess方法更新failedTimes为0;其onFailed方法递增failedTimes,然后判断failedTimes是否大于detectingRetryTimes,且switchEnable为true则执行eventParser.doSwitch(),然后更新failedTimes为0

doSwitch

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

public class MysqlEventParser extends AbstractMysqlEventParser implements CanalEventParser, CanalHASwitchable {

//......

// 处理主备切换的逻辑

public void doSwitch() {

AuthenticationInfo newRunningInfo = (runningInfo.equals(masterInfo) ? standbyInfo : masterInfo);

this.doSwitch(newRunningInfo);

}

public void doSwitch(AuthenticationInfo newRunningInfo) {

// 1. 需要停止当前正在复制的过程

// 2. 找到新的position点

// 3. 重新建立链接,开始复制数据

// 切换ip

String alarmMessage = null;

if (this.runningInfo.equals(newRunningInfo)) {

alarmMessage = "same runingInfo switch again : " + runningInfo.getAddress().toString();

logger.warn(alarmMessage);

return;

}

if (newRunningInfo == null) {

alarmMessage = "no standby config, just do nothing, will continue try:"

+ runningInfo.getAddress().toString();

logger.warn(alarmMessage);

sendAlarm(destination, alarmMessage);

return;

} else {

stop();

alarmMessage = "try to ha switch, old:" + runningInfo.getAddress().toString() + ", new:"

+ newRunningInfo.getAddress().toString();

logger.warn(alarmMessage);

sendAlarm(destination, alarmMessage);

runningInfo = newRunningInfo;

start();

}

}

//......

}

  • MysqlEventParser的doSwitch方法先执行stop方法,然后更新runningInfo,最后执行start方法

小结

MysqlDetectingTimeTask继承了TimerTask,其run方法在reconnect为true时,更新reconnect为false,然后执行mysqlConnection.reconnect();若reconnect为false且mysqlConnection.isConnected()为false则执行mysqlConnection.connect();之后执行detectingSQL语句,然后记录costTime,最后执行((HeartBeatCallback) haController).onSuccess(costTime);依次捕获SocketTimeoutException、IOException、Throwable,然后则执行((HeartBeatCallback) haController).onFailed(e),更新reconnect为true

doc

  • MysqlDetectingTimeTask

以上是 聊聊canal的MysqlDetectingTimeTask 的全部内容, 来源链接: utcz.com/z/515691.html

回到顶部