PostgreSQL 数据同步到ES 搭建操作

安装 Python 和 python-devel 开发包

[root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm

准备中... ################################# [100%]

正在升级/安装...

1:python-devel-2.7.5-58.el7 ################################# [100%]

[root@rtm2 Packages]# ls

安装 multicorn

[root@rtm2 multicorn-1.3.5]# make

Python version is 2.7

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c

gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic

.//preflight-check.sh

cp sql/multicorn.sql sql/multicorn--1.3.5.sql

[root@rtm2 multicorn-1.3.5]# make install

Python version is 2.7

...

安装pg-es-fdw-master

[root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master

[root@rtm2 pg-es-fdw-master]# ls

demo.sh dite LICENSE README.md setup.py

[root@rtm2 pg-es-fdw-master]# python setup.py build

running build

running build_py

creating build

creating build/lib

creating build/lib/dite

copying dite/__init__.py -> build/lib/dite

[root@rtm2 pg-es-fdw-master]# python setup.py install

running install

running bdist_egg

running egg_info

creating dite.egg-info

writing dite.egg-info/PKG-INFO

安装插件 multicorn

[postgres@rtm2 ~]$ psql

psql (10.3)

Type "help" for help.

postgres=# select * from pg_extension;

extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition

---------+----------+--------------+----------------+------------+-----------+--------------

plpgsql | 10 | 11 | f | 1.0 | |

(1 row)

postgres=# CREATE EXTENSION multicorn;

CREATE EXTENSION

postgres=# psql

postgres=# select * from pg_extension;

extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition

-----------+----------+--------------+----------------+------------+-----------+--------------

plpgsql | 10 | 11 | f | 1.0 | |

multicorn | 10 | 2200 | t | 1.3.5 | |

(2 rows)

postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS(wrapper 'dite.ElasticsearchFDW');

CREATE SERVER

postgres=#

配置 ES(Elasticsearch)

[root@rtm2 config]# vi elasticsearch.yml

node.name: "es-node1"

network.host: 192.168.31.121

discovery.zen.ping.unicast.hosts: ["192.168.31.121"]

[root@rtm2 config]# vi /etc/sysctl.conf

vm.max_map_count=262144

sysctl -p

[root@rtm2 config]# vi /etc/security/limits.conf

# End of file

root soft nofile 65536

root hard nofile 65536

root soft nproc 4096

root hard nproc 4096

~

启动es

[root@rtm2 bin]# ls

elasticsearch elasticsearch.in.bat elasticsearch-service-mgr.exe elasticsearch-service-x86.exe plugin.bat

elasticsearch.bat elasticsearch.in.sh elasticsearch-service-x64.exe plugin service.bat

[root@rtm2 bin]# ./bin/elasticsearch

test=# CREATE FOREIGN TABLE pp_es (id bigint,age bigint) SERVER multicorn_es OPTIONS (host

test(# '192.168.31.121', port '9200', node 'es-node1', index 'pp');

CREATE FOREIGN TABLE

test=#

创建触发器和外部表

test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$

test$# BEGIN

test$# INSERT INTO pp_es (id, age) VALUES

test$# (NEW.id, NEW.age);

test$# RETURN NEW;

test$# END;

test$# $def$ LANGUAGE plpgsql;

CREATE FUNCTION

test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp();

CREATE TRIGGER

test=#

新增数据测试

test=# insert into pp (id,age) values (1,11);

INSERT 0 1

test=# select * from pp;

id | age

----+-----

1 | 11

(1 row)

test=#

检查es数据

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'

{

"took" : 104,

"timed_out" : false,

"_shards" : {

"total" : 5,

"successful" : 5,

"failed" : 0

},

"hits" : {

"total" : 2,

"max_score" : 1.0,

"hits" : [ {

"_index" : "es-node1",

"_type" : "pp",

"_id" : "1",

"_score" : 1.0,

"_source":{"age": "11"}

}, {

"_index" : "es-node1",

"_type" : "pp",

"_id" : "2",

"_score" : 1.0,

"_source":{"age": "22"}

} ]

}

}

[root@rtm2 ~]#

创建更新触发器

test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$

BEGIN

UPDATE pp_es SET

id = NEW.id,

age = NEW.age

where id =NEW.id;

RETURN NEW;

END;

$def$ LANGUAGE plpgsql;

CREATE FUNCTION

test=# ^C

test=#

test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT

test(# FROM NEW.*)EXECUTE PROCEDURE updadeIndex_pp();

CREATE TRIGGER

test=#

更新表数据

test=# select * from pp;

id | age

----+-----

1 | 11

2 | 22

3 | 22

(3 rows)

test=# update pp a set a.age = 33 where a.id = 3;

ERROR: column "a" of relation "pp" does not exist

LINE 1: update pp a set a.age = 33 where a.id = 3;

^

test=# update pp set age = 33 where id = 3;

UPDATE 1

test=# select * from pp;

id | age

----+-----

1 | 11

2 | 22

3 | 33

(3 rows)

test=#

es查询变更

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'

{

"took" : 4,

"timed_out" : false,

"_shards" : {

"total" : 5,

"successful" : 5,

"failed" : 0

},

"hits" : {

"total" : 3,

"max_score" : 1.0,

"hits" : [ {

"_index" : "es-node1",

"_type" : "pp",

"_id" : "1",

"_score" : 1.0,

"_source":{"age": "11"}

}, {

"_index" : "es-node1",

"_type" : "pp",

"_id" : "2",

"_score" : 1.0,

"_source":{"age": "22"}

}, {

"_index" : "es-node1",

"_type" : "pp",

"_id" : "3",

"_score" : 1.0,

"_source":{"age": "33"}

} ]

}

}

[root@rtm2 ~]#

补充:logstash同步pgsql数据到Elasticsearch

一、对于logstash的配置我就不再多说,主要是三部分:input、filter、output的配置

二、配置步骤

1、input配置

input {

stdin {

}

jdbc {

jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world"

jdbc_user => "postgres"

jdbc_password => "zhang123"

jdbc_driver_library => "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar"

jdbc_driver_class => "org.postgresql.Driver"

jdbc_paging_enabled => "true"

jdbc_page_size => "300000"

use_column_value => "true"

tracking_column => "id"

statement_filepath => "D:\logstash-6.4.0\bin\pgsql\jdbc.sql"

schedule => "* * * * *"

type => "jdbc"

jdbc_default_timezone =>"Asia/Shanghai"

}

}

2、filter配置

filter {

json {

source => "message"

remove_field => ["message"]

}

}

3、output 配置,就是elasticsearch的基本配置

output {

elasticsearch {

hosts => ["localhost:9200"]

index => "test_out"

template => "D:\logstash-6.4.0\bin\pgsql\es-template.json"

template_name => "t-statistic-out-logstash"

template_overwrite => true

document_type => "out"

document_id => "%{id}"

}

stdout {

codec => json_lines

}

}

4、es-template.json的配置

{

"template" : "t-statistis-out-template",

"order":1,

"settings": {

"index": {

"refresh_interval": "5s"

}

},

"mappings": {

"_default_": {

"_all" : {"enabled":false},

"dynamic_templates": [

{

"message_field" : {

"match" : "message",

"match_mapping_type" : "string",

"mapping" : { "type" : "string", "index" : "not_analyzed" }

}

}, {

"string_fields" : {

"match" : "*",

"match_mapping_type" : "string",

"mapping" : { "type" : "string", "index" : "not_analyzed" }

}

}

],

"properties": {

"@timestamp": {

"type": "date"

},

"@version": {

"type": "keyword"

},

"id": {

"type": "keyword"

},

"name": {

"type": "keyword"

},

"pp": {

"type": "keyword"

}

}

}

},

"aliases": {}

}

最后就是下载好pgsql的连接驱动(可以在官网下载),并配置好自己的数据库表格的数据

启动命令:进入logstash的bin目录后执行下面的命令。这里我把自己的logstash配置都放在bin下的pgsql目录里(这个目录可以随意创建在任何位置)

logstash.bat -f ./pgsql/jdbc.conf

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。

以上是 PostgreSQL 数据同步到ES 搭建操作 的全部内容, 来源链接: utcz.com/z/343943.html

回到顶部