logstash Apache日志的自定义日志过滤器

我是ELK堆栈的新手。我有一个文件拍服务,将日志发送到logstash,并在logstash中使用grok过滤器,将数据推送到elasticsearch索引。

我正在使用gork筛选器match => { "message" => "%{COMBINEDAPACHELOG}"}来解析数据。

我的问题是,我希望将字段名称及其值存储在elasticsearch索引中。我的日志的不同版本如下:


27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atm&explain=true&bridge=true HTTP/1.1" 200 3284

27.60.18.21 - - [27/Aug/2017:10:28:49 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=atms&explain=true&bridge=true HTTP/1.1" 200 1452

27.60.18.21 - - [27/Aug/2017:10:28:52 +0530] "GET /api/v1.2/places/nearby/json?&refLocation=28.5359586,77.3677936&keyword=FINATM HTTP/1.1" 200 3283

27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=co&explain=true&bridge=true HTTP/1.1" 200 3415

27.60.18.21 - - [27/Aug/2017:10:29:06 +0530] "GET /api/v1.2/places/search/json?username=pradeep.pgu&location=28.5359586,77.3677936&query=cof&explain=true&bridge HTTP/1.1" 200 2476


我想要的弹性索引字段如下:

  1. client_ip => type必须与kibana用于IP映射的内容兼容。
  2. 时间戳=>日期时间格式。=>日志的时间
  3. 方法=>文本=>被调用的方法,例如GET,POST
  4. 版本=>十进制数=>例如1.2 / 1.0(在示例日志中为v1.2)
  5. 用户名=>文本=>它是username=(在示例日志中为pradeep.pgu)之后的文本
  6. location => geo_point type =>该值同时具有纬度和经度,以便kibana可以在地图上绘制它们。
  7. search_query => text =>被搜索的内容(在示例中,来自两个字段“ keyword =“或” query =“)。两个字段中的任何一个都将存在,而一个字段将存在,必须使用其值。
  8. response_code => number =>响应代码。(示例中为200)
  9. data_transfered => number =>传输的数据量(样本中的最后一个数字)。

这样的事情有可能吗?gork过滤器是否对此有规定?问题是参数不是特定于订单的。

回答:

从开始HTTPD_COMMONLOG,您可以使用以下模式(可以在grok

tester上进行测试):

grok {

match => {

"message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/places/search/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"

}

}

一旦grok过滤器提取了请求,就可以在其上使用kv过滤器,该过滤器将提取参数(并忽略参数不是特定于订单的问题)。您必须将field_split选项设置为&:

kv { 

source => "request"

field_split => "&"

}

对于search_query,根据存在的字段,我们使用mutate带有add_field选项的过滤器来创建字段。


filter {

grok {

match => {

"message" => "%{IPORHOST:client_ip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:method} /api/v%{NUMBER:version}/.*/json\?%{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response_code} (?:%{NUMBER:data_transfered}|-)"

}

}

kv {

source => "request"

field_split => "&"

}

if [query] {

mutate {

add_field => { "search_query" => "%{query}" }

}

} else if [keyword] {

mutate {

add_field => { "search_query" => "%{keyword}" }

}

}

if [refLocation] {

mutate {

rename => { "refLocation" => "location" }

}

}

}

以上是 logstash Apache日志的自定义日志过滤器 的全部内容, 来源链接: utcz.com/qa/407237.html

回到顶部