Here are some examples.

Original data:

{"countnum":2,"checktime":"2017-05-23 16:59:32"}

{"countnum":2,"checktime":"2017-05-23 16:59:32"}

1. No field type conversion involved: it is enough to configure the logstash filter as follows.

if [type] == "onlinecount" {
  json {
    source => "message"
  }
}
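
For reference, after this filter runs the JSON keys are promoted to top-level event fields, so a record like the ones above comes out roughly as follows (assuming the input sets type to onlinecount; metadata fields such as @version, @timestamp and host are omitted):

{
  "type"      : "onlinecount",
  "message"   : "{\"countnum\":2,\"checktime\":\"2017-05-23 16:59:32\"}",
  "countnum"  : 2,
  "checktime" : "2017-05-23 16:59:32"
}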

 

 

2. With field type conversion involved, the logstash filter can be written as follows:

if [type] == "onlinecount" {
  json {
    # parse the JSON in "message" into top-level fields
    source => "message"
  }
  mutate {
    # force countnum to be stored as an integer rather than a string
    convert => { "countnum" => "integer" }
  }
}
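
To check that the conversion actually took effect while testing, one option (a debugging aid, not part of the original setup) is to print events with the rubydebug codec; countnum should then appear as a bare number instead of a quoted string:

output {
  # pretty-print each event so field types are visible
  stdout { codec => rubydebug }
}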

 

 


 

Kafka data: {
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}\n
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_33"}\n
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_34"}\n
....
}
 

For performance reasons, the Kafka team merges multiple raw log records into a single message before sending (the individual records are separated by newline characters). On the consuming side I therefore have to split each Kafka message back into individual records before writing to ES, otherwise the data is not accurate. How should this requirement be handled?

Solved. At first I went down the wrong path: the method below still left everything in one event, because mutate split only turns the field into an array inside that single event.
filter {
  mutate {
    split => ["message", "
"]
  }
}


The correct solution:
filter {
  split {
    field => "message"
  }
}
The split filter clones the event into one event per line, so each record is written to ES as its own document.
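
Putting it together, a minimal sketch of the whole filter for this Kafka case could look like the following (the json block is an assumption based on the sample records above, not something stated in the original answer):

filter {
  split {
    # one event per newline-separated record in "message"
    field => "message"
  }
  json {
    # each resulting "message" now holds a single JSON record
    source => "message"
  }
}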


 
One more small question: the split filter's terminator defaults to \n, but why does the configuration below fail to split, while leaving terminator out works?
filter {
  split {
    field => "message"
    terminator => "\\n"
  }
}
Most likely because "\\n" never evaluates to a newline character: by default Logstash takes quoted strings in the config literally (and even with escape processing enabled, "\\n" means a backslash followed by n), so the terminator matches nothing in the data. The default terminator is an actual newline, which is why omitting it works.
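
If an explicit newline terminator is really needed, one possible approach (assuming Logstash 6.1 or later, where the config.support_escapes setting exists) is to enable escape processing in logstash.yml and then write a single-escaped "\n":

# logstash.yml
config.support_escapes: true

# pipeline config
filter {
  split {
    field => "message"
    # with config.support_escapes enabled, "\n" is interpreted as a newline
    terminator => "\n"
  }
}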
 


 

Given the following JSON:

{
  "name":"zhangsan",
  "friends":
  {
    "friend1":"lisi",
    "friend2":"wangwu",
    "msg":["haha","yaya"]
  }
}
parse it into:

{
  "name":"zhangsan",
  "friend1":"lisi",
  "friend2":"wangwu",
  "msg":["haha","yaya"]
}
logstash.conf

input
{
  stdin
  {
    codec => json
  }
}

filter
{
  mutate
  {
    # first create a new field and copy the friends hash into it
    add_field => { "@friends" => "%{friends}" }
  }
  json
  {
    # then parse the copy so its keys become top-level fields
    source => "@friends"
    # remove the fields that are no longer needed; this line is optional
    remove_field => [ "@friends","friends" ]
  }
}

output
{
  stdout { }
}
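
For a quick test (a sketch, assuming a local Logstash installation), the pipeline can be run from the command line and the nested JSON pasted on a single line into stdin:

bin/logstash -f logstash.conf
# paste: {"name":"zhangsan","friends":{"friend1":"lisi","friend2":"wangwu","msg":["haha","yaya"]}}
# the printed event should contain name, friend1, friend2 and msg as
# top-level fields, matching the desired result shown above
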
---------------------
Author: 姚贤贤
Source: CSDN
Original post: https://blog.csdn.net/u011311291/article/details/86743642