全量同步多张db表到ES同一个索引[数据库教程]

database

一、演示场景:

演示的场景主要是解决MySQL多张业务大表进行多表join查询效率低下的问题。
通过把MySQL的多张大表的数据同步到同一个ES索引中。(也就是有多表字段合并到es一张宽表来解决MySQL多表join效率低下的问题)

1.1、演示环境

自建MySQL服务5.7.22

ES单实例版本6.2.4

服务器python环境2.7.5

部署同步服务mysqlmom程序

具体安装部署此处忽略。有需要可以查看本博客找

二、MySQLmom具体配置文件

[[email protected] ~]# cat /data1/soft/mysqlsmom01/test_mom/init_config.py

# coding=utf-8

STREAM = "INIT"

# 修改数据库连接

CONNECTION = {

‘host‘: ‘172.16.0.197‘,

‘port‘: 3306,

‘user‘: ‘click_rep‘,

‘passwd‘: ‘jwtest123456‘

}

# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1

BULK_SIZE = 50000

# 修改elasticsearch节点

#NODES = [{"host": "127.0.0.1", "port": 9200}]

NODES = [{"host": "172.16.0.247", "port": 9999}]

TASKS = [

# 同步stdb01.test03到es:

{

"stream": {

"database": "test_db", # 在此数据库执行sql语句

"sql": "select * from test01", # 将该sql语句选中的数据同步到 elasticsearch

# "pk": {"field": "id", "type": "char"} # 当主键id的类型是字符串时

},

"jobs": [

{

"actions": ["insert", "update"],

"pipeline": [

{"only_fields": {"fields": ["id", "username"]}}, # 只同步 id 和 username字段

{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id

],

"dest": {

"es": {

"action": "upsert",

"index": "test01_company_index", # 设置 index

"type": "test01", # 设置 type

"nodes": NODES

}

}

}

]

},

{

"stream": {

"database": "test_db", # 在此数据库执行sql语句

"sql": "select * from company_staff", # 将该sql语句选中的数据同步到 elasticsearch

# "pk": {"field": "id", "type": "char"} # 当主键id的类型是字符串时

},

"jobs": [

{

"actions": ["insert", "update"],

"pipeline": [

{"only_fields": {"fields": ["id", "company_name", "company_staff", "channel", "url"]}}, # 只同步 id 和 username字段

{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id

],

"dest": {

"es": {

"action": "upsert",

"index": "test01_company_index", # 设置 index

"type": "test01", # 设置 type

"nodes": NODES

}

}

}

]

}

]

# CUSTOM_ROW_HANDLERS = "./my_handlers.py"

# CUSTOM_ROW_FILTERS = "./my_filters.py"

?

三、mysql测试表建表语句,表数据以及同步程序启动

mysql测试表建表语句和表数据如下:

[email protected] 16:16: [test_db]> show create table company_staffG

*************************** 1. row ***************************

Table: company_staff

Create Table: CREATE TABLE `company_staff` (

`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT ‘id‘,

`company_name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT ‘‘ COMMENT ‘公司名‘,

`company_staff` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT ‘‘ COMMENT ‘人员规模‘,

`channel` varchar(10) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT ‘‘ COMMENT ‘来源‘,

`url` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT ‘‘ COMMENT ‘url‘,

`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间‘,

`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ‘更新时间‘,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT=‘namelist人员规模表‘

1 row in set (0.00 sec)

[email protected] 16:17: [test_db]> select * from company_staff;

+----+-----------------------------------------------------------------------------+---------------+-------------+-----------------------------------------+---------------------+---------------------+

| id | company_name | company_staff | channel | url | create_time | update_time |

+----+-----------------------------------------------------------------------------+---------------+-------------+-----------------------------------------+---------------------+---------------------+

| 1 | 永兴东润(中国)服饰有限公司北京海淀第四儿童服饰店 | liepin | 100-499人 | https://www.liepin.com/company/8321725/ | 2021-06-19 17:21:57 | 2021-06-19 17:21:57 |

| 2 | 东(中国)服饰有限公司北京海淀第四儿童服饰店 | liepin | 100-499人 | https://www.liepin.com/company/8321725/ | 2021-06-19 17:21:57 | 2021-06-19 17:21:57 |

| 3 | 永兴东润(中国)服饰有限公司北京海淀第四儿童服饰店 | liepin | 100-499人 | https://www.liepin.com/company/8321725/ | 2021-06-19 17:21:57 | 2021-06-19 17:21:57 |

| 4 | 润(中国) | liepin | 100-499人 | https://www.liepin.com/company/8321725/ | 2021-06-19 17:21:57 | 2021-06-19 17:21:57 |

+----+-----------------------------------------------------------------------------+---------------+-------------+-----------------------------------------+---------------------+---------------------+

4 rows in set (0.00 sec)

[email protected] 16:17: [test_db]> show create table test01G

*************************** 1. row ***************************

Table: test01

Create Table: CREATE TABLE `test01` (

`id` int(8) NOT NULL AUTO_INCREMENT,

`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,

`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,

`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci

1 row in set (0.00 sec)

[email protected] 16:17: [test_db]> select * from test01;

+----+----------+------------+---------------------+

| id | username | password | create_time |

+----+----------+------------+---------------------+

| 1 | tomcat | xiaohuahua | 2021-07-03 23:51:17 |

| 2 | php | xiao | 2021-07-03 23:53:36 |

| 3 | fix | xiao | 2021-07-03 23:53:49 |

| 4 | java | bai | 2021-07-03 23:54:01 |

+----+----------+------------+---------------------+

4 rows in set (0.00 sec)

[[email protected] mysqlsmom01]# mom run -c ./test_mom/binlog_config.py >mysqlmom.log 2>&1 &

2021-08-08 16:23:57,873 root INFO {"username": "tomcat", "_id": 1, "id": 1}

2021-08-08 16:23:57,874 root INFO {"username": "php", "_id": 2, "id": 2}

2021-08-08 16:23:57,874 root INFO {"username": "fix", "_id": 3, "id": 3}

2021-08-08 16:23:57,874 root INFO {"username": "java", "_id": 4, "id": 4}

2021-08-08 16:23:57,975 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.101s]

2021-08-08 16:23:57,979 root INFO {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6c38u5174u4e1cu6da6uff08u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 1, "id": 1, "channel": "100-499u4eba "}

2021-08-08 16:23:57,979 root INFO {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u4e1c(u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 2, "id": 2, "channel": "100-499u4eba "}

2021-08-08 16:23:57,979 root INFO {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6c38u5174u4e1cu6da6(u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 3, "id": 3, "channel": "100-499u4eba "}

2021-08-08 16:23:57,979 root INFO {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6da6(u4e2du56fduff09", "_id": 4, "id": 4, "channel": "100-499u4eba "}

2021-08-08 16:23:58,007 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.027s]

real 0m0.637s

user 0m0.447s

sys 0m0.061s


全量同步启动命令如下:

?

[[email protected] mysqlsmom01]# mom run -c ./test_mom/init_config.py >mysqlmom.log 2>&1 &

2021-08-08 16:23:57,873 root INFO {"username": "tomcat", "_id": 1, "id": 1}

2021-08-08 16:23:57,874 root INFO {"username": "php", "_id": 2, "id": 2}

2021-08-08 16:23:57,874 root INFO {"username": "fix", "_id": 3, "id": 3}

2021-08-08 16:23:57,874 root INFO {"username": "java", "_id": 4, "id": 4}

2021-08-08 16:23:57,975 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.101s]

2021-08-08 16:23:57,979 root INFO {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6c38u5174u4e1cu6da6uff08u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 1, "id": 1, "channel": "100-499u4eba "}

2021-08-08 16:23:57,979 root INFO {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u4e1c(u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 2, "id": 2, "channel": "100-499u4eba "}

2021-08-08 16:23:57,979 root INFO {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6c38u5174u4e1cu6da6(u4e2du56fduff09u670du9970u6709u9650u516cu53f8u5317u4eacu6d77u6dc0u7b2cu56dbu513fu7ae5u670du9970u5e97", "_id": 3, "id": 3, "channel": "100-499u4eba "}

2021-08-08 16:23:57,979 root INFO {"url": "https://www.liepin.com/company/8321725/", "company_staff": "liepin", "company_name": "u6da6(u4e2du56fduff09", "_id": 4, "id": 4, "channel": "100-499u4eba "}

2021-08-08 16:23:58,007 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.027s]

real 0m0.637s

user 0m0.447s

sys 0m0.061s

?

图示如下:

到此时,全量同步多张db表数据到ES同一个索引演示完成

全量同步多张db表到ES同一个索引

原文:https://blog.51cto.com/wujianwei/3314093

以上是 全量同步多张db表到ES同一个索引[数据库教程] 的全部内容, 来源链接: utcz.com/z/535821.html

回到顶部