A Look at canal's Position


This article takes a look at the Position classes in canal.

Position

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/Position.java

public abstract class Position implements Serializable {

    private static final long serialVersionUID = 2332798099928474975L;

    public String toString() {
        return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
    }
}

  • Position defines a toString method that calls ToStringBuilder.reflectionToString with CanalToStringStyle.DEFAULT_STYLE to render the object's fields as a string
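To illustrate the idea of a reflection-based toString, here is a minimal sketch using only the JDK. It is not how Commons Lang or canal implement it; ReflectivePosition, DemoTimePosition, and the output format are hypothetical stand-ins chosen for this example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.StringJoiner;

// Hypothetical stand-in for Position: toString() is built reflectively,
// so subclasses get a readable string without overriding anything.
abstract class ReflectivePosition {

    @Override
    public String toString() {
        StringJoiner joiner = new StringJoiner(",", getClass().getSimpleName() + "[", "]");
        // Walk up the class hierarchy so inherited fields are included too.
        for (Class<?> c = getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (Modifier.isStatic(f.getModifiers())) {
                    continue; // skip constants such as serialVersionUID
                }
                f.setAccessible(true);
                try {
                    joiner.add(f.getName() + "=" + f.get(this));
                } catch (IllegalAccessException e) {
                    joiner.add(f.getName() + "=<inaccessible>");
                }
            }
        }
        return joiner.toString();
    }
}

// Hypothetical subclass mirroring TimePosition's single field.
class DemoTimePosition extends ReflectivePosition {
    private static final long serialVersionUID = 1L;
    protected Long timestamp;

    DemoTimePosition(Long timestamp) {
        this.timestamp = timestamp;
    }
}

class ReflectiveToStringDemo {
    public static void main(String[] args) {
        System.out.println(new DemoTimePosition(1577808000000L));
    }
}
```

The benefit of this design is that every subclass gets a usable toString for logging without writing one by hand, at the cost of some reflection overhead on each call.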

TimePosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/TimePosition.java

public class TimePosition extends Position {

    private static final long serialVersionUID = 6185261261064226380L;

    protected Long timestamp;

    public TimePosition(Long timestamp){
        this.timestamp = timestamp;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    //......
}

  • TimePosition extends Position and adds a timestamp property

EntryPosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/EntryPosition.java

public class EntryPosition extends TimePosition {

    private static final long serialVersionUID = 81432665066427482L;

    public static final int EVENTIDENTITY_SEGMENT = 3;

    public static final char EVENTIDENTITY_SPLIT = (char) 5;

    private boolean included = false;

    private String journalName;

    private Long position;

    // add by agapple at 2016-06-28
    private Long serverId = null; // records the serverId that this position belongs to

    private String gtid = null;

    //......
}

  • EntryPosition extends TimePosition and adds the included, journalName, position, serverId, and gtid properties
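The journalName and position pair is what locates an entry in the log. As a rough sketch of how two such positions could be ordered, assuming journal file names sort lexicographically (for example mysql-bin.000001 before mysql-bin.000002): DemoEntryPosition and isBefore are hypothetical names for this example, not canal's API.

```java
// Hypothetical, simplified stand-in for EntryPosition (not canal's class).
class DemoEntryPosition {
    final String journalName; // log file name
    final long position;      // offset inside that file

    DemoEntryPosition(String journalName, long position) {
        this.journalName = journalName;
        this.position = position;
    }

    // Assumption: file names compare lexicographically, then offsets numerically.
    boolean isBefore(DemoEntryPosition other) {
        int byFile = journalName.compareTo(other.journalName);
        if (byFile != 0) {
            return byFile < 0;
        }
        return position < other.position;
    }
}

class EntryPositionOrderDemo {
    public static void main(String[] args) {
        DemoEntryPosition a = new DemoEntryPosition("mysql-bin.000001", 4L);
        DemoEntryPosition b = new DemoEntryPosition("mysql-bin.000001", 1024L);
        DemoEntryPosition c = new DemoEntryPosition("mysql-bin.000002", 4L);
        System.out.println(a.isBefore(b)); // true: same file, smaller offset
        System.out.println(b.isBefore(c)); // true: earlier file wins regardless of offset
        System.out.println(c.isBefore(a)); // false
    }
}
```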

SlaveEntryPosition

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/SlaveEntryPosition.java

public class SlaveEntryPosition extends EntryPosition {

    private static final long serialVersionUID = 5271424551446372093L;

    private final String masterHost;

    private final String masterPort;

    public SlaveEntryPosition(String fileName, long position, String masterHost, String masterPort){
        super(fileName, position);
        this.masterHost = masterHost;
        this.masterPort = masterPort;
    }

    public String getMasterHost() {
        return masterHost;
    }

    public String getMasterPort() {
        return masterPort;
    }
}

  • SlaveEntryPosition extends EntryPosition and adds the masterHost and masterPort properties; both are final and set through the constructor

LogPosition

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/position/LogPosition.java

public class LogPosition extends Position {

    private static final long serialVersionUID = 3875012010277005819L;

    private LogIdentity identity;

    private EntryPosition postion;

    public LogIdentity getIdentity() {
        return identity;
    }

    public void setIdentity(LogIdentity identity) {
        this.identity = identity;
    }

    public EntryPosition getPostion() {
        return postion;
    }

    public void setPostion(EntryPosition postion) {
        this.postion = postion;
    }

    //......
}

  • LogPosition extends Position and defines two properties, identity and postion (the field is spelled this way in the source)

cursor

canal-1.1.4/meta/src/main/java/com/alibaba/otter/canal/meta/CanalMetaManager.java

public interface CanalMetaManager extends CanalLifeCycle {

    //......

    /**
     * Get the cursor
     */
    Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;

    /**
     * Update the cursor
     */
    void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException;

    //......
}

  • CanalMetaManager declares the getCursor and updateCursor methods; getCursor returns the Position for a given clientIdentity, and updateCursor updates the position stored for that clientIdentity

MemoryMetaManager

canal-1.1.4/meta/src/main/java/com/alibaba/otter/canal/meta/MemoryMetaManager.java

public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {

    protected Map<String, List<ClientIdentity>> destinations;

    protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;

    protected Map<ClientIdentity, Position> cursors;

    //......

    public Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        return cursors.get(clientIdentity);
    }

    public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {
        cursors.put(clientIdentity, position);
    }

    //......
}

  • MemoryMetaManager implements the CanalMetaManager interface; it keeps a cursors map whose key is a ClientIdentity and whose value is a Position. getCursor looks up the Position for clientIdentity in cursors, and updateCursor puts position into cursors under the key clientIdentity
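The cursor bookkeeping described above can be sketched with a plain map. This is a simplified illustration, not canal's code: DemoClientIdentity, DemoPosition, and DemoCursorStore are hypothetical stand-ins, and the use of ConcurrentHashMap is an assumption made here (the excerpt only shows that cursors is a Map).

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical key type. A map key needs equals/hashCode so that an equal
// identity created later finds the same cursor.
final class DemoClientIdentity {
    private final String destination;
    private final short clientId;

    DemoClientIdentity(String destination, short clientId) {
        this.destination = destination;
        this.clientId = clientId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DemoClientIdentity)) return false;
        DemoClientIdentity other = (DemoClientIdentity) o;
        return clientId == other.clientId && destination.equals(other.destination);
    }

    @Override
    public int hashCode() {
        return Objects.hash(destination, clientId);
    }
}

// Hypothetical value type standing in for Position.
final class DemoPosition {
    final String journalName;
    final long offset;

    DemoPosition(String journalName, long offset) {
        this.journalName = journalName;
        this.offset = offset;
    }
}

// In-memory cursor store: one cursor per client identity.
class DemoCursorStore {
    // Assumption: a concurrent map, so several threads can read and update safely.
    private final Map<DemoClientIdentity, DemoPosition> cursors = new ConcurrentHashMap<>();

    DemoPosition getCursor(DemoClientIdentity id) {
        return cursors.get(id); // null when the client has no cursor yet
    }

    void updateCursor(DemoClientIdentity id, DemoPosition position) {
        cursors.put(id, position); // overwrites any previous cursor
    }
}

class CursorStoreDemo {
    public static void main(String[] args) {
        DemoCursorStore store = new DemoCursorStore();
        DemoClientIdentity id = new DemoClientIdentity("example", (short) 1001);
        store.updateCursor(id, new DemoPosition("mysql-bin.000001", 4L));
        store.updateCursor(id, new DemoPosition("mysql-bin.000001", 1024L));
        System.out.println(store.getCursor(id).offset); // 1024
    }
}
```

Because the state lives only in memory, a store like this loses every cursor on restart, which is the usual trade-off of a purely in-memory approach.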

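Summary

Position defines a toString method that uses ToStringBuilder.reflectionToString with CanalToStringStyle.DEFAULT_STYLE. It has two direct subclasses, TimePosition and LogPosition. EntryPosition extends TimePosition, and SlaveEntryPosition extends EntryPosition. LogPosition, by contrast, does not inherit from EntryPosition but holds one as a field (composition).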
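The relationships in this summary can be restated as a compact skeleton. The H-prefixed classes are hypothetical stand-ins that keep only the inheritance and composition structure; kindOf is a helper invented for this example.

```java
// Skeleton of the hierarchy; fields are reduced to the ones named in the article.
abstract class HPosition {}

class HTimePosition extends HPosition {
    Long timestamp;
}

class HEntryPosition extends HTimePosition {
    String journalName;
    Long position;
}

class HSlaveEntryPosition extends HEntryPosition {
    String masterHost;
    String masterPort;
}

// Composition: a log position has an entry position, it is not one.
class HLogPosition extends HPosition {
    HEntryPosition postion; // spelling follows the original field name
}

class HierarchyDemo {
    // Checks the most specific type first, since a slave entry position
    // is also an entry position and a time position.
    static String kindOf(HPosition p) {
        if (p instanceof HLogPosition) return "log";
        if (p instanceof HSlaveEntryPosition) return "slave-entry";
        if (p instanceof HEntryPosition) return "entry";
        if (p instanceof HTimePosition) return "time";
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(kindOf(new HSlaveEntryPosition())); // slave-entry
        System.out.println(kindOf(new HLogPosition()));        // log
    }
}
```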

