A Look at CanalInstance
Preface
This article takes a look at CanalInstance in canal 1.1.4.
CanalLifeCycle
canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/CanalLifeCycle.java
public interface CanalLifeCycle {

    void start();

    void stop();

    boolean isStart();
}
- The CanalLifeCycle interface defines three methods: start, stop, and isStart
AbstractCanalLifeCycle
canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/AbstractCanalLifeCycle.java
public abstract class AbstractCanalLifeCycle implements CanalLifeCycle {

    protected volatile boolean running = false; // whether the component is running

    public boolean isStart() {
        return running;
    }

    public void start() {
        if (running) {
            throw new CanalException(this.getClass().getName() + " has startup , don't repeat start");
        }

        running = true;
    }

    public void stop() {
        if (!running) {
            throw new CanalException(this.getClass().getName() + " isn't start , please check");
        }

        running = false;
    }
}
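- AbstractCanalLifeCycle implements the CanalLifeCycle interface. It defines a volatile running flag: start sets it to true and throws a CanalException if the component is already running, stop sets it to false and throws a CanalException if the component is not running, and isStart returns its current value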
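To make the guard behaviour concrete, here is a minimal, self-contained sketch. It does not use the canal classes: LifeCycle, AbstractLifeCycle, DemoComponent and startIfNeeded are hypothetical stand-ins written for this example, and IllegalStateException stands in for CanalException so the code runs without canal on the classpath.

```java
class LifeCycleDemo {

    // Stand-in for CanalLifeCycle (assumption: same three methods)
    interface LifeCycle {
        void start();
        void stop();
        boolean isStart();
    }

    // Stand-in mirroring AbstractCanalLifeCycle; IllegalStateException replaces CanalException
    static abstract class AbstractLifeCycle implements LifeCycle {
        protected volatile boolean running = false;

        public boolean isStart() {
            return running;
        }

        public void start() {
            if (running) {
                throw new IllegalStateException(getClass().getName() + " has startup , don't repeat start");
            }
            running = true;
        }

        public void stop() {
            if (!running) {
                throw new IllegalStateException(getClass().getName() + " isn't start , please check");
            }
            running = false;
        }
    }

    // Hypothetical component that only inherits the default behaviour
    static class DemoComponent extends AbstractLifeCycle {
    }

    // Checks isStart() first, so start() is never called on a running component
    static boolean startIfNeeded(LifeCycle component) {
        if (!component.isStart()) {
            component.start();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        DemoComponent component = new DemoComponent();
        component.start();
        System.out.println("running after start: " + component.isStart());
        try {
            component.start(); // second start hits the guard
        } catch (IllegalStateException e) {
            System.out.println("repeat start rejected: " + e.getMessage());
        }
        System.out.println("startIfNeeded started it again: " + startIfNeeded(component));
        component.stop();
        System.out.println("running after stop: " + component.isStart());
    }
}
```

Because a repeated start or a stop on a stopped component throws, callers that may run more than once usually check isStart() first, as startIfNeeded does here.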
CanalInstance
canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstance.java
public interface CanalInstance extends CanalLifeCycle {

    String getDestination();

    CanalEventParser getEventParser();

    CanalEventSink getEventSink();

    CanalEventStore getEventStore();

    CanalMetaManager getMetaManager();

    CanalAlarmHandler getAlarmHandler();

    /**
     * Called when a client subscribes or unsubscribes
     */
    boolean subscribeChange(ClientIdentity identity);

    CanalMQConfig getMqConfig();
}
- CanalInstance extends CanalLifeCycle and adds the getDestination, getEventParser, getEventSink, getEventStore, getMetaManager, getAlarmHandler, subscribeChange, and getMqConfig methods
AbstractCanalInstance
canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java
public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {

    private static final Logger                      logger = LoggerFactory.getLogger(AbstractCanalInstance.class);

    protected Long                                   canalId;      // unique identifier used when interacting with the manager
    protected String                                 destination;  // queue name
    protected CanalEventStore<Event>                 eventStore;   // ordered queue
    protected CanalEventParser                       eventParser;  // parses the corresponding data
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink;    // bridge connecting the parser and the store
    protected CanalMetaManager                       metaManager;  // consumption metadata manager
    protected CanalAlarmHandler                      alarmHandler; // alarm mechanism
    protected CanalMQConfig                          mqConfig;     // MQ configuration

    //......

    @Override
    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();
        }

        if (!eventStore.isStart()) {
            eventStore.start();
        }

        if (!eventSink.isStart()) {
            eventSink.start();
        }

        if (!eventParser.isStart()) {
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

    @Override
    public void stop() {
        super.stop();
        logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });

        if (eventParser.isStart()) {
            beforeStopEventParser(eventParser);
            eventParser.stop();
            afterStopEventParser(eventParser);
        }

        if (eventSink.isStart()) {
            eventSink.stop();
        }

        if (eventStore.isStart()) {
            eventStore.stop();
        }

        if (metaManager.isStart()) {
            metaManager.stop();
        }

        if (alarmHandler.isStart()) {
            alarmHandler.stop();
        }

        logger.info("stop successful....");
    }

    @Override
    public boolean subscribeChange(ClientIdentity identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe filter change to " + identity.getFilter());
            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

            boolean isGroup = (eventParser instanceof GroupEventParser);
            if (isGroup) {
                // handle group mode
                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                for (CanalEventParser singleEventParser : eventParsers) { // need to iterate over each parser
                    if (singleEventParser instanceof AbstractEventParser) {
                        ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
                    }
                }
            } else {
                if (eventParser instanceof AbstractEventParser) {
                    ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
                }
            }
        }

        // filter handling rules
        // a. the parser filters the data
        // b. the sink routes & distributes the data: one copy of parsed data can be fanned out into several copies after the sink, and each copy can contain different data according to its own filter rules
        // a later in-memory one-to-many distribution could take this into account
        return true;
    }

    @Override
    public String getDestination() {
        return destination;
    }

    @Override
    public CanalEventParser getEventParser() {
        return eventParser;
    }

    @Override
    public CanalEventSink getEventSink() {
        return eventSink;
    }

    @Override
    public CanalEventStore getEventStore() {
        return eventStore;
    }

    @Override
    public CanalMetaManager getMetaManager() {
        return metaManager;
    }

    @Override
    public CanalAlarmHandler getAlarmHandler() {
        return alarmHandler;
    }

    @Override
    public CanalMQConfig getMqConfig() {
        return mqConfig;
    }

    //......
}
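- AbstractCanalInstance extends AbstractCanalLifeCycle and overrides start and stop. Its start method starts metaManager, alarmHandler, eventStore, eventSink, and eventParser in that order, skipping any component that is already running; its stop method stops eventParser, eventSink, eventStore, metaManager, and alarmHandler in that order. Its subscribeChange method builds an AviaterRegexFilter from the filter of the ClientIdentity (when that filter is not empty) and sets it on the eventParser, or on every AbstractEventParser inside a GroupEventParser; it always returns true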
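The ordering is easier to see when it is recorded. The following is a minimal sketch, not canal code: Component and Instance are hypothetical stand-ins that only log their start and stop calls, and the before/after parser hooks are left out.

```java
import java.util.ArrayList;
import java.util.List;

class InstanceOrderDemo {

    // Hypothetical stand-in for a canal component; it records each call in a shared log
    static class Component {
        private final String name;
        private final List<String> log;
        private boolean running;

        Component(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        boolean isStart() {
            return running;
        }

        void start() {
            running = true;
            log.add("start " + name);
        }

        void stop() {
            running = false;
            log.add("stop " + name);
        }
    }

    // Stand-in for AbstractCanalInstance that keeps only the ordering logic
    static class Instance {
        final List<String> log = new ArrayList<>();
        final Component metaManager = new Component("metaManager", log);
        final Component alarmHandler = new Component("alarmHandler", log);
        final Component eventStore = new Component("eventStore", log);
        final Component eventSink = new Component("eventSink", log);
        final Component eventParser = new Component("eventParser", log);

        // Same order as AbstractCanalInstance.start(): the parser comes last
        void start() {
            Component[] order = { metaManager, alarmHandler, eventStore, eventSink, eventParser };
            for (Component c : order) {
                if (!c.isStart()) {
                    c.start();
                }
            }
        }

        // Same order as AbstractCanalInstance.stop(): the parser goes first
        void stop() {
            Component[] order = { eventParser, eventSink, eventStore, metaManager, alarmHandler };
            for (Component c : order) {
                if (c.isStart()) {
                    c.stop();
                }
            }
        }
    }

    public static void main(String[] args) {
        Instance instance = new Instance();
        instance.start();
        instance.stop();
        for (String line : instance.log) {
            System.out.println(line);
        }
    }
}
```

Starting the parser last means the store and sink are already running when the first events arrive, and stopping it first cuts off the event source before the downstream components shut down. Note that the stop order is not an exact mirror of the start order: metaManager is stopped before alarmHandler.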
CanalInstanceWithSpring
canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/CanalInstanceWithSpring.java
public class CanalInstanceWithSpring extends AbstractCanalInstance {

    private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);

    public void start() {
        logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });
        super.start();
    }

    // ======== setter ========

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setEventParser(CanalEventParser eventParser) {
        this.eventParser = eventParser;
    }

    public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {
        this.eventSink = eventSink;
    }

    public void setEventStore(CanalEventStore<Event> eventStore) {
        this.eventStore = eventStore;
    }

    public void setMetaManager(CanalMetaManager metaManager) {
        this.metaManager = metaManager;
    }

    public void setAlarmHandler(CanalAlarmHandler alarmHandler) {
        this.alarmHandler = alarmHandler;
    }

    public void setMqConfig(CanalMQConfig mqConfig){
        this.mqConfig = mqConfig;
    }
}
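- CanalInstanceWithSpring extends AbstractCanalInstance and exists specifically to be registered in the Spring container: it adds a setter for each component (destination, eventParser, eventSink, eventStore, metaManager, alarmHandler, mqConfig) and logs the destination before delegating to super.start()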
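Summary
CanalInstance extends CanalLifeCycle and adds the getDestination, getEventParser, getEventSink, getEventStore, getMetaManager, getAlarmHandler, subscribeChange, and getMqConfig methods. AbstractCanalInstance provides the shared start, stop, and subscribeChange logic, and CanalInstanceWithSpring adds the setters needed for Spring wiring.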
doc
- CanalInstance