canal

database

1.  canal 简介

canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

canal 工作原理:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

官网地址:https://github.com/alibaba/canal

2.  canal 服务端

 (1)修改 /etc/my.cnf 文件,增加以下配置(PS:改完后别忘了重启数据库 systemctl restart mysqld )

[mysqld]

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式

server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

备忘录

show variables like "log_bin";

show binary logs;

show master status;

select * from mysql.`user`;

(2)授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY "Canal@123456";  

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "canal"@"%";

-- GRANT ALL PRIVILEGES ON *.* TO "canal"@"%" ;

FLUSH PRIVILEGES;

 (3)下载canal,并解压缩

https://github.com/alibaba/canal/releases

(4)修改配置

vim conf/example/instance.properties

启动

sh bin/startup.sh

查看日志

# 查看 server 日志

vi logs/canal/canal.log

# 查看 instance 的日志

vi logs/example/example.log

# 关闭

sh bin/stop.sh

3. canal 客户端

依赖

<dependency>

<groupId>com.alibaba.otter</groupId>

<artifactId>canal.client</artifactId>

<version>1.1.5</version>

</dependency>

<dependency>

<groupId>com.alibaba.otter</groupId>

<artifactId>canal.protocol</artifactId>

<version>1.1.5</version>

</dependency>

配置

canal.server.host=192.168.28.32

canal.server.port=11111

canal.server.destination=example

canal.server.username=canal

canal.server.password=canal

spring.kafka.bootstrap-servers=192.168.28.32:9092

logback.xml

<?xml version="1.0" encoding="UTF-8"?>

<configuration scan="true" scanPeriod="30 seconds" debug="false">

<property name="log.charset" value="utf-8" />

<property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n" />

<property name="log.dir" value="./logs" />

<!--输出到控制台-->

<appender name="console" class="ch.qos.logback.core.ConsoleAppender">

<encoder>

<pattern>${log.pattern}</pattern>

<charset>${log.charset}</charset>

</encoder>

</appender>

<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">

<file>${log.dir}/my-canal-client.log</file>

<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">

<fileNamePattern>${log.dir}/my-canal-client.%d{yyyy-MM-dd}.log</fileNamePattern>

<maxHistory>30</maxHistory>

<totalSizeCap>3GB</totalSizeCap>

</rollingPolicy>

<encoder>

<pattern>${log.pattern}</pattern>

</encoder>

</appender>

<root level="info">

<appender-ref ref="console" />

<appender-ref ref="file" />

</root>

</configuration>

测试代码

package com.my.component.canal;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import lombok.extern.slf4j.Slf4j;

import org.springframework.util.Assert;

import java.util.List;

/**

* @Author ChengJianSheng

* @Date 2022/1/13

*/

@Slf4j

public class Demo {

private volatile boolean running = false;

private CanalConnector connector;

private Thread thread = null;

public Demo(CanalConnector connector) {

this.connector = connector;

}

public void start() {

Assert.notNull(connector, "connector is null");

thread = new Thread(this::process);

running = true;

thread.start();

}

public void stop() {

if (!running) {

return;

}

running = false;

if (thread != null) {

try {

thread.join();

} catch (InterruptedException e) {

// ignore

}

}

}

private void process() {

int batchSize = 5 * 1024;

while (running) {

try {

connector.connect();

connector.subscribe(".*\..*");

while (running) {

Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

// try {

// Thread.sleep(1000);

// } catch (InterruptedException e) {

// }

} else {

printEntry(message.getEntries());

}

if (batchId != -1) {

connector.ack(batchId); // 提交确认

}

}

} catch (Throwable e) {

log.error("process error!", e);

try {

Thread.sleep(1000L);

} catch (InterruptedException e1) {

// ignore

}

connector.rollback(); // 处理失败, 回滚数据

} finally {

connector.disconnect();

}

}

}

private void printEntry(List<CanalEntry.Entry> entrys) {

for (CanalEntry.Entry entry : entrys) {

if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {

continue;

}

CanalEntry.RowChange rowChage = null;

try {

rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);

}

CanalEntry.EventType eventType = rowChage.getEventType();

System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));

for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {

if (eventType == CanalEntry.EventType.DELETE) {

printColumn(rowData.getBeforeColumnsList());

} else if (eventType == CanalEntry.EventType.INSERT) {

printColumn(rowData.getAfterColumnsList());

} else {

System.out.println("-------> before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("-------> after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private void printColumn(List<CanalEntry.Column> columns) {

for (CanalEntry.Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());

}

}

}

启动

package com.my.component.canal;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.common.utils.AddressUtils;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.CommandLineRunner;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

import java.net.SocketAddress;

/**

* @Author ChengJianSheng

* @Date 2022/1/4

*/

@Slf4j

@Component

public class MyCommandLineRunner implements CommandLineRunner {

@Autowired

private CanalServerProperties canalServerProperties;

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

@Override

public void run(String... args) throws Exception {

String destination = "example";

// String ip = AddressUtils.getHostIp();

// String ip = canalServerProperties.getHost();

String ip = "192.168.28.32";

SocketAddress address = new InetSocketAddress(ip, 11111);

CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "canal", "canal");

Demo demo = new Demo(connector);

demo.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {

try {

log.info("## stop the canal client");

demo.stop();

} catch (Throwable e) {

log.warn("##something goes wrong when stopping canal:", e);

} finally {

log.info("## canal client is down.");

}

}));

}

}

 

以上是 canal 的全部内容, 来源链接: utcz.com/z/536242.html

回到顶部