聊聊canal的ClientIdentity

编程

本文主要研究一下canal的ClientIdentity

ClientIdentity

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/ClientIdentity.java

public class ClientIdentity implements Serializable {

private static final long serialVersionUID = -8262100681930834834L;

private String destination;

private short clientId;

private String filter;

public ClientIdentity(){

}

public ClientIdentity(String destination, short clientId){

this.clientId = clientId;

this.destination = destination;

}

public ClientIdentity(String destination, short clientId, String filter){

this.clientId = clientId;

this.destination = destination;

this.filter = filter;

}

public Boolean hasFilter() {

if (filter == null) {

return false;

}

return StringUtils.isNotBlank(filter);

}

//......

}

  • ClientIdentity定义了destination、clientId、filter属性

CanalServerWithEmbedded

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {

private static final Logger logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);

private Map<String, CanalInstance> canalInstances;

// private Map<ClientIdentity, Position> lastRollbackPostions;

private CanalInstanceGenerator canalInstanceGenerator;

private int metricsPort;

private CanalMetricsService metrics = NopCanalMetricsService.NOP;

private String user;

private String passwd;

//......

@Override

public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {

checkStart(clientIdentity.getDestination());

CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());

if (!canalInstance.getMetaManager().isStart()) {

canalInstance.getMetaManager().start();

}

canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅

Position position = canalInstance.getMetaManager().getCursor(clientIdentity);

if (position == null) {

position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条

if (position != null) {

canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor

}

logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);

} else {

logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);

}

// 通知下订阅关系变化

canalInstance.subscribeChange(clientIdentity);

}

/**

* 取消订阅

*/

@Override

public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {

CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());

canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅

logger.info("unsubscribe successfully, {}", clientIdentity);

}

/**

* 查询所有的订阅信息

*/

public List<ClientIdentity> listAllSubscribe(String destination) throws CanalServerException {

CanalInstance canalInstance = canalInstances.get(destination);

return canalInstance.getMetaManager().listAllSubscribeInfo(destination);

}

//......

}

  • CanalServerWithEmbedded提供了subscribe、unsubscribe、listAllSubscribe方法;其subscribe方法接收clientIdentity参数,然后使用canalInstance.getMetaManager().getCursor(clientIdentity)获取position,若position为null则使用canalInstance.getEventStore().getFirstPosition()获取,然后通过canalInstance.getMetaManager().updateCursor(clientIdentity, position)更新cursor,最后执行canalInstance.subscribeChange(clientIdentity);unsubscribe方法则执行canalInstance.getMetaManager().unsubscribe(clientIdentity);listAllSubscribe方法则执行canalInstance.getMetaManager().listAllSubscribeInfo(destination)

小结

ClientIdentity定义了destination、clientId、filter属性;CanalServerWithEmbedded提供了subscribe、unsubscribe、listAllSubscribe方法;其中subscribe、unsubscribe方法接收clientIdentity参数,而listAllSubscribe方法返回ClientIdentity列表

doc

  • ClientIdentity

以上是 聊聊canal的ClientIdentity 的全部内容, 来源链接: utcz.com/z/515607.html

回到顶部