Consuming Kafka data with Logstash and writing it to per-ID HDFS paths


A new requirement came up: consume data from Kafka and write each record to the HDFS path corresponding to its business primary-key ID.

The first attempt used Logstash 7.4, which turned out not to match the local Kafka broker version: connections failed with "broker may not be available". Checking the version compatibility information showed that Logstash 2.4 is the release that pairs with the local Kafka 0.9.

The log format is as follows:

2020-06-11 11:02:02.632 -- {"gradeId":"2","role":"STUDENT","userStatus":1,"data":{"publishExamClassId":"47c50966f34b41cf819358d077691cce","examSubmitType":2,"examQuestionCount":41,"questionResultSet":[{"questionId":"xl_a543bbe2ca1e4b8e8029c1148ed34abf_v4","originalQueType":11,"questionResult":"1","questionType":1,"userAnswerContent":"[\"1\"]"},{"questionId":"xl_4367b003c0a14d2390470b89ece9184e_v4","originalQueType":11,"questionResult":"1","questionType":1,"userAnswerContent":"[\"3\"]"}],"examSubmitTime":1591843564000,"examId":203877,"unifyType":1,"examRightQuestionCount":32,"subjectId":"010","questionAnswerTime":0},"actId":"3000005","classId":"47c50966f34b41cf819358d077691cce","countyId":"421024","schoolId":"154986","localIp":"10.0.183.13","userIp":"","originCode":"1","functionModule":"exam","time":1591844522631,"userIcon":"00000000000000000000000000000000"}

Each line has two parts joined by " -- ": a timestamp string followed by a JSON payload. Records must be written to the HDFS directory matching the examId field of the JSON part; note that examId sits inside the nested data object, which is what motivates the double JSON parsing below.
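Before wiring up Kafka, the split-and-parse chain can be verified locally by pasting the sample line into a stdin/stdout pipeline (this is what the commented-out stdin/stdout blocks in the final config are for). A minimal sketch, using the same filters as the final configuration:

    input { stdin { } }

    filter {
        # split the line on " -- " and keep the JSON half in field2
        mutate {
            split     => ["message", " -- "]
            add_field => { "field2" => "%{[message][1]}" }
        }
        # first parse: examId is still nested inside the data object
        json {
            source    => "field2"
            add_field => { "field3" => "%{data}" }
        }
        # second parse: lifts examId and examSubmitTime to top-level fields
        json { source => "field3" }
    }

    output { stdout { codec => rubydebug } }

Running this and pasting the sample line should show examId and examSubmitTime as top-level fields in the rubydebug output.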

After switching to Logstash 2.4 and working through the relevant filter options, the final configuration was:

    input {
        # stdin {
        #     codec => rubydebug
        # }
        kafka {
            zk_connect => "SANDBOX-HADOOP-02.xuele.net"
            topic_id   => "exam_log"
            codec      => plain
        }
    }

    filter {
        # split "time -- json" into a two-element array and keep the JSON half
        mutate {
            split     => ["message", " -- "]
            add_field => { "field2" => "%{[message][1]}" }
        }
        # first parse: top-level fields appear, but examId is still nested in data
        json {
            source    => "field2"
            add_field => { "field3" => "%{data}" }
        }
        # second parse: re-parse the data hash so examId and examSubmitTime
        # become top-level fields usable in the output path
        json {
            source => "field3"
        }
        # restore the original line so %{message} in the output writes it intact
        mutate {
            join => ["message", " -- "]
        }
    }

    output {
        webhdfs {
            host => "SANDBOX-HADOOP-01.xuele.net"
            port => 50070
            user => "bigdata"
            path => "/test/%{examId}/%{examSubmitTime}"
            codec => line {
                format => "%{message}"
            }
        }
        # stdout {
        #     codec => rubydebug
        # }
    }
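To run it, save the configuration to a file and start Logstash with bin/logstash -f pointing at it. Note that 50070 is the NameNode's HTTP port: the webhdfs output writes through the WebHDFS REST API, so the Logstash host needs no Hadoop client installed, only HTTP access to the NameNode (and to the DataNodes, which WebHDFS redirects the actual writes to).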

Further adjustments are still needed on top of this configuration; for example, the date should be normalized with the date filter, as sketched below.
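The post does not show the final date handling, but here is a minimal sketch, assuming the epoch-millisecond examSubmitTime from the sample log should drive a readable directory name (the target and path layout are illustrative, not from the original post):

    filter {
        # examSubmitTime is epoch milliseconds (e.g. 1591843564000);
        # UNIX_MS parses it into @timestamp
        date {
            match  => ["examSubmitTime", "UNIX_MS"]
            target => "@timestamp"
        }
    }

With @timestamp set from the submit time, the output path can use date-based sprintf formatting, e.g. path => "/test/%{examId}/%{+YYYY-MM-dd}.log".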

Also, the output codec must explicitly include:

codec => line {
            format => "%{message}"
        }

Otherwise Logstash prepends its default timestamp to every line written to the landed file.
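Without the explicit format, the line codec falls back to the event's default string form, which in Logstash 2.x is roughly timestamp, host, then message; a landed line would then look something like this (timestamp and host are made up for illustration):

    2020-06-12T08:15:30.123Z logstash-host 2020-06-11 11:02:02.632 -- {"gradeId":"2", ...}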

Also attached: Logstash and Kafka version compatibility information (the table image is not preserved; Elastic's documentation for the logstash-input-kafka plugin carries the matrix).
