There are two common approaches to collecting Nginx logs with Filebeat: in the first, Filebeat ships the logs directly to Logstash; in the second, Filebeat first writes them to Kafka, and Logstash then reads them from Kafka.
Configure Nginx
Add the following to the Nginx configuration so that access logs are written in JSON format; edit /etc/nginx/nginx.conf:
...
log_format log_json '{ "@timestamp": "$time_local", '
                    '"remote_addr": "$remote_addr", '
                    '"referer": "$http_referer", '
                    '"request": "$request", '
                    '"status": $status, '
                    '"bytes": $body_bytes_sent, '
                    '"agent": "$http_user_agent", '
                    '"x_forwarded": "$http_x_forwarded_for", '
                    '"up_addr": "$upstream_addr",'
                    '"up_host": "$upstream_http_host",'
                    '"up_resp_time": "$upstream_response_time",'
                    '"request_time": "$request_time"'
                    ' }';

access_log /var/log/nginx/access_json.log log_json;
...
Reload Nginx to apply the change:

nginx -s reload    # or: systemctl reload nginx
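With this format in place, each request is appended to /var/log/nginx/access_json.log as one JSON object per line. A sample entry (field values here are purely illustrative) looks roughly like this; note that Nginx logs empty variables, such as the upstream fields for requests that never hit an upstream, as "-":

{ "@timestamp": "07/May/2021:10:23:45 +0800", "remote_addr": "192.168.1.10", "referer": "-", "request": "GET /index.html HTTP/1.1", "status": 200, "bytes": 612, "agent": "curl/7.68.0", "x_forwarded": "-", "up_addr": "-", "up_host": "-", "up_resp_time": "-", "request_time": "0.003" }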
Option 1: Filebeat → Logstash → Elasticsearch
Configure Filebeat
Collect the Nginx log file and ship it to Logstash; edit the Filebeat configuration (typically /etc/filebeat/filebeat.yml):
...
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access_json.log
  tags: ["nginx"]

output.logstash:
  hosts: ["10.10.200.200:5044"]
...
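Before starting Filebeat, the configuration and the connection to Logstash can be verified with Filebeat's built-in test subcommands:

filebeat test config    # validate filebeat.yml syntax
filebeat test output    # check connectivity to 10.10.200.200:5044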
Configure Logstash
Output the collected logs to Elasticsearch:
input {
  beats {
    port => 5044
    codec => "json"
  }
}

output {
  if "nginx" in [tags] {
    elasticsearch {
      hosts => ["10.10.200.200:9200"]
      index => "nginx_%{+YYYY.MM.dd}"
      template_overwrite => true
    }
  }
}
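Once both services are running and requests are hitting Nginx, a daily index should appear in Elasticsearch. A quick sanity check against the node configured above:

curl 'http://10.10.200.200:9200/_cat/indices/nginx_*?v'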
Option 2: Filebeat → Kafka → Logstash → Elasticsearch
This setup is typical of larger deployments: Filebeat first ships the logs to Kafka, which acts as a buffer, and Logstash then consumes them from Kafka.
Configure Filebeat
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access_json.log

output.kafka:
  hosts: ["10.10.200.201:9092","10.10.200.202:9092","10.10.200.203:9092"]
  topic: 'nginx'
  partition.round_robin:
    reachable_only: true
  required_acks: 1
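To confirm that events are actually landing in Kafka, tail the topic with the stock console consumer (the script name and path vary slightly between Kafka distributions):

kafka-console-consumer.sh --bootstrap-server 10.10.200.201:9092 \
  --topic nginx --from-beginning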
Configure Logstash
Note that the kafka input's bootstrap_servers option takes a comma-separated string, not an array:

input {
  kafka {
    type => "nginx"
    bootstrap_servers => "KAFKA1:9092,KAFKA2:9092,KAFKA3:9092"
    topics => ["nginx"]
    consumer_threads => 3
    codec => "json"
  }
}

output {
  if [type] == "nginx" {
    elasticsearch {
      hosts => ["ES1:9200","ES2:9200","ES3:9200"]
      index => "nginx-log-%{+YYYY.MM.dd}"
    }
  }
}
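The kafka input joins the cluster as a consumer group (group_id defaults to "logstash" unless set explicitly), so consumption progress and lag can be inspected with the standard group tool:

kafka-consumer-groups.sh --bootstrap-server KAFKA1:9092 \
  --describe --group logstash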