Here is an example.
Raw data:
{"countnum":2,"checktime":"2017-05-23 16:59:32"}
{"countnum":2,"checktime":"2017-05-23 16:59:32"}
1. When no field type conversion is involved, the following logstash filter configuration is enough:
if [type] == "onlinecount" {
  json {
    source => "message"
  }
}
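A minimal end-to-end pipeline to try this out (a sketch; stdin/stdout stand in for the real input and Elasticsearch output, and the type tag is assumed to be set on the input):

input {
  stdin {
    type => "onlinecount"
  }
}
filter {
  if [type] == "onlinecount" {
    json {
      source => "message"
    }
  }
}
output {
  stdout { codec => rubydebug }
}

Pasting one of the raw lines above into stdin should print an event with countnum and checktime promoted to top-level fields.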
2. When field type conversion is involved
logstash filter:
if [type] == "onlinecount" {
  mutate {
    split => ["message", ","]
    add_field => {
      "coutnum" => "%{[message][0]}"
    }
    add_field => {
      "checktime" => "%{[message][1]}"
    }
    remove_field => ["message"]
  }
  # A json filter takes a single source/target pair, so each extracted
  # field gets its own block. Note that convert is a mutate option, not
  # a json option.
  json {
    source => "coutnum"
    target => "coutnum"
    # convert => { "coutnum" => "integer" }
  }
  json {
    source => "checktime"
    target => "checktime"
  }
}
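A simpler way to handle the type conversion (a sketch that is not part of the original post; field names follow the raw data above) is to let the json filter parse the whole message and then convert the numeric field with mutate:

if [type] == "onlinecount" {
  json {
    source => "message"
  }
  mutate {
    convert => { "countnum" => "integer" }
  }
}

Since the sample JSON already carries countnum as a number, the convert step only matters when the value arrives as a string.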
Kafka data:
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}\n
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_33"}\n
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_34"}\n
....
For performance reasons, the Kafka team merges multiple raw log records into a single message before sending (individual records are separated by newline characters), so on the consuming side I have to split them back into individual records before writing to ES, otherwise the data would be wrong. How should this requirement be handled?
Solved. I went down a wrong path at first: the approach below still left everything in one event.
filter {
  mutate {
    # mutate/split only turns the field into an array inside the same
    # event; it does not emit separate events.
    split => ["message", "
"]
  }
}
The correct solution:
filter {
  split {
    field => "message"
  }
}
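Putting both pieces together for the Kafka case (a sketch, not from the original post): split the batched message into one event per line, then parse each line as JSON.

filter {
  split {
    field => "message"   # default terminator is "\n"
  }
  json {
    source => "message"  # each event now carries exactly one JSON record
  }
}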
One small remaining question: the split filter's terminator defaults to \n, but why does the following configuration fail to split, while leaving terminator out works fine?
filter {
  split {
    field => "message"
    terminator => "\\n"
  }
}
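A likely explanation (my reading, not stated in the original post): by default the Logstash config parser does not interpret escape sequences in quoted strings, so a terminator written as "\n" (or "\\n") is matched as literal backslash characters rather than as a newline and never matches anything, while the default terminator is already a real newline character. Escape interpretation can be switched on in logstash.yml if you really want to spell the terminator out:

# logstash.yml -- only needed if you want "\n" in pipeline configs
# to be parsed as a real newline
config.support_escapes: true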
Given the following JSON:
{
  "name": "zhangsan",
  "friends": {
    "friend1": "lisi",
    "friend2": "wangwu",
    "msg": ["haha", "yaya"]
  }
}
Parse it into:
{
  "name": "zhangsan",
  "friend1": "lisi",
  "friend2": "wangwu",
  "msg": ["haha", "yaya"]
}
logstash.conf
input {
  stdin {
    codec => json
  }
}
filter {
  mutate {
    # First create a new field and copy the friends value into it.
    add_field => { "@friends" => "%{friends}" }
  }
  json {
    # Then parse that copy, promoting its keys to top-level fields.
    source => "@friends"
    # Remove the now-redundant fields; this line can also be left out.
    remove_field => [ "@friends", "friends" ]
  }
}
output {
  stdout { }
}
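An alternative that avoids the string round-trip through sprintf (a sketch under the assumption that the nested key names are known up front): rename the nested fields directly and drop the parent.

filter {
  mutate {
    rename => {
      "[friends][friend1]" => "friend1"
      "[friends][friend2]" => "friend2"
      "[friends][msg]"     => "msg"
    }
    # remove_field runs after rename, so the emptied parent can be dropped here
    remove_field => [ "friends" ]
  }
}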
---------------------
Author: 姚贤贤
Source: CSDN
Original post: https://blog.csdn.net/u011311291/article/details/86743642
Copyright notice: this is the blogger's original article; please include a link to the original post when reposting.