Logstash:将两个日志合并到一个输出文档中
我已将syslog设置为使用以下过滤器将日志发送到logstash:
output { elasticsearch
{ hosts => ["localhost:9200"]
document_id => "%{job_id}"
}
}
filter {
grok {
overwrite => ["message"]
}
json {
source => "message"
}
}
我的应用程序之一的典型消息将具有初始状态和job_id:
{"job_id": "xyz782", state: "processing", job_type: "something"}
几分钟后,另一个日志将具有相同的log_id,不同的状态和处理时间:
{"job_id": "xyz782", state:"failed", processing_time: 12.345}
这些字段已正确加载,但是创建了两个文档。我希望只为初始日志创建一个文档,然后用第二个日志来更新第一个日志,这意味着更新后的文档将具有以下字段:
{"job_id": "xyz782", state: "failed", job_type: "something", processing_time: 12.345}
如您在logstash
conf输出中所看到的,我使用job_id作为文档ID,但是,第二条消息似乎替换了第一条消息中的字段,但也擦除了第一条消息中所有不在字段中的字段。例如,第二个消息中,第一个消息中出现的job_type字段未出现在最终文档中。这可能与json两次都来自同一个字段“消息”有关。还有另一种方法可以将两个日志消息合并到logstash中的一个文档中?
回答:
您可以使用aggregate
过滤器来执行此操作。聚合筛选器支持基于公共字段值将多个日志行聚合到一个事件中。在您的情况下,公共字段将是该job_id
字段。
然后,我们需要另一个字段来检测应该聚合的第一个事件与第二个事件。就您而言,这就是state
字段。
因此,您只需要向现有的Logstash配置中添加另一个过滤器,如下所示:
filter { ...your other filters
if [state] == "processing" {
aggregate {
task_id => "%{job_id}"
}
} else if [state] == "failed" {
aggregate {
task_id => "%{job_id}"
end_of_task => true
timeout => 120
}
}
}
您可以timeout
根据作业的运行时间自由调整(以秒为单位)。
以上是 Logstash:将两个日志合并到一个输出文档中 的全部内容, 来源链接: utcz.com/qa/433449.html