东莞企业网站优化,嘉兴网页制作网站排名,线上营销是什么意思,绍兴以往网站招工做防止ES的的I/O的压力过大#xff0c;使用redis/kafka进行缓冲。
对redis的要求
Redis input plugin | Logstash Reference [8.17] | Elastic
一般企业要求的架构 我实现的架构 filebeat把数据传给logstash 配置好filebeat把收集到的数据输入到redis 然后执行命令#xff0…防止ES的的I/O的压力过大使用redis/kafka进行缓冲。
对redis的要求
Redis input plugin | Logstash Reference [8.17] | Elastic
一般企业要求的架构 我实现的架构 filebeat把数据传给logstash 配置好filebeat把收集到的数据输入到redis 然后执行命令filebeat就开始往redis中写数据 cd /etc/filebeat /usr/share/filebeat/bin/filebeat -e -c filebeat.yml logstash配置从redis中读数据输出到elasticsearch 当我使用filebeat收集数据传到redis然后logstash从redis读取数据的时候有一个大问题就是
filebeat收集的是json格式的nginx访问日志它被filebeat收集到redis的时候会对这个收集的日志和与日志相关的元数据进行封装然后输出到redis。等logstash从redis里面取出来的时候得到的数据是 收集的日志message) 和 元数据字段的集合。
(就变成了一个字段message,其他都是没用的元数据字段。)
我收集的nginx日志格式 要解决的问题
1.json格式的日志数据message未正确解析
这是在filebeat直接向logstash传送数据的时候发送的错误因为传送过去的数据没有设置正确的字符集导致收集到的日志乱码 kibana显示 改正后基本上就没有乱码格式了一个event是一条日志 2.删除无用的元数据字段
Filebeat直接连接logtsash的时候filebeat通过output.logstash发送数据默认不会添加完整元数据输出到redis的时候filebeat使用output.redis将日志包装成完整事件结构包含所有的元数据。
Redis 作为缓冲队列数据存储到 Redis 时Filebeat 会保留完整的 Beat 事件结构包括 metadata、host、agent 等。
所以当logstash从redis取日志数据的时候会收集无用的元数据(只有message是收集的访问日志我们要把无用的元数据过滤掉 然后会得到一整个大的message字段 3.过滤出日志里面的单个信息
所以删除无用的元数据之后如果我们不仅想得到访问日志还想得到访问日志里面得具体数据使用这个具体数据进行画图等等比如我们想得到remote_addr字段就需要对message进行解析
input {redis {host 10.8.0.23port 6379password Pu1uC2016db 0data_type listkey nginx-accesslogcodec json # 自动解析外层 JSON}
}
#nginx的原始的json格式的数据是这样的
#{time_local:2025-03-28T05:55:5708:00,remote_addr:45.90.163.37,remote_user:,request:PUT /v1/agent/service/register
#HTTP/1.1,status:400,body_bytes_sent:173,request_time:0.263,http_referer:,http_user_agent:,http_x_forwarded_for:,
#upstream_addr:,upstream_response_time:}#但是message里面的数据是这样的我们要把message中的数据转化为json格式然后json过滤才能把字段比如ip过滤出来然后可以对数据做图形分析
# message {\time_local\:\2025-03-28T18:12:2308:00\,\remote_addr\:\10.8.0.23\,\request\:\HEAD / HTTP/1.1\,\status\:\200\,
#\body_bytes_sent\:\0\,\request_time\:\0.000\,\http_referer\:\\,\http_user_agent\:\curl/7.61.1\,}#要把下面转化为上面的这样的json格式上面是标准的json格式filter里面的json工具才能把message这个大字段里面的小字段取出来
filter {# 第一步处理 message 字段中的内层 JSON mutate { gsub [message, \\\, \ # 将 \ 替换为普通引号] }mutate {# 删除末尾多余逗号如 ,} → }gsub [message, ,}, },message, ,], ]]}# 解析 message 字段中的业务日志 这个json过滤它只作用于json格式的数据json {source messagetarget nginx_log # 解析结果存入 nginx_log 子对象remove_field [message]tag_on_failure [_json_parse_failure] # 标记解析失败的日志}# 第二步提升业务字段到根层级 mutate {rename {[nginx_log][time_local] time_local[nginx_log][remote_addr] remote_addr[nginx_log][request] request[nginx_log][status] status[nginx_log][body_bytes_sent] body_bytes_sent[nginx_log][http_referer] http_referer[nginx_log][http_user_agent] http_user_agent}}#把一些无用的元数据过滤掉不要# 第三步清理所有元数据字段 mutate {remove_field [host, agent, ecs, log, input, fields,version, event, nginx_log]}# 第四步修正时间戳 date {match [ time_local, ISO8601 ]target timestamp # 覆盖默认时间戳}
}
output {elasticsearch {hosts [https://10.8.0.23:9200]index dami-logs-%{YYYY.MM.dd}user elasticpassword sxm325468ssl truecacert /logstash/http_ca.crt }# 调试时开启 stdout查看完整字段stdout {codec rubydebug {metadata true # 显示元数据确认字段结构}}
}
最后可以得到message里面的单个小字段信息以前只能在message中进行查看不能对数据进行分析和画图现在可以了 filter过滤器的其他用法
Filter plugins | Logstash Reference [8.17] | Elastic
1.grok
也可以康康这个人的文章
logstash过滤器插件filter详解及实例 - 峰哥ge - 博客园
官方文档
Grok filter plugin | Logstash Reference [8.17] | Elastic
grok %{语法:语义}
#语法在配置文件里已经定义好了
#语义是自己定义的表示要将获得的字段放在哪个key里例如下面ip就是key值取出的字段值是value match {message %{IPV4:ip}} #message是收集到的每一条数据 /usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-patterns-core-4.3.4/patterns/ecs-v1/grok-patterns #语法在配置文件里已经定义好了 match {message %{IPV4:ip}} #message是收集到的每一条数据 match {message %{HTTPDATE:time}} # match {message %{LOGLEVEL:level}} # 2.groip 通过ip定位物理位置
Geoip 过滤器插件 |Logstash 参考 [8.17] |弹性的
logstash上要安装这个数据库要先注册才能看
Download GeoIP Databases | MaxMind 我的geoip配置写错了但是我懒得改了我的阿里云服务器要释放了。 logstash收集到的数据还可以存放到数据库中然后可以自己公司开发一个前端工具连接到数据库进行数据分析