聊聊canal的MysqlDetectingTimeTask
序
本文主要研究一下canal的MysqlDetectingTimeTask
MysqlDetectingTimeTask
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
class MysqlDetectingTimeTask extends TimerTask { private boolean reconnect = false;
private MysqlConnection mysqlConnection;
public MysqlDetectingTimeTask(MysqlConnection mysqlConnection){
this.mysqlConnection = mysqlConnection;
}
public void run() {
try {
if (reconnect) {
reconnect = false;
mysqlConnection.reconnect();
} else if (!mysqlConnection.isConnected()) {
mysqlConnection.connect();
}
Long startTime = System.currentTimeMillis();
// 可能心跳sql为select 1
if (StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "select")
|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "show")
|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "explain")
|| StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "desc")) {
mysqlConnection.query(detectingSQL);
} else {
mysqlConnection.update(detectingSQL);
}
Long costTime = System.currentTimeMillis() - startTime;
if (haController != null && haController instanceof HeartBeatCallback) {
((HeartBeatCallback) haController).onSuccess(costTime);
}
} catch (SocketTimeoutException e) {
if (haController != null && haController instanceof HeartBeatCallback) {
((HeartBeatCallback) haController).onFailed(e);
}
reconnect = true;
logger.warn("connect failed by ", e);
} catch (IOException e) {
if (haController != null && haController instanceof HeartBeatCallback) {
((HeartBeatCallback) haController).onFailed(e);
}
reconnect = true;
logger.warn("connect failed by ", e);
} catch (Throwable e) {
if (haController != null && haController instanceof HeartBeatCallback) {
((HeartBeatCallback) haController).onFailed(e);
}
reconnect = true;
logger.warn("connect failed by ", e);
}
}
public MysqlConnection getMysqlConnection() {
return mysqlConnection;
}
}
- MysqlDetectingTimeTask继承了TimerTask,其run方法在reconnect为true时,更新reconnect为false,然后执行mysqlConnection.reconnect();若reconnect为false且mysqlConnection.isConnected()为false则执行mysqlConnection.connect();之后执行detectingSQL语句,然后记录costTime,最后执行((HeartBeatCallback) haController).onSuccess(costTime);依次捕获SocketTimeoutException、IOException、Throwable,然后则执行((HeartBeatCallback) haController).onFailed(e),更新reconnect为true
HeartBeatCallback
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/HeartBeatCallback.java
public interface HeartBeatCallback { /**
* 心跳发送成功
*/
public void onSuccess(long costTime);
/**
* 心跳发送失败
*/
public void onFailed(Throwable e);
}
- HeartBeatCallback接口定义了onSuccess、onFailed方法
HeartBeatHAController
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/ha/HeartBeatHAController.java
public class HeartBeatHAController extends AbstractCanalLifeCycle implements CanalHAController, HeartBeatCallback { private static final Logger logger = LoggerFactory.getLogger(HeartBeatHAController.class);
// default 3 times
private int detectingRetryTimes = 3;
private int failedTimes = 0;
private boolean switchEnable = false;
private CanalHASwitchable eventParser;
public HeartBeatHAController(){
}
public void onSuccess(long costTime) {
failedTimes = 0;
}
public void onFailed(Throwable e) {
failedTimes++;
// 检查一下是否超过失败次数
synchronized (this) {
if (failedTimes > detectingRetryTimes) {
if (switchEnable) {
eventParser.doSwitch();// 通知执行一次切换
failedTimes = 0;
} else {
logger.warn("HeartBeat failed Times:{} , should auto switch ?", failedTimes);
}
}
}
}
// ============================= setter / getter
// ============================
public void setCanalHASwitchable(CanalHASwitchable canalHASwitchable) {
this.eventParser = canalHASwitchable;
}
public void setDetectingRetryTimes(int detectingRetryTimes) {
this.detectingRetryTimes = detectingRetryTimes;
}
public void setSwitchEnable(boolean switchEnable) {
this.switchEnable = switchEnable;
}
}
- HeartBeatHAController实现了HeartBeatCallback接口,其onSuccess方法更新failedTimes为0;其onFailed方法递增failedTimes,然后判断failedTimes是否大于detectingRetryTimes,且switchEnable为true则执行eventParser.doSwitch(),然后更新failedTimes为0
doSwitch
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java
public class MysqlEventParser extends AbstractMysqlEventParser implements CanalEventParser, CanalHASwitchable { //......
// 处理主备切换的逻辑
public void doSwitch() {
AuthenticationInfo newRunningInfo = (runningInfo.equals(masterInfo) ? standbyInfo : masterInfo);
this.doSwitch(newRunningInfo);
}
public void doSwitch(AuthenticationInfo newRunningInfo) {
// 1. 需要停止当前正在复制的过程
// 2. 找到新的position点
// 3. 重新建立链接,开始复制数据
// 切换ip
String alarmMessage = null;
if (this.runningInfo.equals(newRunningInfo)) {
alarmMessage = "same runingInfo switch again : " + runningInfo.getAddress().toString();
logger.warn(alarmMessage);
return;
}
if (newRunningInfo == null) {
alarmMessage = "no standby config, just do nothing, will continue try:"
+ runningInfo.getAddress().toString();
logger.warn(alarmMessage);
sendAlarm(destination, alarmMessage);
return;
} else {
stop();
alarmMessage = "try to ha switch, old:" + runningInfo.getAddress().toString() + ", new:"
+ newRunningInfo.getAddress().toString();
logger.warn(alarmMessage);
sendAlarm(destination, alarmMessage);
runningInfo = newRunningInfo;
start();
}
}
//......
}
- MysqlEventParser的doSwitch方法先执行stop方法,然后更新runningInfo,最后执行start方法
小结
MysqlDetectingTimeTask继承了TimerTask,其run方法在reconnect为true时,更新reconnect为false,然后执行mysqlConnection.reconnect();若reconnect为false且mysqlConnection.isConnected()为false则执行mysqlConnection.connect();之后执行detectingSQL语句,然后记录costTime,最后执行((HeartBeatCallback) haController).onSuccess(costTime);依次捕获SocketTimeoutException、IOException、Throwable,然后则执行((HeartBeatCallback) haController).onFailed(e),更新reconnect为true
doc
- MysqlDetectingTimeTask
以上是 聊聊canal的MysqlDetectingTimeTask 的全部内容, 来源链接: utcz.com/z/515691.html