之前有做过一篇关于ELK日志收集分析平台的博文,而这片博文这是一个后续的版本,在这里我将通过五个案例向大家详细的讲述Logstash集中、转换和存储数据的过程和相关配置使用,Logstash它的强大之处在于他可以通过非常多的方式去进行转换日志格式然后进行存储,而支持存储的方式也非常的多,但这里主要结合Elasticsearc进行。


输入

Logstash的输入主要是Input插件进行工作,但是Input可以进行标准输入(Stdin)、读取文件(File)、读取网络数据(TCP)、生成测试数据(Generator)、读取Syslog数据、读取Redis数据、读取collectd数据,当然这里的部分功能需要部分依赖关系,这里就不做详细说明,如果真的遇到可以在下方评论区进行留言或者Goolge一下就好了,主要都是一下yum安装的程序,下面给大家演示一下标准输入和文件输入
标准输入

input {
    stdin {
        add_field => {"key" => "value"}
        codec => "plain"
        tags => ["add"]
        type => "std"
    }
}

我们使用bin/logstash -f stdin.conf命令就会输出一下的信息

{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => "2014-08-08T06:48:47.789Z",
          "type" => "std",
          "tags" => [
        [0] "add"
    ],
           "key" => "value",
          "host" => "raochenlindeMacBook-Air.local"
}


文件输入
分析日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用 logstash 来处理日志文件。
Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心 logstash 会漏过你的数据。
sincedb 文件中记录了每个被监听的文件的 inode, major number, minor number 和 pos。
这里就以生产环境下的Elasticsearc Java日志作为实例讲解,首先我们展示配置文件进行分析学习

input {

    file {
        path => "/var/log/elasticsearch/KJ-Cloud.log"
        type => "es-log"
        start_position => "beginning"
    codec => multiline {
        pattern => "^\["
        negate => true
        what => "previous"
    }
    }

}

output {

    if [type] == "es-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-es-log-%{+YYYY.MM}"
        }
    }

}
  • path:
    指定读取的日志文件路径,FileWatch 只支持文件的绝对路径,而且会不自动递归目录,所以有需要的话,请用数组方式都写明具体哪些文件。由于FileWatch不支持fluentd 那样的path => "/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path => "/path/to////.log"
  • type:
    作为日志归纳字段使用,后面也可以作为变量去进行一个判断
  • start_position:
    logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取,有点类似 cat,但是读到最后一行不会终止,而是继续变成 tail -F
  • codec:
    codec是一个编解码插件,他可以将多行数据进行合并,也可以采用Josn解码,而这里的codec向大家展示了如何将多行合并成一行,pattern则是指定了它的分隔符
  • discover_interval:
    logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
  • exclude:
    不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
  • sincedb_path:
    如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile.sincedb),可以通过这个配置定义 sincedb 文件到其他位置。
  • sincedb_write_interval:
    logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
  • stat_interval:
    logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。

输出的格式如下:

{
    "_index": "logstash-es-log-2018.09",
    "_type": "doc",
    "_id": "gz9oDWYBWCMetPHc09lE",
    "_version": 1,
    "_score": 1,
    "_source": {
        "host": "test.kemin-cloud.com",
        "@version": "1",
        "type": "es-log",
        "message": "[2018-09-25T05:07:44,495][WARN ][o.e.m.j.JvmGcMonitorService] [center.kemin-cloud.com] [gc][young][22469][1987] duration [2.8s], collections [1]/[3s], total [2.8s]/[1.8m], memory [346.7mb]->[228.5mb]/[1007.3mb], all_pools {[young] [123.9mb]->[387.1kb]/[133.1mb]}{[survivor] [15mb]->[16.6mb]/[16.6mb]}{[old] [207.7mb]->[211.5mb]/[857.6mb]}",
        "path": "/var/log/elasticsearch/KJ-Cloud.log",
        "@timestamp": "2018-09-24T21:07:45.494Z"
    }
}

相信大家到这里还是有点晕,那我们再哪一个比较常用的生产实例给大家说明,那就是Nginx的访问日志,但是这里需要做些小变动,由于我们平常存储Nginx的日志格式并不会很好的让logstash进行收集,所以我们需要修改nginx的日志格式存储为josn格式

log_format  json  '{"@timestamp":"$time_iso8601",'
                   '"@version":"1",'
                   '"host":"$server_addr",'
                   '"agent":"$remote_addr",'
                   '"client":"$http_x_forwarded_for",'
                   '"size":$body_bytes_sent,'
                   '"responsetime":$request_time,'
                   '"domain":"$host",'
                   '"url":"$uri",'
                   '"status":"$status"}';

access_log  /var/log/nginx/access_json.log  json;

重启nginx服务,那么nginx的日志格式将会变成json格式,那么现在再来编写logstash配置文件

input {

    file {
        path => "/var/log/nginx/access_json.log"
        type => "nx-ac-log"
        start_position => "beginning"
        codec => json
    }

}



filter {

    if [type] == "nx-ac-log" {
        geoip {
            source => "client"
            target => "geoip"
            database => "/etc/logstash/conf.d/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
        }

        mutate {
            convert => [ "[geoip][coordinates]", "float" ]
        }
    }

}



output {

    if [type] == "nx-ac-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            manage_template => true
            index => "logstash-nx-ac-log-%{+YYYY.MM}"
        }
    }

}

这里把codec的指向值变成了json,则表示按照json的日志格式去读取,而中间的添加了一个filter插件,这里是在原本的基础上做了一些小小的改变,那就是根据客户的IP地址进行经纬度定位,这样子就可以在Kibana上面进行区域性分析,接下来就会和大家好好的解析filter插件
输出格式如下:

{
    "_index": "logstash-nx-ac-log-2018.09",
    "_type": "doc",
    "_id": "3yCMA2YBP74d8uJuELhF",
    "_version": 1,
    "_score": 1,
    "_source": {
        "agent": "172.31.243.145",
        "geoip": {
            "continent_code": "AS",
            "ip": "162.35.242.31",
            "location": {
                "lat": 22.25,
                "lon": 114.1667
            },
            "country_code2": "HK",
            "country_code3": "HK",
            "longitude": 114.1667,
            "coordinates": [
                114.1667
                ,
                22.25
            ],
            "country_name": "Hong Kong",
            "timezone": "Asia/Hong_Kong",
            "latitude": 22.25
        },
        "status": "200",
        "client": "47.52.16.198",
        "size": 12782,
        "responsetime": 0.135,
        "@timestamp": "2018-09-22T23:10:02.000Z",
        "url": "/zabbix/zabbix.php",
        "type": "nx-ac-log",
        "@version": "1",
        "domain": "test.kemin-cloud.com",
        "host": "172.31.243.146",
        "path": "/var/log/nginx/access_json.log"
    }
}

过滤器

丰富的过滤器插件的存在是 logstash 威力如此强大的重要因素,但是称他为过滤器,其实提供的不单单是过滤的功能(小编这里觉得这个命名实在太随意了应该起一个更加爆炸性的名字),他提供正则捕获(Grok)、时间处理(Date)、数据修改(Mutate)、地址查询归类(GeoIP)、编解码(json)、拆分事件(split)、匹配归类(UserAgent)、切分(Key-Value)、数值统计(Metrics)还有随心所欲的Ruby处理(实在是强大到没有朋友,应了那句歌词,无敌是多么,多么寂寞),下面具体会以几个实例进行解析,还有的大家可以访问https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html进行详细的了解,当然也可以访问官网了解(小编是为菜鸟所以面对官网专业性的英文只能默默流泪。。。。)
首先把我们上面Nginx访问日志使用的地址查询归类讲解讲解

filter {

    if [type] == "nx-ac-log" {
        geoip {
            source => "client"
            target => "geoip"
            database => "/etc/logstash/conf.d/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
        }

        mutate {
            convert => [ "[geoip][coordinates]", "float" ]
        }
    }

}


GeoIP 库数据较多,如果你不需要这么多内容,可以通过add_field选项指定自己所需要的。下例为全部可选内容:

filter {
    geoip {
        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
    }
}

需要注意的是:geoip.location 是 logstash 通过 latitude 和 longitude额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,需要进行对应修改,同时geoip 插件的 "source" 字段可以是任一处理后的字段,比如 "client_ip",但是字段内容却需要小心!geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段。当然先使用从第三方购买回来的IP地址库也可以使用database进行指定位置(但是这个测试好像不行,有待验证)
我们上面进行完IP的归纳,所以我们后面紧跟着就要用到Mutate进行数据修改,Mutate的修改可以对类型进行转换、字符串处理、字段处理,而我们这里需要使用它的类型转换,但这里需要注意一点就是mutate除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"] 转换成 [1,2]。但不支持对哈希类型的字段做类似处理,有这方面需求的可以采用稍后讲述的 filters/ruby 插件完成。

mutate {
    convert => [ "[geoip][coordinates]", "float" ]
}

把之前的分析完之后,现在我们来进行一个复杂点的来分析,那就是Nginx的错误日志。

input {

    file {
        path => "/var/log/nginx/error.log"
        type => "nx-er-log"
        start_position => "beginning"
    }

}



filter {

    if [type] == "nx-er-log" {
        grok {
            match => { "message" => "(?<datetime>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:err_message}(?:, client: (?<client>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:domain}?)(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer}\")?"}
        }

    }

}



output {

    if [type] == "nx-er-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-nx-er-log-%{+YYYY.MM}"
        }
    }

}

由于Nginx的错误日志不像Nginx的访问日志一样可以修改为json格式,所以这里需要使用到grok正则表达式进行过滤匹配,官方提供的预定义 grok 表达式见:https://github.com/logstash/logstash/tree/v1.4.2/patterns
输出格式如下:

{
    "_index": "logstash-nx-er-log-2018.09",
    "_type": "doc",
    "_id": "TSDoA2YBP74d8uJudupM",
    "_version": 1,
    "_score": 1,
    "_source": {
        "@version": "1",
        "host": "hkagent.kemin-cloud.com",
        "type": "nx-er-log",
        "pid": "2525",
        "severity": "error",
        "request_host": ""st.kemin-cloud.com"",
        "path": "/var/log/nginx/error.log",
        "referrer": "http://www.speedtest.cn/",
        "message": "2018/09/23 08:50:57 [error] 2525#0: *72394 client intended to send too large body: 1049352 bytes, client: 113.88.103.126, server: st.kemin-cloud.com, request: "POST /index.php?r=0.08330938681162791 HTTP/1.1", host: "st.kemin-cloud.com", referrer: "http://www.speedtest.cn/"",
        "datetime": "2018/09/23 08:50:57",
        "domain": "st.kemin-cloud.com",
        "@timestamp": "2018-09-23T00:50:58.056Z",
        "err_message": "*72394 client intended to send too large body: 1049352 bytes",
        "request": ""POST /index.php?r=0.08330938681162791 HTTP/1.1"",
        "client": "113.88.103.126"
    }
}

接下来这里再给大家提供利用Grok正则匹配的一个实际生产环境,收集MySQL慢查询日志

input {

    file {
        type => "mysqlslow-log"
        path => "/var/log/mysql/mysql-slow.log"
        start_position => "beginning"
        codec => multiline {
            pattern => "^# User@Host:"
            negate => true
            what => "previous"
        }
    }

}



filter {

    if [type] == "mysqlslow-log" {
        grok {
            match => { "message" => "SELECT SLEEP" }
            add_tag => [ "sleep_drop" ]
            tag_on_failure => []
        }

        if "sleep_drop" in [tags] {
            drop {}
        }

        grok {
            match => { "message" => "(?m)^# User@Host: %{USER:User}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:Client_IP})?\]\s.*# Query_time: %{NUMBER:Query_Time:float}\s+Lock_time: %{NUMBER:Lock_Time:float}\s+Rows_sent: %{NUMBER:Rows_Sent:int}\s+Rows_examined: %{NUMBER:Rows_Examined:int}\s*(?:use %{DATA:Database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<Query>(?<Action>\w+)\s+.*)\n# Time:.*$" }
        }

        date {
            match => [ "timestamp","UNIX" ]
            remove_field => [ "timestamp" ]
        }

    }

}



output {

    if [type] == "mysqlslow-log" {
       elasticsearch {
           hosts => ["172.31.243.146:9200"]
           index => "logstash-mysqlslow-log-%{+YYYY.MM}"
       }
   }

}

MySQL的慢查询日志收集需要mysql开启慢查询记录,正则匹配过滤的时候比较复杂需要大家细心的去对照具体日志记录进行分析,这里还使用到了时间处理filters/date 插件可以用来转换你的日志记录中的时间字符串,这里需要大家注意一点就是在稍后的outputs/elasticsearch中常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用filters/date转换后删除自己的字段!这在导入旧数据的时候固然非常有用,而在实时数据处理的时候同样有效,因为一般情况下数据流程中我们都会有缓冲区,导致最终的实际处理时间跟事件产生时间略有偏差。

filters/date 插件支持五种时间格式:

  1. ISO8601:类似 "2011-04-19T03:44:01.103Z" 这样的格式。具体Z后面可以有 "08:00"也可以没有,".103"这个也可以没有。常用场景里来说,Nginx 的 log_format 配置里就可以使用 $time_iso8601 变量来记录请求时间成这种格式。
  2. UNIX:UNIX 时间戳格式,记录的是从 1970 年起始至今的总秒数。Squid 的默认日志格式中就使用了这种格式。
  3. UNIX_MS:这个时间戳则是从 1970 年起始至今的总毫秒数。据我所知,JavaScript 里经常使用这个时间格式。
  4. TAI64N:TAI64N 格式比较少见,是这个样子的:@4000000052f88ea32489532c。我目前只知道常见应用中,qmail会用这个格式。
  5. Joda-Time库:Logstash 内部使用了 Java 的 Joda 时间库来作时间处理。所以我们可以使用 Joda 库所支持的时间格式来作具体定义。具体对照表这里就不粘出来了,需要的下方留言我在粘吧~

输出格式如下:

{
    "_index": "logstash-mysqlslow-log-2018.10",
    "_type": "doc",
    "_id": "1RGBk2YBAXDt9GzpgK-P",
    "_version": 1,
    "_score": 1,
    "_source": {
        "@version": "1",
        "type": "mysqlslow-log",
        "Lock_Time": 0.000037,
        "@timestamp": "2018-10-18T16:00:23.000Z",
        "Rows_Sent": 0,
        "Query_Time": 1.058855,
        "User": "zabbix",
        "message": "# User@Host: zabbix[zabbix] @ [172.31.243.146] # Query_time: 1.058855 Lock_time: 0.000037 Rows_sent: 0 Rows_examined: 726 SET     timestamp=1539878423; delete from history_uint where itemid=119 and clock<1537286352; # Time: 181019 0:00:25",
        "path": "/var/log/mysql/mysql-slow.log",
        "Rows_Examined": 726,
        "host": "0.0.0.0",
        "Action": "delete",
        "Query": "delete from history_uint where itemid=119 and clock<1537286352;",
        "Client_IP": "172.31.243.146",
        "tags": [
            "multiline"
        ]
    }
}

输出

Logstash 早期有三个不同的 elasticsearch 插件。到 1.4.0 版本的时候,开发者彻底重写了 LogStash::Outputs::Elasticsearch 插件。从此,我们只需要用这一个插件,就能任意切换使用 Elasticsearch 集群支持的各种不同协议了。下面我们来看看它的样式

output {
    elasticsearch {
        host => "192.168.0.2"
        protocol => "http"
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        index_type => "%{type}"
        workers => 5
        template_overwrite => true
    }
}

这里的模板只是显示了一小部分功能,输出的优化其实还有很多的,Logstash1.4.2在transport和http协议的情况下是固定连接指定host发送数据。从1.5.0开始,host可以设置数组,它会从节点列表中选取不同的节点发送数据,达到Round-Robin负载均衡的效果,还有就是对性能要求不高的,可以在启动logstash进程时,使用命令export BULK="esruby"配置环境变量ENV["BULK"],强制采用elasticsearch官方Ruby库而对性能要求极高的,可以手动更新ftw库版本,目前最新版是0.0.42版,据称内存问题在0.0.40版即解决。

最后总结

实际上logstash还有很多很多东西都略过了,比如说input当中的syslog还没有提及,过滤插件里的好几个插件都没有说明展示案例,输出的部分还有许多方式比如发送邮件,输出到redis当中这些等等,如果有兴趣的话大家可以在官网上自主查阅,ELK的整个架构说简陋就那几部分,但是每一个部分都可以说上一千零一夜,所以可想而知是多的么的强大,在深入了解,还可以写出属于自己的定制插件,同时还有尚未进入官方库的插件,包括kafka(卡夫卡)这些比较耳熟的东西。

附上完整版配置:

input {

    #System Messages Logs
    syslog {
        type => "sys-log"
        port => "514"
    }

    #Elasticsearch Messages Logs
    file {
        path => "/var/log/elasticsearch/KJ-Cloud.log"
        type => "es-log"
        start_position => "beginning"
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }

    #Nginx Access Logs
    file {
        path => "/var/log/nginx/access_json.log"
        type => "nx-ac-log"
        start_position => "beginning"
        codec => json
    }
    

    #Nginx Error Logs
    file {
        path => "/var/log/nginx/error.log"
        type => "nx-er-log"
        start_position => "beginning"
    }

    #MySQL Slow Logs
    file {
        type => "mysqlslow-log"
        path => "/var/log/mysql/mysql-slow.log"
        start_position => "beginning"
        codec => multiline {
            pattern => "^# User@Host:"
            negate => true
            what => "previous"
        }
    }

}



filter {

    if [type] == "nx-er-log" {
        grok {
            match => { "message" => "(?<datetime>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:err_message}(?:, client: (?<client>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:domain}?)(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer}\")?"}
        }

    }

    if [type] == "nx-ac-log" {
        geoip {
            source => "client"
            target => "geoip"
            #database => "/etc/logstash/conf.d/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
        }

        mutate {
            convert => [ "[geoip][coordinates]", "float" ]
        }
    }

    if [type] == "mysqlslow-log" {
        grok {
            match => { "message" => "SELECT SLEEP" }
            add_tag => [ "sleep_drop" ]
            tag_on_failure => []
        }

        if "sleep_drop" in [tags] {
            drop {}
        }

        grok {
            match => { "message" => "(?m)^# User@Host: %{USER:User}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:Client_IP})?\]\s.*# Query_time: %{NUMBER:Query_Time:float}\s+Lock_time: %{NUMBER:Lock_Time:float}\s+Rows_sent: %{NUMBER:Rows_Sent:int}\s+Rows_examined: %{NUMBER:Rows_Examined:int}\s*(?:use %{DATA:Database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<Query>(?<Action>\w+)\s+.*)\n# Time:.*$" }
        }

        date {
            match => [ "timestamp","UNIX" ]
            remove_field => [ "timestamp" ]
        }

    }

}



output {
    
    if [type] == "sys-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-sys-log-%{+YYYY.MM}"
        }
    }

    if [type] == "es-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-es-log-%{+YYYY.MM}"
        }
    }

    if [type] == "nx-ac-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            manage_template => true
            index => "logstash-nx-ac-log-%{+YYYY.MM}"
        }
    }

    if [type] == "nx-er-log" {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "logstash-nx-er-log-%{+YYYY.MM}"
        }
    }

    if [type] == "mysqlslow-log" {
       elasticsearch {
           hosts => ["172.31.243.146:9200"]
           index => "logstash-mysqlslow-log-%{+YYYY.MM}"
       }
   }

}

如果当中有什么不对的地方或者看不懂的地方欢迎在下方留言指出,我会及时修正和回复~

最后修改:2019 年 05 月 19 日
-