聊聊CanalInstance

编程

本文主要研究一下CanalInstance

CanalLifeCycle

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/CanalLifeCycle.java

public interface CanalLifeCycle {

void start();

void stop();

boolean isStart();

}

  • CanalLifeCycle接口定义了start、stop、isStart方法

AbstractCanalLifeCycle

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/AbstractCanalLifeCycle.java

public abstract class AbstractCanalLifeCycle implements CanalLifeCycle {

protected volatile boolean running = false; // 是否处于运行中

public boolean isStart() {

return running;

}

public void start() {

if (running) {

throw new CanalException(this.getClass().getName() + " has startup , don"t repeat start");

}

running = true;

}

public void stop() {

if (!running) {

throw new CanalException(this.getClass().getName() + " isn"t start , please check");

}

running = false;

}

}

  • AbstractCanalLifeCycle实现了CanalLifeCycle接口,其定义了running属性,start方法设置running为true,stop设置running为false,isStart返回running值

CanalInstance

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.java

public interface CanalInstance extends CanalLifeCycle {

String getDestination();

CanalEventParser getEventParser();

CanalEventSink getEventSink();

CanalEventStore getEventStore();

CanalMetaManager getMetaManager();

CanalAlarmHandler getAlarmHandler();

/**

* 客户端发生订阅/取消订阅行为

*/

boolean subscribeChange(ClientIdentity identity);

CanalMQConfig getMqConfig();

}

  • CanalInstance继承了CanalLifeCycle,它还定义了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法

AbstractCanalInstance

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {

private static final Logger logger = LoggerFactory.getLogger(AbstractCanalInstance.class);

protected Long canalId; // 和manager交互唯一标示

protected String destination; // 队列名字

protected CanalEventStore<Event> eventStore; // 有序队列

protected CanalEventParser eventParser; // 解析对应的数据信息

protected CanalEventSink<List<CanalEntry.Entry>> eventSink; // 链接parse和store的桥接器

protected CanalMetaManager metaManager; // 消费信息管理器

protected CanalAlarmHandler alarmHandler; // alarm报警机制

protected CanalMQConfig mqConfig; // mq的配置

//......

@Override

public void start() {

super.start();

if (!metaManager.isStart()) {

metaManager.start();

}

if (!alarmHandler.isStart()) {

alarmHandler.start();

}

if (!eventStore.isStart()) {

eventStore.start();

}

if (!eventSink.isStart()) {

eventSink.start();

}

if (!eventParser.isStart()) {

beforeStartEventParser(eventParser);

eventParser.start();

afterStartEventParser(eventParser);

}

logger.info("start successful....");

}

@Override

public void stop() {

super.stop();

logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });

if (eventParser.isStart()) {

beforeStopEventParser(eventParser);

eventParser.stop();

afterStopEventParser(eventParser);

}

if (eventSink.isStart()) {

eventSink.stop();

}

if (eventStore.isStart()) {

eventStore.stop();

}

if (metaManager.isStart()) {

metaManager.stop();

}

if (alarmHandler.isStart()) {

alarmHandler.stop();

}

logger.info("stop successful....");

}

@Override

public boolean subscribeChange(ClientIdentity identity) {

if (StringUtils.isNotEmpty(identity.getFilter())) {

logger.info("subscribe filter change to " + identity.getFilter());

AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

boolean isGroup = (eventParser instanceof GroupEventParser);

if (isGroup) {

// 处理group的模式

List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();

for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动

if(singleEventParser instanceof AbstractEventParser) {

((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);

}

}

} else {

if(eventParser instanceof AbstractEventParser) {

((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);

}

}

}

// filter的处理规则

// a. parser处理数据过滤处理

// b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据

// 后续内存版的一对多分发,可以考虑

return true;

}

@Override

public String getDestination() {

return destination;

}

@Override

public CanalEventParser getEventParser() {

return eventParser;

}

@Override

public CanalEventSink getEventSink() {

return eventSink;

}

@Override

public CanalEventStore getEventStore() {

return eventStore;

}

@Override

public CanalMetaManager getMetaManager() {

return metaManager;

}

@Override

public CanalAlarmHandler getAlarmHandler() {

return alarmHandler;

}

@Override

public CanalMQConfig getMqConfig() {

return mqConfig;

}

//......

}

  • AbstractCanalInstance继承了AbstractCanalLifeCycle,它覆盖了start、stop方法,其start方法分别启动metaManager、alarmHandler、eventStore、eventSink、eventParser,其stop方法分别关闭eventParser、eventSink、eventStore、metaManager、alarmHandler;其subscribeChange方法根据ClientIdentity的pattern创建AviaterRegexFilter,然后设置给eventParser

CanalInstanceWithSpring

canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/CanalInstanceWithSpring.java

public class CanalInstanceWithSpring extends AbstractCanalInstance {

private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);

public void start() {

logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });

super.start();

}

// ======== setter ========

public void setDestination(String destination) {

this.destination = destination;

}

public void setEventParser(CanalEventParser eventParser) {

this.eventParser = eventParser;

}

public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {

this.eventSink = eventSink;

}

public void setEventStore(CanalEventStore<Event> eventStore) {

this.eventStore = eventStore;

}

public void setMetaManager(CanalMetaManager metaManager) {

this.metaManager = metaManager;

}

public void setAlarmHandler(CanalAlarmHandler alarmHandler) {

this.alarmHandler = alarmHandler;

}

public void setMqConfig(CanalMQConfig mqConfig){

this.mqConfig = mqConfig;

}

}

  • CanalInstanceWithSpring继承了AbstractCanalInstance,它专门给注册到spring容器使用

小结

CanalInstance继承了CanalLifeCycle,它还定义了getDestination、getEventParser、getEventSink、getEventStore、getMetaManager、getAlarmHandler、subscribeChange、getMqConfig方法

doc

  • CanalInstance

以上是 聊聊CanalInstance 的全部内容, 来源链接: utcz.com/z/515449.html

回到顶部