聊聊CanalInstanceGenerator
序
本文主要研究一下CanalInstanceGenerator
CanalInstanceGenerator
canal-1.1.4/instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalInstanceGenerator.java
public interface CanalInstanceGenerator { /**
* 通过 destination 产生特定的 {@link CanalInstance}
*
* @param destination
* @return
*/
CanalInstance generate(String destination);
}
- CanalInstanceGenerator定义了generate方法用于创建指定destination的CanalInstance
PlainCanalInstanceGenerator
canal-1.1.4/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/PlainCanalInstanceGenerator.java
public class PlainCanalInstanceGenerator implements CanalInstanceGenerator { private static final Logger logger = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);
private String springXml;
private PlainCanalConfigClient canalConfigClient;
private String defaultName = "instance";
private BeanFactory beanFactory;
private Properties canalConfig;
public PlainCanalInstanceGenerator(Properties canalConfig){
this.canalConfig = canalConfig;
}
public CanalInstance generate(String destination) {
synchronized (CanalInstanceGenerator.class) {
try {
PlainCanal canal = canalConfigClient.findInstance(destination, null);
if (canal == null) {
throw new CanalException("instance : " + destination + " config is not found");
}
Properties properties = canal.getProperties();
// merge local
properties.putAll(canalConfig);
// 设置动态properties,替换掉本地properties
com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.set(properties);
// 设置当前正在加载的通道,加载spring查找文件时会用到该变量
System.setProperty("canal.instance.destination", destination);
this.beanFactory = getBeanFactory(springXml);
String beanName = destination;
if (!beanFactory.containsBean(beanName)) {
beanName = defaultName;
}
return (CanalInstance) beanFactory.getBean(beanName);
} catch (Throwable e) {
logger.error("generator instance failed.", e);
throw new CanalException(e);
} finally {
System.setProperty("canal.instance.destination", "");
}
}
}
// ================ setter / getter ================
private BeanFactory getBeanFactory(String springXml) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
return applicationContext;
}
public void setCanalConfigClient(PlainCanalConfigClient canalConfigClient) {
this.canalConfigClient = canalConfigClient;
}
public void setSpringXml(String springXml) {
this.springXml = springXml;
}
}
- PlainCanalInstanceGenerator实现了CanalInstanceGenerator接口,其generate方法通过canal.getProperties()设置相关属性,然后再通过beanFactory.getBean(beanName)获取CanalInstance
ManagerCanalInstanceGenerator
canal-1.1.4/instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/ManagerCanalInstanceGenerator.java
public class ManagerCanalInstanceGenerator implements CanalInstanceGenerator { private CanalConfigClient canalConfigClient;
public CanalInstance generate(String destination) {
Canal canal = canalConfigClient.findCanal(destination);
String filter = canalConfigClient.findFilter(destination);
return new CanalInstanceWithManager(canal, filter);
}
// ================ setter / getter ================
public void setCanalConfigClient(CanalConfigClient canalConfigClient) {
this.canalConfigClient = canalConfigClient;
}
}
- ManagerCanalInstanceGenerator实现了CanalInstanceGenerator接口,其generate方法通过canalConfigClient获取canal及filter,然后创建CanalInstanceWithManager
SpringCanalInstanceGenerator
canal-1.1.4/instance/spring/src/main/java/com/alibaba/otter/canal/instance/spring/SpringCanalInstanceGenerator.java
public class SpringCanalInstanceGenerator implements CanalInstanceGenerator { private static final Logger logger = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);
private String springXml;
private String defaultName = "instance";
private BeanFactory beanFactory;
public CanalInstance generate(String destination) {
synchronized (CanalInstanceGenerator.class) {
try {
// 设置当前正在加载的通道,加载spring查找文件时会用到该变量
System.setProperty("canal.instance.destination", destination);
this.beanFactory = getBeanFactory(springXml);
String beanName = destination;
if (!beanFactory.containsBean(beanName)) {
beanName = defaultName;
}
return (CanalInstance) beanFactory.getBean(beanName);
} catch (Throwable e) {
logger.error("generator instance failed.", e);
throw new CanalException(e);
} finally {
System.setProperty("canal.instance.destination", "");
}
}
}
private BeanFactory getBeanFactory(String springXml) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(springXml);
return applicationContext;
}
public void setSpringXml(String springXml) {
this.springXml = springXml;
}
}
- SpringCanalInstanceGenerator实现了CanalInstanceGenerator接口,其generate方法通过beanFactory.getBean(beanName)获取CanalInstance
小结
CanalInstanceGenerator定义了generate方法用于创建指定destination的CanalInstance;它有三个实现类分别是PlainCanalInstanceGenerator、ManagerCanalInstanceGenerator、SpringCanalInstanceGenerator
doc
- CanalInstanceGenerator
以上是 聊聊CanalInstanceGenerator 的全部内容, 来源链接: utcz.com/z/515561.html