聊聊canal的CanalAdapterService

编程

CanalAdapterService

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java

/**
 * Spring-managed lifecycle owner of the canal client adapters.
 *
 * <p>{@link #init()} builds a {@link CanalAdapterLoader} from the injected
 * {@link AdapterCanalConfig} and starts it; {@link #destroy()} stops the loader and closes
 * every data source registered in {@code DatasourceConfig.DATA_SOURCES}. Both methods are
 * {@code synchronized} on this instance.
 */
@Component
@RefreshScope
public class CanalAdapterService {

    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterService.class);

    /** Loader created by {@link #init()}; {@code null} before init and after destroy. */
    private CanalAdapterLoader adapterLoader;

    @Resource
    private ContextRefresher contextRefresher;

    @Resource
    private AdapterCanalConfig adapterCanalConfig;

    @Resource
    private Environment env;

    // Injected so that this bean is guaranteed to be registered first.
    @Resource
    private SpringContext springContext;

    @Resource
    private SyncSwitch syncSwitch;

    /** Set to {@code true} only after the loader has been initialised without an exception. */
    private volatile boolean running = false;

    /**
     * Creates and starts the adapter loader. Does nothing when the service is already running.
     *
     * <p>Start-up failures are logged rather than rethrown. In that case {@code running} stays
     * {@code false}, but the loader reference is kept so that {@link #destroy()} can still
     * release whatever was started before the failure.
     */
    @PostConstruct
    public synchronized void init() {
        if (running) {
            return;
        }
        try {
            logger.info("## start the canal client adapters.");
            adapterLoader = new CanalAdapterLoader(adapterCanalConfig);
            adapterLoader.init();
            running = true;
            logger.info("## the canal client adapters are running now ......");
        } catch (Exception e) {
            logger.error("## something goes wrong when starting up the canal client adapters:", e);
        }
    }

    /**
     * Stops the adapter loader, then closes and unregisters all data sources.
     *
     * <p>The method returns early only when there is nothing to release, i.e. the service is
     * not running and no loader instance is held. A loader left behind by a failed
     * {@link #init()} is therefore still destroyed here instead of being leaked.
     * NOTE(review): this relies on {@code CanalAdapterLoader.destroy()} being safe to call on
     * a partially initialised loader - confirm against the worker {@code stop()} implementations.
     */
    @PreDestroy
    public synchronized void destroy() {
        if (!running && adapterLoader == null) {
            return;
        }
        try {
            running = false;
            logger.info("## stop the canal client adapters");

            if (adapterLoader != null) {
                try {
                    adapterLoader.destroy();
                } finally {
                    // Drop the reference even if destroy() failed so a repeated call is a no-op.
                    adapterLoader = null;
                }
            }

            for (DruidDataSource druidDataSource : DatasourceConfig.DATA_SOURCES.values()) {
                try {
                    druidDataSource.close();
                } catch (Exception e) {
                    // One failing data source must not prevent the others from being closed.
                    logger.error(e.getMessage(), e);
                }
            }
            DatasourceConfig.DATA_SOURCES.clear();
        } catch (Throwable e) {
            logger.warn("## something goes wrong when stopping canal client adapters:", e);
        } finally {
            logger.info("## canal client adapters are down.");
        }
    }
}

  • CanalAdapterService提供了init及destroy方法(分别标注了@PostConstruct及@PreDestroy);其init方法会使用adapterCanalConfig创建CanalAdapterLoader,然后执行adapterLoader.init(),成功后将running置为true;其destroy方法会执行adapterLoader.destroy(),然后遍历DatasourceConfig.DATA_SOURCES执行druidDataSource.close(),最后清空DatasourceConfig.DATA_SOURCES

CanalAdapterLoader

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

/**
 * Loads the configured outer adapters and starts the workers that feed them.
 *
 * <p>Depending on {@code canalClientConfig.getMode()} one worker is started per canal instance
 * ({@code tcp}) or per instance/group pair ({@code kafka}, {@code rocketMQ}).
 */
public class CanalAdapterLoader {

    private static final Logger logger = LoggerFactory.getLogger(CanalAdapterLoader.class);

    private final CanalClientConfig canalClientConfig;

    /** Workers started in tcp mode, keyed by canal instance name. */
    private final Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>();

    /**
     * Workers started in kafka / rocketMQ mode, keyed by
     * {@code <instance>-kafka-<groupId>} or {@code <instance>-rocketmq-<groupId>}.
     */
    private final Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>();

    /** Extension loader used to look up adapters; assigned in {@link #init()}. */
    private ExtensionLoader<OuterAdapter> loader;

    public CanalAdapterLoader(CanalClientConfig canalClientConfig) {
        this.canalClientConfig = canalClientConfig;
    }

    /**
     * Initialises the canal client: resolves the server address, loads the adapters and starts
     * the workers for the configured mode.
     *
     * @throws IllegalArgumentException if the configured canal server host is not of the form
     *         {@code host:port} (a non-numeric port surfaces as {@link NumberFormatException})
     * @throws RuntimeException in tcp mode when neither a canal server host nor zookeeper
     *         hosts are configured
     */
    public void init() {
        loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);

        String canalServerHost = this.canalClientConfig.getCanalServerHost();
        SocketAddress sa = null;
        if (canalServerHost != null) {
            String[] ipPort = canalServerHost.split(":");
            if (ipPort.length < 2) {
                // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException(
                    "Invalid canal server host, expected host:port but got: " + canalServerHost);
            }
            sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
        }
        String zkHosts = this.canalClientConfig.getZookeeperHosts();

        String mode = canalClientConfig.getMode();
        if ("tcp".equalsIgnoreCase(mode)) {
            startTcpWorkers(sa, zkHosts);
        } else if ("kafka".equalsIgnoreCase(mode)) {
            startKafkaWorkers();
        } else if ("rocketMQ".equalsIgnoreCase(mode)) {
            startRocketMQWorkers();
        } else {
            // No branch matches, so no worker is started; make that visible in the log.
            logger.warn("Unsupported canal client adapter mode: {}, no adapter worker started", mode);
        }
    }

    /** Starts one {@code CanalAdapterWorker} per canal instance, covering all of its groups. */
    private void startTcpWorkers(SocketAddress sa, String zkHosts) {
        for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
            List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
            for (CanalClientConfig.Group connectorGroup : canalAdapter.getGroups()) {
                canalOuterAdapterGroups.add(loadGroupAdapters(connectorGroup));
            }

            CanalAdapterWorker worker;
            if (sa != null) {
                // A directly configured server address takes precedence over zookeeper.
                worker = new CanalAdapterWorker(canalClientConfig,
                    canalAdapter.getInstance(),
                    sa,
                    canalOuterAdapterGroups);
            } else if (zkHosts != null) {
                worker = new CanalAdapterWorker(canalClientConfig,
                    canalAdapter.getInstance(),
                    zkHosts,
                    canalOuterAdapterGroups);
            } else {
                throw new RuntimeException("No canal server connector found");
            }
            // Registered before start() so destroy() can reach the worker if start() fails.
            canalWorkers.put(canalAdapter.getInstance(), worker);
            worker.start();
            logger.info("Start adapter for canal instance: {} succeed", canalAdapter.getInstance());
        }
    }

    /** Starts one {@code CanalAdapterKafkaWorker} per instance/group pair. */
    private void startKafkaWorkers() {
        for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
            for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
                canalOuterAdapterGroups.add(loadGroupAdapters(group));

                CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
                    canalClientConfig.getMqServers(),
                    canalAdapter.getInstance(),
                    group.getGroupId(),
                    canalOuterAdapterGroups,
                    canalClientConfig.getFlatMessage());
                canalMQWorker.put(canalAdapter.getInstance() + "-kafka-" + group.getGroupId(), canalKafkaWorker);
                canalKafkaWorker.start();
                logger.info("Start adapter for canal-client mq topic: {} succeed",
                    canalAdapter.getInstance() + "-" + group.getGroupId());
            }
        }
    }

    /** Starts one {@code CanalAdapterRocketMQWorker} per instance/group pair. */
    private void startRocketMQWorkers() {
        for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
            for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
                canalOuterAdapterGroups.add(loadGroupAdapters(group));

                CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
                    canalClientConfig.getMqServers(),
                    canalAdapter.getInstance(),
                    group.getGroupId(),
                    canalOuterAdapterGroups,
                    canalClientConfig.getAccessKey(),
                    canalClientConfig.getSecretKey(),
                    canalClientConfig.getFlatMessage(),
                    canalClientConfig.isEnableMessageTrace(),
                    canalClientConfig.getCustomizedTraceTopic(),
                    canalClientConfig.getAccessChannel(),
                    canalClientConfig.getNamespace());
                canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                rocketMQWorker.start();
                logger.info("Start adapter for canal-client mq topic: {} succeed",
                    canalAdapter.getInstance() + "-" + group.getGroupId());
            }
        }
    }

    /** Loads every outer adapter of one group; adapters that fail to load are left out. */
    private List<OuterAdapter> loadGroupAdapters(CanalClientConfig.Group group) {
        List<OuterAdapter> adapters = new ArrayList<>();
        for (OuterAdapterConfig config : group.getOuterAdapters()) {
            loadAdapter(config, adapters);
        }
        return adapters;
    }

    /**
     * Looks up, initialises and registers a single outer adapter.
     *
     * <p>The adapter is initialised with the thread context ClassLoader switched to the
     * adapter's own ClassLoader. Any failure is logged and the adapter is skipped.
     *
     * @param config configuration of the adapter to load
     * @param canalOutConnectors list the successfully initialised adapter is appended to
     */
    private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) {
        try {
            OuterAdapter adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey()));

            Thread currentThread = Thread.currentThread();
            ClassLoader cl = currentThread.getContextClassLoader();
            // Swap in the adapter's ClassLoader for the duration of its initialisation.
            currentThread.setContextClassLoader(adapter.getClass().getClassLoader());
            try {
                Environment env = (Environment) SpringContext.getBean(Environment.class);
                Properties evnProperties = null;
                if (env instanceof StandardEnvironment) {
                    // Flatten all enumerable property sources into one Properties object.
                    evnProperties = new Properties();
                    for (PropertySource<?> propertySource : ((StandardEnvironment) env).getPropertySources()) {
                        if (propertySource instanceof EnumerablePropertySource) {
                            String[] names = ((EnumerablePropertySource<?>) propertySource).getPropertyNames();
                            for (String name : names) {
                                Object val = propertySource.getProperty(name);
                                if (val != null) {
                                    evnProperties.put(name, val);
                                }
                            }
                        }
                    }
                }
                adapter.init(config, evnProperties);
            } finally {
                // Always restore the original ClassLoader, including when init() throws.
                currentThread.setContextClassLoader(cl);
            }

            canalOutConnectors.add(adapter);
            logger.info("Load canal adapter: {} succeed", config.getName());
        } catch (Exception e) {
            logger.error("Load canal adapter: {} failed", config.getName(), e);
        }
    }

    /**
     * Destroys all adapters. Workers are stopped in parallel so that a large number of canal
     * instances does not make shutdown block on each one in turn.
     *
     * <p>If the calling thread is interrupted while waiting, waiting for that group is
     * abandoned and the interrupt status is restored before this method returns.
     */
    public void destroy() {
        boolean interrupted = false;

        if (!canalWorkers.isEmpty()) {
            ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size());
            for (CanalAdapterWorker canalAdapterWorker : canalWorkers.values()) {
                stopExecutorService.execute(canalAdapterWorker::stop);
            }
            interrupted |= shutdownAndAwait(stopExecutorService);
        }

        if (!canalMQWorker.isEmpty()) {
            ExecutorService stopMQWorkerService = Executors.newFixedThreadPool(canalMQWorker.size());
            for (AbstractCanalAdapterWorker canalAdapterMQWorker : canalMQWorker.values()) {
                stopMQWorkerService.execute(canalAdapterMQWorker::stop);
            }
            interrupted |= shutdownAndAwait(stopMQWorkerService);
        }

        logger.info("All canal adapters destroyed");

        if (interrupted) {
            // Restored last so the wait for the second worker group is not cut short by it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts the executor down and blocks until all submitted stop tasks have finished.
     *
     * @return {@code true} if the wait was cut short by an interrupt
     */
    private static boolean shutdownAndAwait(ExecutorService executor) {
        executor.shutdown();
        try {
            while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                // keep polling until every stop task has completed
            }
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }
}

  • CanalAdapterLoader的init方法通过ExtensionLoader.getExtensionLoader(OuterAdapter.class)获取OuterAdapter的loader,之后根据不同的canalClientConfig.getMode()进行不同的初始化,这里有tcp、kafka、rocketMQ三种模式;tcp模式创建CanalAdapterWorker,然后添加到canalWorkers之后执行其start方法;kafka模式创建CanalAdapterKafkaWorker,然后添加到canalMQWorker之后执行其start方法;rocketMQ模式创建CanalAdapterRocketMQWorker,然后添加到canalMQWorker之后执行其start方法;其destroy方法会遍历canalWorkers.values()及canalMQWorker.values(),通过固定大小的线程池并行执行各worker的stop方法,并循环调用awaitTermination阻塞等待全部停止完成

小结

CanalAdapterService提供了init及destroy方法;其init方法会使用adapterCanalConfig创建CanalAdapterLoader,然后执行adapterLoader.init();其destroy方法会执行adapterLoader.destroy(),然后遍历DatasourceConfig.DATA_SOURCES执行druidDataSource.close(),最后清空DatasourceConfig.DATA_SOURCES

doc

  • CanalAdapterService

以上是 聊聊canal的CanalAdapterService 的全部内容, 来源链接: utcz.com/z/515099.html

回到顶部