Elastic Stack refers to a log analysis platform built from Elasticsearch, Logstash, Kibana, Kafka, and Filebeat, that is, the traditional ELK stack plus Kafka and Filebeat. It collects, filters, and cleans logs from systems, websites, and applications, stores them centrally, and ultimately serves them for real-time search and analysis.
This article describes how to deploy a Logstash cluster.
Download Logstash from the official Elastic download page; this article uses version 7.6.2 as an example.
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.6.2.tar.gz
tar zxvf logstash-7.6.2.tar.gz
cd logstash-7.6.2
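As a quick sanity check, you can print the version before going further (Logstash 7.x ships with a bundled JDK, so a separate Java installation is not strictly required):

./bin/logstash --version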
Configure Logstash
Create a custom Logstash configuration file; this file must be passed to Logstash when the application is started.
To read input from Kafka and write output to Elasticsearch, edit config/logstash.conf:
input {
  kafka {
    type => "Type_Name_1"
    bootstrap_servers => "KAFKA1:9092,KAFKA2:9092,KAFKA3:9092"
    topics => ["Topic_Name_1","Topic_Name_2","Topic_Name_3"]
    consumer_threads => 3
    codec => "json"
  }
  kafka {
    type => "Type_Name_2"
    bootstrap_servers => "KAFKA1:9092,KAFKA2:9092,KAFKA3:9092"
    topics => ["Topic_Name"]
    group_id => "asiainfo"
    consumer_threads => 1
    codec => "json"
  }
}

#filter {
#  if [type] == "Type_Name_1" {
#    grok {
#      match => [
#        "message", "%{HTTPDATE:timestamp}",
#        "message", "%{COMBINEDAPACHELOG}"
#      ]
#    }
#    date {
#      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
#    }
#  }
#  if [type] == "Type_Name_2" {
#    grok {
#      match => [
#        "message", "%{HTTPDATE:timestamp}"
#      ]
#    }
#  }
#}

output {
  if [type] == "Type_Name_1" {
    elasticsearch {
      hosts => ["ES1:9200","ES2:9200","ES3:9200"]
      index => "Type_Name_1-log-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "Type_Name_2" {
    elasticsearch {
      hosts => ["ES1:9200","ES2:9200","ES3:9200"]
      index => "Type_Name_2-log-%{+YYYY.MM.dd}"
      timeout => 300
    }
  }
}
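Before pointing this at a live cluster, you can ask Logstash to validate the configuration file and exit. The --config.test_and_exit flag only checks syntax; it does not connect to Kafka or Elasticsearch:

./bin/logstash -f config/logstash.conf --config.test_and_exit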
Parameter notes:
Input: specifies the data source, including the Kafka broker hosts and ports, the topics to consume, and the codec.
Filter: uses the grok plugin to parse text with regular expressions and turn raw log lines into structured data; see the sketch after this list.
Output: specifies the destination; index names the Elasticsearch index to write to, and a name of the form "topic + date" is usually sufficient.
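To make the filter stage concrete, here is a minimal sketch of what the commented-out block above could look like once enabled, for a combined-format Apache access log. It uses only the built-in COMBINEDAPACHELOG grok pattern; timestamp is the field that pattern extracts:

filter {
  grok {
    # Parse a combined-format access log line into named fields
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    # Use the request time from the log as the event's @timestamp
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
}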
Complex logging systems often need special handling, for example merging the lines of an exception stack trace into a single event, or debugging a pipeline on the console. To merge stack-trace lines, add code like the following (the same merging can also be done in Filebeat, as sketched after the block):
input {
  stdin {
    codec => multiline {
      # Lines that do not start with "[" are appended to the previous event
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}
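For comparison, the equivalent merge on the Filebeat side looks roughly like the following sketch for a 7.x log input; the path is a hypothetical placeholder, and multiline.match: after plays the role of what => "previous" above:

filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/*.log    # hypothetical path, adjust to your logs
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after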
Use a console pipeline to debug filters and verify the regular expressions; count specifies how many times the message is generated, and message is the content to debug:
input {
  generator {
    count => 1
    message => '{"key1":"value1","key2":[1,2],"key3":{"subkey1":"subvalue1"}}'
    codec => json
  }
}
Direct the output to the console:
output {
  stdout {
    codec => rubydebug
  }
}
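For one-off experiments you do not even need a configuration file: the -e flag takes the pipeline definition directly on the command line. A minimal sketch combining a generator input with the rubydebug output:

./bin/logstash -e 'input { generator { count => 1 message => "hello" } } output { stdout { codec => rubydebug } }'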
Start Logstash
./bin/logstash -f ./config/logstash.conf &
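Note that the trailing & only backgrounds the process in the current shell, so it will exit when the session ends. To keep Logstash running after logout, one common approach is nohup; the output file name here is an arbitrary choice:

nohup ./bin/logstash -f ./config/logstash.conf > logstash.out 2>&1 &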