Logstash: split events from an XML file with multiple documents, keeping information from the root tag

My problem: my XML files contain events that I want to parse with Logstash and then query with Kibana. I want every event to keep all the information from the ROOT tag.

The input looks like:

    <?xml version="1.0" encoding="UTF-8"?>
    <ROOT number="34">
      <EVENTLIST>
        <EVENT name="hey"/>
        <EVENT name="you"/>
      </EVENTLIST>
    </ROOT>

What I want is two documents like these:

    {
      "number": "34",
      "name": "hey"
    }

    {
      "number": "34",
      "name": "you"
    }
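The mapping itself is mechanical: copy the root's attribute onto every EVENT. As a point of reference for what the pipeline should produce, here is a minimal sketch in plain Ruby using the stdlib REXML parser (the library choice is my assumption, not something from the question):

```ruby
require "rexml/document"
require "json"

xml = <<~XML
  <?xml version="1.0" encoding="UTF-8"?>
  <ROOT number="34">
    <EVENTLIST>
      <EVENT name="hey"/>
      <EVENT name="you"/>
    </EVENTLIST>
  </ROOT>
XML

doc = REXML::Document.new(xml)
root_number = doc.root.attributes["number"]

# One document per EVENT, each carrying the root's attribute.
events = REXML::XPath.match(doc, "//EVENT").map do |ev|
  { "number" => root_number, "name" => ev.attributes["name"] }
end

events.each { |e| puts e.to_json }
# {"number":"34","name":"hey"}
# {"number":"34","name":"you"}
```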

Logstash conf:

    input {
      stdin { }
    }

    filter {
      xml {
        store_xml => "false"
        source => "message"
        target => "EVENT"
        xpath => [
          "/ROOT/@number", "number",
          "/ROOT/EVENTLIST/EVENT/@name", "name"
        ]
      }
    }

    output {
      elasticsearch { host => localhost }
      stdout { codec => rubydebug }
    }

That doesn't work. What I get is:

    {
      "number": ["34"],
      "name": ["hey", "you"]
    }
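That output is the xml filter's `xpath` option doing what it does: each expression is evaluated once against the whole document and every match is collected into the target field, so both EVENT names land in one array on a single event instead of one name per event. The same match semantics can be reproduced with Ruby's stdlib REXML (used here only for illustration):

```ruby
require "rexml/document"

xml = '<ROOT number="34"><EVENTLIST><EVENT name="hey"/><EVENT name="you"/></EVENTLIST></ROOT>'
doc = REXML::Document.new(xml)

# The expression matches every EVENT's @name in the document, so a
# per-document XPath cannot associate one name with one event.
names = REXML::XPath.match(doc, "/ROOT/EVENTLIST/EVENT/@name").map(&:value)
p names  # => ["hey", "you"]
```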

I followed the solution from this post: https://serverfault.com/questions/615196/logstash-parsing-xml-document-contains-multiple-log-entries

But my problem remains: I lose the information from the root tag.

One solution might be to handle this with a ruby filter, but I don't know Ruby. Another would be a small Java program that converts the XML to JSON before sending it to Elasticsearch.

Any idea how to solve this, or do I have to learn Ruby?

Answer:

If your structure is that simple, you could use the memorize plugin that I wrote. Your config would look like this:

    filter {
      if ([message] =~ /<ROOT/) {
        grok {
          match => [ "message",
            'number="(?<number>\d+)" number2="(?<number1>\d+)"'
          ]
        }
      } else if ([message] =~ /<EVENT /) {
        grok {
          match => [ "message", 'name="(?<name>[^"]+)"' ]
        }
      }
      memorize {
        fields => ["number", "number1"]
      }
      if ([message] !~ /<EVENT /) {
        drop {}
      } else {
        mutate { remove_field => ["message"] }
      }
    }
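Fed the file line by line on stdin, the filter works like this: the ROOT line seeds the memorized fields, each EVENT line gets them copied in, and every other line is dropped. A minimal plain-Ruby simulation of that flow (simplified to the sample input's single `number` attribute; the config above also expects a `number2` attribute, which the sample does not have):

```ruby
# Simulate the grok + memorize pipeline over the sample input lines.
lines = [
  '<ROOT number="34">',
  '<EVENTLIST>',
  '<EVENT name="hey"/>',
  '<EVENT name="you"/>',
  '</EVENTLIST>',
  '</ROOT>',
]

memorized = {}  # plays the role of the memorize filter's per-stream state
events = []

lines.each do |line|
  fields = {}
  if line =~ /<ROOT/
    fields["number"] = line[/number="(\d+)"/, 1]  # grok on the root line
  elsif line =~ /<EVENT /
    fields["name"] = line[/name="([^"]+)"/, 1]    # grok on event lines
  end

  # memorize: record values we see, fill in missing ones from state
  ["number"].each do |f|
    fields[f] ? memorized[f] = fields[f] : fields[f] = memorized[f]
  end

  events << fields if line =~ /<EVENT /            # drop everything else
end

events.each { |e| p e }
```

Each emitted event now carries both its own `name` and the memorized `number` from the root line, which is exactly the two-document output the question asks for.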

My example looks for multiple attributes in the ROOT element, per a comment below. Here is the version of the plugin that supports memorizing multiple fields:

    # encoding: utf-8
    require "logstash/filters/base"
    require "logstash/namespace"
    require "set"

    #
    # This filter will look for fields from an event and record the last value
    # of them. If any are not present, their last value will be added to the
    # event.
    #
    # The config looks like this:
    #
    #     filter {
    #       memorize {
    #         fields => ["time"]
    #         default => { "time" => "00:00:00.000" }
    #       }
    #     }
    #
    # `fields` is an array of the field NAMES that you want to memorize.
    # `default` is a map of field names to field values that you want
    # to use if the field isn't present and has no memorized value (optional).
    class LogStash::Filters::Memorize < LogStash::Filters::Base
      config_name "memorize"
      milestone 2

      # An array of the field names to memorize
      config :fields, :validate => :array, :required => true

      # A map of default values to use if a field hasn't been seen before we need it
      config :default, :validate => :hash, :required => false

      # The stream identity is how the filter determines which stream an
      # event belongs to. See the multiline plugin if you want more details
      # on how this might work.
      config :stream_identity, :validate => :string, :default => "%{host}.%{path}.%{type}"

      public
      def initialize(config = {})
        super
        @threadsafe = false
        # This filter needs to keep state.
        @memorized = Hash.new
      end # def initialize

      public
      def register
        # nothing needed
      end # def register

      public
      def filter(event)
        return unless filter?(event)
        any = false
        @fields.each do |field|
          if event[field].nil?
            # Field missing: fill it from the memorized state, or the default.
            map = @memorized[@stream_identity]
            val = map.nil? ? nil : map[field]
            if val.nil?
              val = @default.nil? ? nil : @default[field]
            end
            if !val.nil?
              event[field] = val
              any = true
            end
          else
            # Field present: remember its value for later events in the stream.
            map = @memorized[@stream_identity]
            if map.nil?
              map = @memorized[@stream_identity] = Hash.new
            end
            map[field] = event[field]
          end # if
        end # @fields.each
        if any
          filter_matched(event)
        end
      end # def filter
    end # class

For Logstash 1.5 and later, this plugin can be installed with:

    bin/plugin install logstash-filter-memorize

