聊聊canal的CanalAdapterWorker

编程

CanalAdapterWorker

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

public class CanalAdapterWorker extends AbstractCanalAdapterWorker {

private static final int BATCH_SIZE = 50;

private static final int SO_TIMEOUT = 0;

private CanalConnector connector;

/**

* 单台client适配器worker的构造方法

*

* @param canalDestination canal实例名

* @param address canal-server地址

* @param canalOuterAdapters 外部适配器组

*/

public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, SocketAddress address,

List<List<OuterAdapter>> canalOuterAdapters){

super(canalOuterAdapters);

this.canalClientConfig = canalClientConfig;

this.canalDestination = canalDestination;

connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");

}

/**

* HA模式下client适配器worker的构造方法

*

* @param canalDestination canal实例名

* @param zookeeperHosts zookeeper地址

* @param canalOuterAdapters 外部适配器组

*/

public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, String zookeeperHosts,

List<List<OuterAdapter>> canalOuterAdapters){

super(canalOuterAdapters);

this.canalDestination = canalDestination;

this.canalClientConfig = canalClientConfig;

connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");

((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);

}

@Override

protected void process() {

while (!running) { // waiting until running == true

while (!running) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

}

}

int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();

if (retry == -1) {

// 重试次数-1代表异常时一直阻塞重试

retry = Integer.MAX_VALUE;

}

// long timeout = canalClientConfig.getTimeout() == null ? 300000 :

// canalClientConfig.getTimeout(); // 默认超时5分钟

Integer batchSize = canalClientConfig.getBatchSize();

if (batchSize == null) {

batchSize = BATCH_SIZE;

}

while (running) {

try {

syncSwitch.get(canalDestination);

logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);

connector.connect();

logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);

connector.subscribe();

logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);

while (running) {

try {

syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES);

} catch (TimeoutException e) {

break;

}

if (!running) {

break;

}

for (int i = 0; i < retry; i++) {

if (!running) {

break;

}

Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

long batchId = message.getId();

try {

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

Thread.sleep(500);

} else {

if (logger.isDebugEnabled()) {

logger.debug("destination: {} batchId: {} batchSize: {} ",

canalDestination,

batchId,

size);

}

long begin = System.currentTimeMillis();

writeOut(message);

if (logger.isDebugEnabled()) {

logger.debug("destination: {} batchId: {} elapsed time: {} ms",

canalDestination,

batchId,

System.currentTimeMillis() - begin);

}

}

connector.ack(batchId); // 提交确认

break;

} catch (Exception e) {

if (i != retry - 1) {

connector.rollback(batchId); // 处理失败, 回滚数据

logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1));

} else {

connector.ack(batchId);

logger.error(e.getMessage() + " Error sync but ACK!");

}

Thread.sleep(500);

}

}

}

} catch (Throwable e) {

logger.error("process error!", e);

} finally {

connector.disconnect();

logger.info("=============> Disconnect destination: {} <=============", this.canalDestination);

}

if (running) { // is reconnect

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

// ignore

}

}

}

}

@Override

public void stop() {

try {

if (!running) {

return;

}

if (connector instanceof ClusterCanalConnector) {

((ClusterCanalConnector) connector).stopRunning();

} else if (connector instanceof SimpleCanalConnector) {

((SimpleCanalConnector) connector).stopRunning();

}

running = false;

syncSwitch.release(canalDestination);

logger.info("destination {} is waiting for adapters" worker thread die!", canalDestination);

if (thread != null) {

try {

thread.join();

} catch (InterruptedException e) {

// ignore

}

}

groupInnerExecutorService.shutdown();

logger.info("destination {} adapters worker thread dead!", canalDestination);

canalOuterAdapters.forEach(outerAdapters -> outerAdapters.forEach(OuterAdapter::destroy));

logger.info("destination {} all adapters destroyed!", canalDestination);

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

}

  • CanalAdapterWorker的process方法使用了两层while循环:外层循环先执行syncSwitch.get(canalDestination),然后connect并subscribe;无论是否出现异常,都会在finally里执行connector.disconnect(),若仍在运行则休眠1秒后重连。内层循环先执行syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES),若出现TimeoutException则跳出内层循环(随后断开并重连);之后在最多retry次的循环中执行connector.getWithoutAck(batchSize),有数据时调用writeOut(message),无数据时休眠500ms,然后执行connector.ack(batchId)并跳出重试循环;若出现异常,在未到最后一次重试时执行connector.rollback(batchId),最后一次重试仍失败则直接ack(不再重试该批数据)。其stop方法先执行connector的stopRunning,然后依次执行syncSwitch.release(canalDestination)、等待工作线程结束、groupInnerExecutorService.shutdown()以及各outerAdapter的destroy方法

AbstractCanalAdapterWorker

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

public abstract class AbstractCanalAdapterWorker {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    protected String canalDestination; // canal instance name

    protected String groupId = null; // groupId

    protected List<List<OuterAdapter>> canalOuterAdapters; // outer adapter groups

    protected CanalClientConfig canalClientConfig; // configuration

    protected ExecutorService groupInnerExecutorService; // pool sized to the number of adapter groups

    protected volatile boolean running = false; // whether the worker is running

    protected Thread thread = null;

    protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);

    protected SyncSwitch syncSwitch;

    /**
     * @param canalOuterAdapters outer adapter groups; one executor thread is sized per group
     */
    public AbstractCanalAdapterWorker(List<List<OuterAdapter>> canalOuterAdapters){
        this.canalOuterAdapters = canalOuterAdapters;
        this.groupInnerExecutorService = Util.newFixedThreadPool(canalOuterAdapters.size(), 5000L);
        syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class);
    }

    /**
     * Writes one canal message to every adapter group.
     * <p>
     * Every group is submitted to {@code groupInnerExecutorService} before any of them is awaited,
     * so groups run concurrently and this call blocks until the slowest group has finished.
     * Adapters within a single group run one after another.
     *
     * @param message batch fetched from canal
     * @throws RuntimeException if any group reported failure or its result could not be obtained;
     *         further failures are attached as suppressed exceptions
     */
    protected void writeOut(final Message message) {
        List<Future<Boolean>> futures = new ArrayList<>(canalOuterAdapters.size());
        // Groups run in parallel: submit all of them first. (Previously the wait loop sat inside the
        // forEach, which awaited each group before submitting the next one.)
        for (List<OuterAdapter> adapters : canalOuterAdapters) {
            futures.add(groupInnerExecutorService.submit(() -> syncAdapterGroup(adapters, message)));
        }

        // Wait for all groups; this blocks until the slowest group has completed.
        RuntimeException failure = null;
        for (Future<Boolean> future : futures) {
            try {
                if (!future.get()) {
                    failure = mergeFailure(failure, new RuntimeException("Outer adapter sync failed! "));
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting; the batch counts as failed.
                Thread.currentThread().interrupt();
                failure = mergeFailure(failure, new RuntimeException(e));
                break;
            } catch (Exception e) {
                failure = mergeFailure(failure, new RuntimeException(e));
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Runs every adapter of one group serially against the message.
     *
     * @return true if all adapters in the group completed without throwing; false otherwise
     *         (the exception is logged)
     */
    private boolean syncAdapterGroup(List<OuterAdapter> adapters, Message message) {
        try {
            // Adapters inside one group run serially; avoid configuring several adapters per group.
            for (OuterAdapter adapter : adapters) {
                long begin = System.currentTimeMillis();
                List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, groupId, message);
                if (dmls != null) {
                    batchSync(dmls, adapter);
                    if (logger.isDebugEnabled()) {
                        logger.debug("{} elapsed time: {}",
                            adapter.getClass().getName(),
                            (System.currentTimeMillis() - begin));
                    }
                }
            }
            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }

    /** Keeps the first failure and attaches later ones as suppressed exceptions. */
    private static RuntimeException mergeFailure(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Passes DMLs to the adapter, split into chunks according to canalClientConfig.getSyncBatchSize().
     * A DML counts as its number of data rows, or as 1 if it has no data rows.
     */
    private void batchSync(List<Dml> dmls, OuterAdapter adapter) {
        int syncBatchSize = canalClientConfig.getSyncBatchSize();
        if (dmls.size() <= syncBatchSize) {
            adapter.sync(dmls);
            return;
        }
        int len = 0;
        List<Dml> dmlsBatch = new ArrayList<>();
        for (Dml dml : dmls) {
            dmlsBatch.add(dml);
            if (dml.getData() == null || dml.getData().isEmpty()) {
                len += 1;
            } else {
                len += dml.getData().size();
            }
            if (len >= syncBatchSize) {
                adapter.sync(dmlsBatch);
                // Start a new list rather than clear(), in case the adapter keeps a reference
                // to the list it was given.
                dmlsBatch = new ArrayList<>();
                len = 0;
            }
        }
        if (!dmlsBatch.isEmpty()) {
            adapter.sync(dmlsBatch);
        }
    }

    //......

}

  • AbstractCanalAdapterWorker的writeOut方法遍历canalOuterAdapters,为每个适配器组向groupInnerExecutorService提交一个任务(组内的适配器依次解析message并执行batchSync),并通过Future等待各任务结果,任一组失败则抛出RuntimeException;batchSync方法按canalClientConfig.getSyncBatchSize()配置分批执行adapter.sync(dmls)

SyncSwitch

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@Component
public class SyncSwitch {

    private static final String SYN_SWITCH_ZK_NODE = "/sync-switch/";

    /** Per-destination switches used when no curator is available (LOCAL mode). */
    private static final Map<String, BooleanMutex> LOCAL_LOCK = new ConcurrentHashMap<>();

    /** Per-destination switches registered through initMutex/startListen when a curator exists (DISTRIBUTED mode). */
    private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK = new ConcurrentHashMap<>();

    private Mode mode = Mode.LOCAL;

    @Resource
    private AdapterCanalConfig adapterCanalConfig;

    @Resource
    private CuratorClient curatorClient;

    /**
     * Chooses the mode from curator availability and registers one switch, initially {@code true},
     * for every configured destination.
     */
    @PostConstruct
    public void init() {
        CuratorFramework curator = curatorClient.getCurator();
        if (curator == null) {
            initLocalSwitches();
        } else {
            initDistributedSwitches(curator);
        }
    }

    /** LOCAL mode: one in-memory switch per destination. */
    private void initLocalSwitches() {
        mode = Mode.LOCAL;
        LOCAL_LOCK.clear();
        for (String dest : adapterCanalConfig.DESTINATIONS) {
            LOCAL_LOCK.put(dest, new BooleanMutex(true));
        }
    }

    /** DISTRIBUTED mode: per destination, initialise the switch via the curator, register it, then listen. */
    private void initDistributedSwitches(CuratorFramework curator) {
        mode = Mode.DISTRIBUTED;
        DISTRIBUTED_LOCK.clear();
        for (String dest : adapterCanalConfig.DESTINATIONS) {
            BooleanMutex switchMutex = new BooleanMutex(true);
            initMutex(curator, dest, switchMutex);
            DISTRIBUTED_LOCK.put(dest, switchMutex);
            startListen(dest, switchMutex);
        }
    }

    /** Returns the switch registered for the destination in the active mode's map, or null. */
    private BooleanMutex lookupMutex(String destination) {
        Map<String, BooleanMutex> switches = (mode == Mode.LOCAL) ? LOCAL_LOCK : DISTRIBUTED_LOCK;
        return switches.get(destination);
    }

    /** Calls get() on the destination's switch; no-op when no switch is registered for it. */
    public void get(String destination) throws InterruptedException {
        BooleanMutex switchMutex = lookupMutex(destination);
        if (switchMutex == null) {
            return;
        }
        switchMutex.get();
    }

    /** Calls get(timeout, unit) on the destination's switch; no-op when no switch is registered for it. */
    public void get(String destination, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        BooleanMutex switchMutex = lookupMutex(destination);
        if (switchMutex == null) {
            return;
        }
        switchMutex.get(timeout, unit);
    }

    /** Sets the destination's switch back to {@code true} if it is currently {@code false}. */
    public synchronized void release(String destination) {
        BooleanMutex switchMutex = null;
        if (mode == Mode.LOCAL) {
            switchMutex = LOCAL_LOCK.get(destination);
        } else if (mode == Mode.DISTRIBUTED) {
            switchMutex = DISTRIBUTED_LOCK.get(destination);
        }
        if (switchMutex != null && !switchMutex.state()) {
            switchMutex.set(true);
        }
    }

    //......

}

  • SyncSwitch支持LOCAL、DISTRIBUTED模式,其get方法执行mutex.get()或者mutex.get(timeout, unit)方法;其release方法执行mutex.set(true)

小结

CanalAdapterWorker的process方法使用了两层while循环:外层循环先执行syncSwitch.get(canalDestination),然后connect并subscribe;无论是否出现异常,都会在finally里执行connector.disconnect(),若仍在运行则休眠1秒后重连。内层循环先执行syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES),若出现TimeoutException则跳出内层循环(随后断开并重连);之后在最多retry次的循环中执行connector.getWithoutAck(batchSize),有数据时调用writeOut(message),无数据时休眠500ms,然后执行connector.ack(batchId)并跳出重试循环;若出现异常,在未到最后一次重试时执行connector.rollback(batchId),最后一次重试仍失败则直接ack(不再重试该批数据)。其stop方法先执行connector的stopRunning,然后依次执行syncSwitch.release(canalDestination)、等待工作线程结束、groupInnerExecutorService.shutdown()以及各outerAdapter的destroy方法

doc

  • CanalAdapterWorker

以上是 聊聊canal的CanalAdapterWorker 的全部内容, 来源链接: utcz.com/z/515142.html

回到顶部