如何使用Logstash从MySQL动态更新数据(无重复项)?

我已配置logstash.conf以动态插入数据库的数据,但问题是:

当我更改表的一行时,该行不会在索引中更新,因为我仅在sql_last_value之后插入新值,尽管我是关于触发器的,但我不确定该怎么做。

input {

jdbc {

jdbc_connection_string =>"jdbc:mysql://localhost:3306/blog"

jdbc_user =>"root"

jdbc_password =>""

jdbc_driver_library =>"C:\Users\saidb\Downloads\mysql-connector-java-5.1.47\mysql-connector-java-5.1.47.jar"

jdbc_driver_class =>"com.mysql.jdbc.Driver"

schedule =>"* * * * *"

statement =>"SELECT * FROM blog_pro WHERE id >:sql_last_value"

use_column_value =>true

tracking_column =>id

}

}

output {

elasticsearch {

hosts =>"localhost:9200"

index =>"blog_pro"

document_type =>"data"

}

}

回答:

如果id用于选择行,则不能这样做。您有2种选择,

  1. 每次都选择所有行,然后使用query将它们发送到ES SELECT * FROM blog_pro,根据您的情况,我认为这不是一个好选择。

  2. 创建一个新列last_modified_time,其中包含记录(行)的最后修改的时间戳。然后使用它来过滤行。注意属性tracking_column_type => "timestamp"

`statement =>”SELECT * FROM blog_pro WHERE last_modiefied_time

:sql_last_value” use_column_value =>true tracking_column =>last_modified_time

tracking_column_type => “timestamp”`

这是完整的logstash配置

input {

jdbc {

jdbc_connection_string =>"jdbc:mysql://192.168.3.57:3306/blog_pro"

jdbc_user =>"dush"

jdbc_password =>"dush"

jdbc_driver_library =>"F:\logstash-6.2.2\bin\mysql-connector-java-5.1.6.jar"

jdbc_driver_class =>"com.mysql.jdbc.Driver"

schedule =>"* * * * *"

statement =>"SELECT * FROM blog_pro WHERE last_modified_time >:sql_last_value"

use_column_value =>true

tracking_column =>last_modified_time

tracking_column_type => "timestamp"

}

}

output

{

#output to elasticsearch

elasticsearch {

hosts => [ "192.168.1.245:9201" ]

action=>update

# "%{id}" - > primary key of the table

document_id => "%{id}"

doc_as_upsert =>true

}

}

请注意,您可能需要清除索引并使用此配置开始索引。我对此进行了测试,并且工作正常。

Elasticsearch版本= 5.xx

logstash版本= 6.2.2

以上是 如何使用Logstash从MySQL动态更新数据(无重复项)? 的全部内容, 来源链接: utcz.com/qa/399943.html

回到顶部