Logstash: Parsing complex multi-line JSON from a log file into ElasticSearch

Let me start by saying that I have worked through as many of the examples posted here as I could find, and I still cannot get this to work. I am not sure whether that is because of the complexity of the JSON in the log file.

What I am looking for is to take the example log entry, have Logstash read it, and send the embedded JSON, as JSON, to ElasticSearch.

A (simplified) example looks like this:

[0m[0m16:02:08,685 INFO  [org.jboss.as.server] (ServerService Thread Pool -- 28) JBAS018559: {
    "appName": "SomeApp",
    "freeMemReqStartBytes": 544577648,
    "freeMemReqEndBytes": 513355408,
    "totalMem": 839385088,
    "maxMem": 1864368128,
    "anonymousUser": false,
    "sessionId": "zz90g0dFQkACVao4ZZL34uAb",
    "swAction": {
        "clock": 0,
        "clockStart": 1437766438950,
        "name": "General",
        "trackingMemory": false,
        "trackingMemoryGcFirst": true,
        "memLast": 0,
        "memOrig": 0
    },
    "remoteHost": "127.0.0.1",
    "remoteAddr": "127.0.0.1",
    "requestMethod": "GET",
    "mapLocalObjectCount": {
        "FinanceEmployee": {
            "x": 1,
            "singleton": false
        },
        "QuoteProcessPolicyRef": {
            "x": 10,
            "singleton": false
        },
        "LocationRef": {
            "x": 2,
            "singleton": false
        }
    },
    "theSqlStats": {
        "lstStat": [
            {
                "sql": "select * FROM DUAL",
                "truncated": false,
                "truncatedSize": -1,
                "recordCount": 1,
                "foundInCache": false,
                "putInCache": false,
                "isUpdate": false,
                "sqlFrom": "DUAL",
                "usingPreparedStatement": true,
                "isLoad": false,
                "sw": {
                    "clock": 104,
                    "clockStart": 1437766438970,
                    "name": "General",
                    "trackingMemory": false,
                    "trackingMemoryGcFirst": true,
                    "memLast": 0,
                    "memOrig": 0
                },
                "count": 0
            },
            {
                "sql": "select * FROM DUAL2",
                "truncated": false,
                "truncatedSize": -1,
                "recordCount": 0,
                "foundInCache": false,
                "putInCache": false,
                "isUpdate": false,
                "sqlFrom": "DUAL2",
                "usingPreparedStatement": true,
                "isLoad": false,
                "sw": {
                    "clock": 93,
                    "clockStart": 1437766439111,
                    "name": "General",
                    "trackingMemory": false,
                    "trackingMemoryGcFirst": true,
                    "memLast": 0,
                    "memOrig": 0
                },
                "count": 0
            }
        ]
    }
}

None of the Logstash configurations I have tried worked. The closest I got so far was:

input {
    file {
        codec => multiline {
            pattern => '\{(.*)\}'
            negate => true
            what => previous
        }
        path => [ '/var/log/logstash.log' ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    json {
        source => message
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
    }
}

I also tried:

input {
    file {
        type => "json"
        path => "/var/log/logstash.log"
        codec => json # also tried json_lines
    }
}

filter {
    json {
        source => "message"
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        cluster => "logstash"
        codec => "json" # also tried json_lines
        index => "logstashjson"
    }
}

I just want to take the JSON posted above and send it to ElasticSearch "as is", just as if I had done a cURL PUT with that file. Any help is appreciated, thank you!

With Leonid's help, here is the configuration I have now:

input {
    file {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => previous
        }
        path => [ '/var/log/logstash.log' ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    grok {
        match => { "message" => "^(?<rubbish>.*?)(?<logged_json>{.*)" }
    }
    json {
        source => "logged_json"
        target => "parsed_json"
    }
}

output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
    }
}

Answer:

Sorry, I can't comment yet, so I will post an answer instead. You are missing a document_type in your elasticsearch configuration; how else would it be deduced?
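For illustration, a minimal sketch of an output section with document_type set (the cluster and index names are carried over from the question; the type value "json" is a placeholder assumption, not from the original answer):

output {
    elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
        # name the document type explicitly so it does not have to be derived;
        # "json" here is only a placeholder value
        document_type => "json"
    }
}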


Well, after looking through the Logstash reference and working closely with @Ascalonian, we came up with the following config:

input {
    file {
        # In the input you need to configure the multiline codec properly:
        # match the line that has the timestamp at the start, and then say
        # "every line that does NOT match should go to the previous line".
        # The pattern could be improved to handle the case where a JSON array
        # starts at the first character of a line, but it is sufficient for now.
        codec => multiline {
            pattern => "^\["
            negate => true
            what => previous
            max_lines => 2000
        }
        path => [ '/var/log/logstash.log' ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    # extract the JSON part of the message string into a separate field
    grok {
        match => { "message" => "^.*?(?<logged_json>{.*)" }
    }

    # replace newlines in the JSON string, since the json filter below
    # cannot deal with them; this is also the time to delete unwanted fields
    mutate {
        gsub => [ 'logged_json', '\n', '' ]
        remove_field => [ "message", "@timestamp", "host", "path", "@version", "tags" ]
    }

    # parse the JSON and remove the string field upon success
    json {
        source => "logged_json"
        remove_field => [ "logged_json" ]
    }
}

output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
    }
}
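As a side note, to iterate on a config like this without waiting on the log file, one option (an assumption on my part, not something from the original thread) is to pipe the sample entry in over stdin with the same multiline codec and inspect the rubydebug output:

input {
    stdin {
        # same grouping rule as the file input above: any line that does not
        # start with "[" is appended to the previous line
        codec => multiline {
            pattern => "^\["
            negate => true
            what => previous
            max_lines => 2000
        }
    }
}

output {
    stdout { codec => rubydebug }
}

Running Logstash with this config and redirecting the sample file into stdin should print one parsed event per log entry, which makes it easy to check the grok and json filters before re-enabling the file input and elasticsearch output.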
