聊聊CanalInstanceGenerator

编程

本文主要研究一下CanalInstanceGenerator

CanalInstanceGenerator

canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstanceGenerator.java

public interface CanalInstanceGenerator {

/**

* 通过 destination 产生特定的 {@link CanalInstance}

*

* @param destination

* @return

*/

CanalInstance generate(String destination);

}

  • CanalInstanceGenerator定义了generate方法用于创建指定destination的CanalInstance

PlainCanalInstanceGenerator

canal-1.1.4/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/PlainCanalInstanceGenerator.java

public class PlainCanalInstanceGenerator implements CanalInstanceGenerator {

private static final Logger logger = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);

private String springXml;

private PlainCanalConfigClient canalConfigClient;

private String defaultName = "instance";

private BeanFactory beanFactory;

private Properties canalConfig;

public PlainCanalInstanceGenerator(Properties canalConfig){

this.canalConfig = canalConfig;

}

public CanalInstance generate(String destination) {

synchronized (CanalInstanceGenerator.class) {

try {

PlainCanal canal = canalConfigClient.findInstance(destination, null);

if (canal == null) {

throw new CanalException("instance : " + destination + " config is not found");

}

Properties properties = canal.getProperties();

// merge local

properties.putAll(canalConfig);

// 设置动态properties,替换掉本地properties

com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.set(properties);

// 设置当前正在加载的通道,加载spring查找文件时会用到该变量

System.setProperty("canal.instance.destination", destination);

this.beanFactory = getBeanFactory(springXml);

String beanName = destination;

if (!beanFactory.containsBean(beanName)) {

beanName = defaultName;

}

return (CanalInstance) beanFactory.getBean(beanName);

} catch (Throwable e) {

logger.error("generator instance failed.", e);

throw new CanalException(e);

} finally {

System.setProperty("canal.instance.destination", "");

}

}

}

// ================ setter / getter ================

private BeanFactory getBeanFactory(String springXml) {

ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);

return applicationContext;

}

public void setCanalConfigClient(PlainCanalConfigClient canalConfigClient) {

this.canalConfigClient = canalConfigClient;

}

public void setSpringXml(String springXml) {

this.springXml = springXml;

}

}

  • PlainCanalInstanceGenerator实现了CanalInstanceGenerator接口,其generate方法通过canal.getProperties()设置相关属性,然后再通过beanFactory.getBean(beanName)获取CanalInstance

ManagerCanalInstanceGenerator

canal-1.1.4/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/ManagerCanalInstanceGenerator.java

public class ManagerCanalInstanceGenerator implements CanalInstanceGenerator {

private CanalConfigClient canalConfigClient;

public CanalInstance generate(String destination) {

Canal canal = canalConfigClient.findCanal(destination);

String filter = canalConfigClient.findFilter(destination);

return new CanalInstanceWithManager(canal, filter);

}

// ================ setter / getter ================

public void setCanalConfigClient(CanalConfigClient canalConfigClient) {

this.canalConfigClient = canalConfigClient;

}

}

  • ManagerCanalInstanceGenerator实现了CanalInstanceGenerator接口,其generate方法通过canalConfigClient获取canal及filter,然后创建CanalInstanceWithManager

SpringCanalInstanceGenerator

canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/SpringCanalInstanceGenerator.java

public class SpringCanalInstanceGenerator implements CanalInstanceGenerator {

private static final Logger logger = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);

private String springXml;

private String defaultName = "instance";

private BeanFactory beanFactory;

public CanalInstance generate(String destination) {

synchronized (CanalInstanceGenerator.class) {

try {

// 设置当前正在加载的通道,加载spring查找文件时会用到该变量

System.setProperty("canal.instance.destination", destination);

this.beanFactory = getBeanFactory(springXml);

String beanName = destination;

if (!beanFactory.containsBean(beanName)) {

beanName = defaultName;

}

return (CanalInstance) beanFactory.getBean(beanName);

} catch (Throwable e) {

logger.error("generator instance failed.", e);

throw new CanalException(e);

} finally {

System.setProperty("canal.instance.destination", "");

}

}

}

private BeanFactory getBeanFactory(String springXml) {

ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);

return applicationContext;

}

public void setSpringXml(String springXml) {

this.springXml = springXml;

}

}

  • SpringCanalInstanceGenerator实现了CanalInstanceGenerator接口,其generate方法通过beanFactory.getBean(beanName)获取CanalInstance

小结

CanalInstanceGenerator定义了generate方法用于创建指定destination的CanalInstance;它有三个实现类分别是PlainCanalInstanceGenerator、ManagerCanalInstanceGenerator、SpringCanalInstanceGenerator

doc

  • CanalInstanceGenerator

以上是 聊聊CanalInstanceGenerator 的全部内容, 来源链接: utcz.com/z/515561.html

回到顶部