目 录CONTENT

文章目录

ELK日志分析系统

简中仙
2020-03-02 / 0 评论 / 0 点赞 / 90 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
本文最后更新于2024-05-06,若内容或图片失效,请留言反馈。 本文如有错误或者侵权的地方,欢迎您批评指正!
软件说明参考文档
Elasticsearch数据库,存储数据官方参考文档
logstash日志收集,过滤数据官方参考文档
kibana分析,过滤,展示官方参考文档
filebeat收集日志,传输到ES或logstash官方参考文档

一、部署Elasticsearch集群

1、rpm安装

1、安装 Elasticsearch 软件

# mkdir -p /data/soft
# cd /data/soft/
# wget https://mirrors.tuna.tsinghua.edu.cn/elasticstack/8.x/yum/8.6.1/elasticsearch-8.6.1-x86_64.rpm
# wget https://mirrors.tuna.tsinghua.edu.cn/elasticstack/8.x/yum/8.6.1/filebeat-8.6.1-x86_64.rpm
# wget https://mirrors.tuna.tsinghua.edu.cn/elasticstack/8.x/yum/8.6.1/kibana-8.6.1-x86_64.rpm
# wget https://mirrors.tuna.tsinghua.edu.cn/elasticstack/8.x/yum/8.6.1/logstash-8.6.1-x86_64.rpm
# yum -y install elasticsearch-8.6.1-x86_64.rpm
# vim /etc/elasticsearch/elasticsearch.yml
node.name: node-1             											# 群集中本机节点名
path.data: /data/elasticsearch             								# 数据目录
path.logs: /var/log/elasticsearch             							# 日志目录
bootstrap.memory_lock: true             								# 锁定内存
network.host: 192.168.1.10,127.0.0.1             						# 监听的ip地址
http.port: 9200             											# 端口号
# mkdir -p /data/elasticsearch              							# 创建数据目录
# chown -R elasticsearch.elasticsearch /data/elasticsearch/             # 修改权限
# vim /etc/elasticsearch/jvm.options             						# 分配锁定内存
-Xms1g             														# 分配最小内存	
-Xmx1g             														# 分配最大内存,官方推荐为物理内存的一半,但最大为32G
# systemctl edit elasticsearch            							    # 修改锁定内存后,无法重启,解决方法
[Service]
LimitMEMLOCK=infinity             										# F2保存退出
# systemctl daemon-reload
# systemctl restart elasticsearch

2、添加elasticsearch群集

# vim /etc/elasticsearch/elasticsearch.yml
node.name: node-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.1.20,127.0.0.1
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.1.10", "192.168.1.20"]          # 主节点,工作节点
discovery.zen.minimum_master_nodes: 2             							# 添加的值=节点数/2 + 1

3、常见群集管理监控命令

# curl -XPUT '192.168.1.10:9200/vipinfo/users/1?pretty&pretty' -H 'Content-Type: application/json' -d '{"name": "guofucheng","age": "45","job": "mingxing"}'             					# 创建索引
# curl -XGET '192.168.1.10:9200/_cat/indices?pretty'             			# 查看索引信息
# curl -XGET '192.168.1.10:9200/_cluster/health?pretty'             		# 查看群集健康状态
# curl -XGET '192.168.1.10:9200/_cat/nodes?human&pretty'             		# 统计群集节点
# curl -XGET '192.168.1.10:9200/_nodes/_all/info/jvm.process?human&pretty'  # 查看群集所有节点详细信息

2、Docker Compose

镜像信息

配置基础环境

# 内核vm.max_map_count设置必须至少设置262144为用于生产
# vim /etc/sysctl.conf
vm.max_map_count=262144
# sysctl -p

确保为 Docker 分配至少 4GiB 的内存。

# vim .env
# 'elastic' 用户的密码(至少 6 个字符)
ELASTIC_PASSWORD=

# 'kibana_system' 用户的密码(至少 6 个字符)
KIBANA_PASSWORD=

# Elastic 产品的版本
STACK_VERSION=8.6.1

# 设置集群名称
CLUSTER_NAME=docker-cluster

# 设置为 'basic' 或 'trial' 自动开始 30 天试用
LICENSE=basic
#LICENSE=trial

# 将 Elasticsearch HTTP API 暴露给主机的端口
ES_PORT=9200
#ES_PORT=127.0.0.1:9200

# 将 Kibana 暴露给主机的端口
KIBANA_PORT=5601
#KIBANA_PORT=80

# 根据可用的主机内存增加或减少(以字节为单位)
MEM_LIMIT=1073741824

# 项目命名空间(如果未设置则默认为当前文件夹名称)#COMPOSE_PROJECT_NAME=myproject

配置docker-compose

version: "2.2"

services:
  setup:
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
    user: "0"
    command: >
      bash -c '
        if [ x${ELASTIC_PASSWORD} == x ]; then
          echo "Set the ELASTIC_PASSWORD environment variable in the .env file";
          exit 1;
        elif [ x${KIBANA_PASSWORD} == x ]; then
          echo "Set the KIBANA_PASSWORD environment variable in the .env file";
          exit 1;
        fi;
        if [ ! -f config/certs/ca.zip ]; then
          echo "Creating CA";
          bin/elasticsearch-certutil ca --silent --pem -out config/certs/ca.zip;
          unzip config/certs/ca.zip -d config/certs;
        fi;
        if [ ! -f config/certs/certs.zip ]; then
          echo "Creating certs";
          echo -ne \
          "instances:\n"\
          "  - name: es01\n"\
          "    dns:\n"\
          "      - es01\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          "  - name: es02\n"\
          "    dns:\n"\
          "      - es02\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          "  - name: es03\n"\
          "    dns:\n"\
          "      - es03\n"\
          "      - localhost\n"\
          "    ip:\n"\
          "      - 127.0.0.1\n"\
          > config/certs/instances.yml;
          bin/elasticsearch-certutil cert --silent --pem -out config/certs/certs.zip --in config/certs/instances.yml --ca-cert config/certs/ca/ca.crt --ca-key config/certs/ca/ca.key;
          unzip config/certs/certs.zip -d config/certs;
        fi;
        echo "Setting file permissions"
        chown -R root:root config/certs;
        find . -type d -exec chmod 750 \{\} \;;
        find . -type f -exec chmod 640 \{\} \;;
        echo "Waiting for Elasticsearch availability";
        until curl -s --cacert config/certs/ca/ca.crt https://es01:9200 | grep -q "missing authentication credentials"; do sleep 30; done;
        echo "Setting kibana_system password";
        until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://es01:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done;
        echo "All done!";
      '
    healthcheck:
      test: ["CMD-SHELL", "[ -f config/certs/es01/es01.crt ]"]
      interval: 1s
      timeout: 5s
      retries: 120

  es01:
    depends_on:
      setup:
        condition: service_healthy
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - esdata01:/usr/share/elasticsearch/data
    ports:
      - ${ES_PORT}:9200
    environment:
      - node.name=es01
      - cluster.name=${CLUSTER_NAME}
      - cluster.initial_master_nodes=es01,es02,es03
      - discovery.seed_hosts=es02,es03
      - ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/es01/es01.key
      - xpack.security.http.ssl.certificate=certs/es01/es01.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/es01/es01.key
      - xpack.security.transport.ssl.certificate=certs/es01/es01.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=${LICENSE}
      - TZ=Asia/Shanghai
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120

  es02:
    depends_on:
      - es01
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - esdata02:/usr/share/elasticsearch/data
    environment:
      - node.name=es02
      - cluster.name=${CLUSTER_NAME}
      - cluster.initial_master_nodes=es01,es02,es03
      - discovery.seed_hosts=es01,es03
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/es02/es02.key
      - xpack.security.http.ssl.certificate=certs/es02/es02.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/es02/es02.key
      - xpack.security.transport.ssl.certificate=certs/es02/es02.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=${LICENSE}
      - TZ=Asia/Shanghai
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120

  es03:
    depends_on:
      - es02
    image: docker.elastic.co/elasticsearch/elasticsearch:${STACK_VERSION}
    volumes:
      - certs:/usr/share/elasticsearch/config/certs
      - esdata03:/usr/share/elasticsearch/data
    environment:
      - node.name=es03
      - cluster.name=${CLUSTER_NAME}
      - cluster.initial_master_nodes=es01,es02,es03
      - discovery.seed_hosts=es01,es02
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.http.ssl.key=certs/es03/es03.key
      - xpack.security.http.ssl.certificate=certs/es03/es03.crt
      - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.enabled=true
      - xpack.security.transport.ssl.key=certs/es03/es03.key
      - xpack.security.transport.ssl.certificate=certs/es03/es03.crt
      - xpack.security.transport.ssl.certificate_authorities=certs/ca/ca.crt
      - xpack.security.transport.ssl.verification_mode=certificate
      - xpack.license.self_generated.type=${LICENSE}
      - TZ=Asia/Shanghai
    mem_limit: ${MEM_LIMIT}
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120

  kibana:
    depends_on:
      es01:
        condition: service_healthy
      es02:
        condition: service_healthy
      es03:
        condition: service_healthy
    image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
    volumes:
      - certs:/usr/share/kibana/config/certs
      - kibanadata:/usr/share/kibana/data
    ports:
      - ${KIBANA_PORT}:5601
    environment:
      - SERVERNAME=kibana
      - ELASTICSEARCH_HOSTS=https://es01:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - I18N_LOCALE=zh-CN
      - TZ=Asia/Shanghai
      - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}
      - ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt
    mem_limit: ${MEM_LIMIT}
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120

volumes:
  certs:
    driver: local
  esdata01:
    driver: local
  esdata02:
    driver: local
  esdata03:
    driver: local
  kibanadata:
    driver: local

二、安装Elasticsearch-head插件

1、本机安装

# git clone https://github.com/mobz/elasticsearch-head.git
# yum -y install epel-release
# yum -y install nodejs npm
# cd elasticsearch-head/
# npm install
# cd _site/
# vim app.js 
# 原代码为this.base_uri = this.config.base_uri;
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://192.168.1.10:9200";
# vim /etc/elasticsearch/elasticsearch.yml 
http.cors.enabled: true
http.cors.allow-origin: "*"
# cd elasticsearch-head/
# node_modules/grunt/bin/grunt server &

2、浏览器插件安装

下载es-head插件,https://github.com/mobz/elasticsearch-head

下载后,解压,复制crx目录下es-head.crx到桌面

改名es-head.crx为es-head.crx.zip

解压es-head.crx.zip到es-head.crx目录,把目录es-head.crx,上传到谷歌浏览器开发工具--扩展程序里

三、构建es+kibana+filebeat架构(小型)

1、安装kibana

# cd /data/soft
# rpm -ivh kibana-8.6.1-x86_64.rpm
# vim /etc/kibana/kibana.yml
server.port: 5601
server.host: "192.168.1.10"
server.name: "node-1"             								# 所在主机的主机名
elasticsearch.hosts: ["http://192.168.1.10:9200"]             	# es服务器的ip,便于接收日志数据
# systemctl start kibana

2、安装filebeat

# cd /data/soft
# rpm -ivh filebeat-8.6.1-x86_64.rpm
# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log

output.elasticsearch:
  hosts: ["192.168.1.10:9200"]
# systemctl start filebeat

3、filebeat收集nginx日志

Filebeat的Nginx Module模块可直接用来处理Nginx标准格式的访问日志和错误日志。确认nginx日志为普通格式!

1、启用Nginx模块

/etc/filebeat/filebeat.yml中配置加载Modules

output.elasticsearch:
  hosts: ["192.168.51.189"]
  indices:
    - index: "nginx-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        source: "/var/log/nginx/access.log"
    - index: "nginx-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        source: "/var/log/nginx/error.log"
setup.template.name: "nginx"
setup.template.pattern: "nginx-*"
setup.template.enabled: false
setup.template.overwrite: true
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

指定特殊的filebeat全局配置文件来配置加载Modules

filebeat -c /etc/filebeat/filebeat.yaml modules enable nginx

启用Nginx 模块

filebeat modules enable nginx
filebeat modules list

所有elasticsearch也需要安装插件,否则会报错

# elasticsearch-plugin install ingest-geoip
# elasticsearch-plugin install ingest-user-agent
# systemctl restart elasticsearch

2、配置Nginx模块变量参数

1. 配置Nginx 模块变量参数的方式
  • Nginx module的配置文件/etc/filebeat/modules.d/nginx.yml

    - module: nginx
      # 设置Nginx访问日志
      access:
        input:
      	  close_eof: true
        enabled: true
        # 日志文件路径。如果为空,默认根据操作系统版本自行选择日志文件路径.
        var.paths: [ "/var/log/nginx/access.log", "/var/log/nginx/admin_access.log" ]
      # 设置Nginx错误日志
      error:
        enabled: true
        # 日志文件路径。如果为空,默认根据操作系统版本自行选择日志文件路径.
        var.paths: ["/var/log/nginx/error.log"]
    # ./filebeat setup
    
  • 在运行时

    filebeat -e \
    --modules nginx \
    -M "nginx.access.var.paths=[ "/var/log/nginx/access.log", "/var/log/nginx/admin_access.log" ]" \
    -M "nginx.error.var.paths=["/var/log/nginx/error.log"]" \
    -M "nginx.access.input.close_eof=true"
    

3、Nginx模块配置文件详解

1、Nginx模块配置目录结构

Nginx模块中所有的配置文件在/usr/share/filebeat/module/nginx路径下

# 文件beat的Nginx模块根目录,包含访问日志(access)和错误日志(error)的配置,以及模块的元数据(module.yml)
/usr/share/filebeat/module/nginx

# 访问日志相关的配置目录
├── access
   # 访问日志的配置文件,定义了filebeat如何解析和处理Nginx访问日志
   │   ├── config
   │   │   └── nginx-access.yml
   # 默认的Ingest Node管道配置,用于在Elasticsearch中进一步处理和丰富从Nginx访问日志收集的数据
   │   ├── ingest
   │   │   └── default.json
   # manifest.yml文件用于描述模块的版本和依赖信息
   │   └── manifest.yml

# 错误日志相关的配置目录,结构与访问日志相似
├── error
   │   ├── config
   │   │   └── nginx-error.yml  # Nginx错误日志的配置文件
   │   ├── ingest
   │   │   └── pipeline.json  # Elasticsearch中的Ingest Pipeline配置,专门处理Nginx错误日志
   │   └── manifest.yml  # 错误日志模块的manifest文件

# 模块的元数据文件,定义了模块的基本信息,如名称、描述等
└── module.yml
2、module.yml
# module.yml 文件包含了Filebeat Nginx模块相关的Kibana Dashboard配置信息。
# 每个dashboards部分定义了一个与Nginx日志分析相关的Kibana仪表板(Dashboard)。

dashboards:
  # 第一个Dashboard配置
  - id: 55a9e6e0-a29e-11e7-928f-5dbe6f6f5519  # Dashboard唯一标识符
    file: Filebeat-nginx-overview.json  # 对应的Dashboard配置文件名,用于展示Nginx概览信息

  # 第二个Dashboard配置
  - id: 046212a0-a2a1-11e7-928f-5dbe6f6f5519
    file: Filebeat-nginx-logs.json  # Nginx日志详细视图的Dashboard配置文件

  # 第三个Dashboard配置,关联机器学习(Machine Learning)分析
  - id: ML-Nginx-Access-Remote-IP-Count-Explorer
    file: ml-nginx-access-remote-ip-count-explorer.json  # 用于探索远程IP访问计数的ML Dashboard配置

  # 第四个Dashboard配置,同样涉及机器学习分析
  - id: ML-Nginx-Remote-IP-URL-Explorer
    file: ml-nginx-remote-ip-url-explorer.json  # 用于分析远程IP与URL关系的ML Dashboard配置
3、access/manifest.yml
# module_version定义了该Nginx模块的版本号
module_version: "1.0"

# var部分定义了变量,这里定义了日志文件路径的默认值及根据操作系统调整的路径
var:
  - name: paths
    # 默认的日志文件路径,适用于大多数Linux系统
    default:
      - /var/log/nginx/access.log*
    # 如果运行在macOS系统上,则使用如下路径
    os.darwin:
      - /usr/local/var/log/nginx/access.log*
    # Windows系统的日志文件路径
    os.windows:
      - c:/programdata/nginx/logs/*access.log*

# ingest_pipeline指定了默认的索引管道配置文件,用于数据预处理
ingest_pipeline: ingest/default.json

# input配置指定了访问日志的解析配置文件
input: config/nginx-access.yml

# machine_learning部分定义了一系列机器学习作业及其数据馈送(datafeed),用于高级分析
machine_learning:
  - name: response_code         # 分析响应代码的机器学习作业
    job: machine_learning/response_code.json     # 作业配置文件
    datafeed: machine_learning/datafeed_response_code.json  # 数据馈送配置文件
    min_version: 5.5.0        # 最低支持的Elastic Stack版本
  - name: low_request_rate      # 低请求率检测的机器学习作业
    job: machine_learning/low_request_rate.json
    datafeed: machine_learning/datafeed_low_request_rate.json
    min_version: 5.5.0
  - name: remote_ip_url_count   # 远程IP与URL访问计数分析
    job: machine_learning/remote_ip_url_count.json
    datafeed: machine_learning/datafeed_remote_ip_url_count.json
    min_version: 5.5.0
  - name: remote_ip_request_rate # 远程IP请求率分析
    job: machine_learning/remote_ip_request_rate.json
    datafeed: machine_learning/datafeed_remote_ip_request_rate.json
    min_version: 5.5.0
  - name: visitor_rate          # 访客率分析
    job: machine_learning/visitor_rate.json
    datafeed: machine_learning/datafeed_visitor_rate.json
    min_version: 5.5.0

# requires.processors部分列出了该模块需要的处理器插件
requires.processors:
  - name: user_agent       # 用户代理处理器,用于解析用户代理字符串
    plugin: ingest-user-agent
  - name: geoip            # 地理位置处理器,基于IP地址提供地理位置信息
    plugin: ingest-geoip
4、access/config/nginx-access.yml
# 这段配置是Filebeat输入配置的一部分,特别针对日志类型的输入源进行了设置。

type: log                 # 指定输入数据的类型为日志(log)

# paths配置指定了要收集的日志文件路径列表。这里使用Go模板语法动态生成路径列表,
# 根据之前定义的'var.paths'变量内容,遍历并插入每个路径项。这允许配置根据不同环境自动调整。
paths:
{{ range $i, $path := .paths }}
 - {{$path}}             # 动态输出每个日志文件路径
{{ end }}

# exclude_files定义了需要排除的文件模式,例如不处理.gz压缩的日志文件。
exclude_files: [".gz$"]   # 排除所有以.gz结尾的文件,通常这些是已压缩的日志文件

# processors部分定义了处理管道,用于在数据被发送前对日志事件进行预处理。
processors:
- add_locale: ~           # 添加一个处理器,用于在事件中注入系统的区域设置信息(如语言和国家代码)
                         # '~'符号表示使用默认配置,即不提供额外配置参数
5、access/ingest/default.json
{
    "description": "Pipeline for parsing Nginx access logs. Requires the geoip and user_agent plugins.",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "\"?(?:%{IP_LIST:nginx.access.remote_ip_list}|%{DATA:source.address}) - %{DATA:user.name} \\[%{HTTPDATE:nginx.access.time}\\] \"%{DATA:nginx.access.info}\" %{NUMBER:http.response.status_code:long} %{NUMBER:http.response.body.bytes:long} \"%{DATA:http.request.referrer}\" \"%{DATA:user_agent.original}\""
                ],
                "pattern_definitions": {
                    "IP_LIST": "%{IP}(\"?,?\\s*%{IP})*"
                },
                "ignore_missing": true
            }
        },
        {
            "grok": {
                "field": "nginx.access.info",
                "patterns": [
                    "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}",
                    ""
                ],
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "nginx.access.info"
            }
        },
        {
            "split": {
                "field": "nginx.access.remote_ip_list",
                "separator": "\"?,?\\s+",
                "ignore_missing": true
            }
        },
        {
            "split": {
                "field": "nginx.access.origin",
                "separator": "\"?,?\\s+",
                "ignore_missing": true
            }
        },
        {
            "set": {
                "field": "source.ip",
                "value": ""
            }
        },
        {
            "script": {
                "lang": "painless",
                "source": "boolean isPrivate(def dot, def ip) { try { StringTokenizer tok = new StringTokenizer(ip, dot); int firstByte = Integer.parseInt(tok.nextToken());       int secondByte = Integer.parseInt(tok.nextToken());       if (firstByte == 10) {         return true;       }       if (firstByte == 192 && secondByte == 168) {         return true;       }       if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {         return true;       }       if (firstByte == 127) {         return true;       }       return false;     } catch (Exception e) {       return false;     }   }   try {    ctx.source.ip = null;    if (ctx.nginx.access.remote_ip_list == null) { return; }    def found = false;    for (def item : ctx.nginx.access.remote_ip_list) {        if (!isPrivate(params.dot, item)) {            ctx.source.ip = item;            found = true;            break;        }    }    if (!found) {     ctx.source.ip = ctx.nginx.access.remote_ip_list[0];   }} catch (Exception e) { ctx.source.ip = null; }",
                "params": {
                    "dot": "."
                }
            }
        },
        {
            "remove": {
                "field": "source.ip",
                "if": "ctx.source.ip == null"
            }
        },
        {
            "convert": {
                "field": "source.ip",
                "target_field": "source.address",
                "type": "string",
                "ignore_missing": true
            }
        },
        {
            "remove": {
                "field": "message"
            }
        },
        {
            "rename": {
                "field": "@timestamp",
                "target_field": "event.created"
            }
        },
        {
            "date": {
                "field": "nginx.access.time",
                "target_field": "@timestamp",
                "formats": [
                    "dd/MMM/yyyy:H:m:s Z"
                ],
                "on_failure": [{"append": {"field": "error.message", "value": "{{ _ingest.on_failure_message }}"}}]
            }
        },
        {
            "remove": {
                "field": "nginx.access.time"
            }
        },
        {
            "user_agent": {
                "field": "user_agent.original"
            }
        },
        {
            "geoip": {
                "field": "source.ip",
                "target_field": "source.geo",
                "ignore_missing": true
            }
        },
        {
            "geoip": {
                "database_file": "GeoLite2-ASN.mmdb",
                "field": "source.ip",
                "target_field": "source.as",
                "properties": [
                    "asn",
                    "organization_name"
                ],
                "ignore_missing": true
            }
        },
        {
            "rename": {
                "field": "source.as.asn",
                "target_field": "source.as.number",
                "ignore_missing": true
            }
        },
        {
            "rename": {
                "field": "source.as.organization_name",
                "target_field": "source.as.organization.name",
                "ignore_missing": true
            }
        }
    ],
    "on_failure": [
        {
            "set": {
                "field": "error.message",
                "value": "{{ _ingest.on_failure_message }}"
            }
        }
    ]
}
  1. Grok Processor: 首先应用Grok处理器来解析原始日志消息(message字段)。
    • 定义了复杂的模式来匹配Nginx日志格式,包括远程IP地址(可能包含多个)、用户名、时间戳、请求信息、HTTP响应状态码、响应体字节数、引用页面和用户代理信息。
    • 使用自定义的IP_LIST模式来匹配多个IP地址,以适应代理或负载均衡场景。
    • 如果某些字段缺失,配置会忽略它们而不会中断处理流程。
  2. 二次Grok Processor: 对于nginx.access.info字段进一步细分,提取HTTP方法、原始URL和HTTP版本。
    • 解析出请求方法、URL路径和HTTP协议版本。
    • 若信息缺失,则同样安全地忽略。
  3. Remove Processor: 在提取完所需信息后,移除不再需要的nginx.access.info字段。
  4. Split Processors: 分别对nginx.access.remote_ip_listnginx.access.origin(此字段在配置中未直接定义,可能是笔误或被遗漏的上下文)进行分割,如果存在多IP情况,将其分离为独立的条目。
  5. Set Processor: 初始化source.ip字段为空值,为后续脚本逻辑做准备。
  6. Script Processor: 使用Painless脚本来识别并选择非私有IP地址作为source.ip。这个脚本遍历IP列表,优先选取公网IP,避免使用私有IP地址范围(如10/8, 172.16/12, 192.168/16, 127/8)。
  7. Conditional Remove Processor: 如果source.ip仍为null(即没有找到合适的公网IP),则移除该字段。
  8. Convert Processor: 将选中的IP地址转换为字符串类型并存储到source.address字段。
  9. Remove Processor: 移除原始日志消息字段message,因为其内容已被分解并分配到其他字段。
  10. Rename Processor: 重命名时间戳字段,从默认的@timestamp更改为event.created,以便保留原始日志记录的时间信息。
  11. Date Processor: 将时间戳字段nginx.access.time转换为Elasticsearch可识别的日期格式,并更新到@timestamp字段。如果转换失败,会在error.message字段记录错误信息。
  12. Remove Processor: 转换完成后移除不再需要的原始时间戳字段。
  13. User Agent Processor: 分析user_agent.original字段,提取浏览器类型、版本等用户代理信息。
  14. GeoIP Processors: 使用GeoIP数据库来补充IP地址的地理信息。首先添加国家、城市等地理位置信息到source.geo字段,然后利用ASN数据库添加自治系统编号(ASN)和组织名到source.as相关的字段。
  15. Rename Processors: 重命名ASN相关的字段,使其结构更加清晰易读。
  16. On Failure Actions: 如果在处理过程中出现任何错误,设置error.message字段记录失败原因,便于后续问题排查。
6、error/manifest.yml
# 模块版本定义
module_version: "1.0"

# 变量定义区块,用于配置不同环境下错误日志的路径
var:
  - name: paths    # 变量名,代表错误日志的路径列表
    # 默认路径配置,适用于大多数Unix/Linux系统
    default:
      - /var/log/nginx/error.log*    # 匹配所有以error.log开头的文件,包括数字后缀的日志轮转文件
    # 特定于macOS系统的路径配置
    os.darwin:
      - /usr/local/var/log/nginx/error.log*    # Darwin(macOS)下的错误日志路径
    # 特定于Windows系统的路径配置
    os.windows:
      - c:/programdata/nginx/logs/error.log*  # Windows系统上的错误日志路径

# 索引管道配置文件路径,用于定义数据预处理的规则和转换
ingest_pipeline: ingest/pipeline.json

# 输入配置文件路径,定义如何从错误日志中读取数据
input: config/nginx-error.yml
7、error/config/nginx-error.yml
# 类型定义:指定日志类型的数据输入
type: log

# 日志路径配置:动态包含所有定义好的日志文件路径
# 利用Go的模板语法遍历`.paths`变量中定义的所有路径,并插入到配置中
paths:
{{ range $i, $path := .paths }}
 - {{$path}}
{{ end }}

# 排除文件模式:定义不应被处理的文件类型,这里排除所有gzip压缩的日志文件
exclude_files: [".gz$"]

# 处理器链:定义数据摄入前的预处理步骤
processors:
- add_locale: ~    # 添加一个处理器来注入本地化信息(如时区、地区设置)到事件中
                  # `~` 表示使用默认配置,不需要额外参数
8、error/ingest/pipeline.json
{
  "description": "Pipeline for parsing the Nginx error logs", // 描述:用于解析Nginx错误日志的管道配置
  "processors": [ // 处理器数组,定义了一系列对文档进行转换的步骤
    { // 第一步:使用Grok处理器匹配并解析日志中的字段
      "grok": {
        "field": "message", // 待解析的字段,即原始日志信息
        "patterns": [ // 定义的Grok模式,用于匹配日志格式
          "%{DATA:nginx.error.time} \\[%{DATA:log.level}\\] %{NUMBER:process.pid:long}#%{NUMBER:process.thread.id:long}: (\\*%{NUMBER:nginx.error.connection_id:long} )?%{GREEDYDATA:message}"
          // 匹配时间戳、日志级别、进程PID、线程ID、可选的连接ID以及剩余的日志消息体
        ],
        "ignore_missing": true // 如果字段不存在,则忽略而不报错
      }
    },
    { // 第二步:重命名时间戳字段,为兼容性保留原始时间戳信息
      "rename": {
        "field": "@timestamp",
        "target_field": "event.created"
      }
    },
    { // 第三步:尝试根据标准格式将提取的nginx.error.time转换为时间戳
      "date": {
        "field": "nginx.error.time",
        "target_field": "@timestamp",
        "formats": ["yyyy/MM/dd H:m:s"],
        "ignore_failure": true // 解析失败时忽略错误,继续处理
      }
    },
    { // 第四步(可选):若存在时区信息,则根据时区重新解析时间戳
      "date": {
        "if": "ctx.event.timezone != null", // 条件执行,只有当event.timezone存在时
        "field": "nginx.error.time",
        "target_field": "@timestamp",
        "formats": ["yyyy/MM/dd H:m:s"],
        "timezone": "{{ event.timezone }}", // 使用指定的时区进行转换
        "on_failure": [{ // 失败处理策略
          "append": {
            "field": "error.message",
            "value": "{{ _ingest.on_failure_message }}" // 记录失败原因至error.message字段
          }
        }]
      }
    },
    { // 第五步:完成时间戳转换后,移除原始时间戳字段
      "remove": {
        "field": "nginx.error.time"
      }
    }]
  },
  "on_failure" : [{ // 整体失败处理策略
    "set" : {
      "field" : "error.message",
      "value" : "{{ _ingest.on_failure_message }}" // 记录整体处理过程中的任何失败信息
    }
  }]
}

4、示例

1. Filebeat Nginx模块配置采集API网关Kong日志

Kong日志数据采集处理流程kong节点 + filbeat ----> Kubernetes上的Logstash ----> Kubernetes上的Elasticsearch

Kong使用了Nginx作为基础组件,它的日志也主要是Nginx格式的日志,分为两种:访问日志和错误日志。它的Nginx是安装了Lua模块的,而Lua模块的错误日志和Nginx的错误日志混合在一起。Lua的错误日志格式有的是多行。这就造成整个Nginx错误日志中既有单行错误日志,又有多行错误日志。

直接使用Filebeat的Nginx模块采集日志文件。对于标准格式的Kong访问日志是没有问题的,关键点是错误日志,要修改filebeat的Nginx模块对错误日志文件进行多行采集,设置过滤关键词,将关键词之间的多行合并为一个采集事件。

Kong的日志输出目录:/usr/local/kong/logs。目录下有两种格式的日志文件

1、Nginx标准日志格式的访问日志文件:/usr/local/kong/logs/admin_access.log /usr/local/kong/logs/access.log

172.17.18.169 - - [21/Oct/2019:11:47:42 +0800] "GET /oalogin.php HTTP/1.1" 494 46 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"

2、带有Lua模块Nginx的错误日志文件 :/usr/local/kong/logs/error.log

2019/10/21 10:58:56 [warn] 14716#0: *17345670 [lua] reports.lua:70: log(): [reports] unknown request scheme: http while logging request, client: 172.17.18.169, server: kong, request: "GET / HTTP/1.1", host: "172.17.18.169"
2019/10/21 10:59:05 [warn] 14717#0: *17346563 [lua] reports.lua:70: log(): [reports] unknown request scheme: http while logging request, client: 172.17.18.169, server: kong, request: "GET /routes HTTP/1.1", host: "172.17.18.169"
2019/10/21 11:00:09 [error] 14716#0: *17348732 lua coroutine: runtime error: don't know how to respond to POST
stack traceback:
coroutine 0:
        [C]: ?
coroutine 1:
        [C]: in function 'resume'
        /usr/local/share/lua/5.1/lapis/application.lua:397: in function 'handler'
        /usr/local/share/lua/5.1/lapis/application.lua:130: in function 'resolve'
        /usr/local/share/lua/5.1/lapis/application.lua:167: in function </usr/local/share/lua/5.1/lapis/application.lua:165>
        [C]: in function 'xpcall'
        /usr/local/share/lua/5.1/lapis/application.lua:173: in function 'dispatch'
        /usr/local/share/lua/5.1/lapis/nginx.lua:230: in function 'serve'
        /usr/local/share/lua/5.1/kong/init.lua:1113: in function 'admin_content'
        content_by_lua(nginx-kong.conf:190):2: in main chunk, client: 172.17.18.169, server: kong_admin, request: "POST /routes/smsp-route HTTP/1.0", host: "local.api.kong.curouser.com:80"
2019/10/21 11:06:38 [warn] 14713#0: *17362982 [lua] reports.lua:70: log(): [reports] unknown request scheme: http while logging request, client: 172.17.18.169, server: kong, request: "GET /upstream HTTP/1.1", host: "172.17.18.169"

修改Nginx模块采集错误日志文件的方式

Filebeat的安装,Nignx模块启用,模块参数配置等操作步骤省略。这里只写针对Nginx错误日志配置进行的修改。

编辑/usr/share/filebeat/module/nginx/error/config/nginx-error.yml

# ===========================修改前的======================================
type: log
paths:
{{ range $i, $path := .paths }}
 - {{$path}}
{{ end }}
exclude_files: [".gz$"]

processors:
- add_locale: ~
# ===========================修改后的======================================
type: log
paths:
{{ range $i, $path := .paths }}
 - {{$path}}
{{ end }}
exclude_files: [".gz$"]
multiline.pattern: '^[0-9]{4}/[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}'
multiline.negate: true
multiline.match: after  

Logstash针对FIlebeat发送过来的日志事件进行分割处理的Pipelines

#=======================接收Filebeat发送过来的日志事件====================
input {
  beats {
    id => "logstash_kong_beats"
    port => 5044
  }
}
#=======================过滤、拆分、转换日志事件==============================
filter {
  if [fileset][name] == "access" {
    grok {
       match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
       remove_field => "message"
     }
     mutate {
      add_field => { "read_timestamp" => "%{@timestamp}" }
     }
     date {
             match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
             remove_field => "[nginx][access][time]"
     }
     useragent {
             source => "[nginx][access][agent]"
             target => "[nginx][access][user_agent]"
             remove_field => "[nginx][access][agent]"
     }
     geoip {
             source => "[nginx][access][remote_ip]"
             target => "[nginx][access][geoip]"
           }
     }
   else if [fileset][name] == "error" {
     grok {
       match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
       remove_field => "message"
     }
       mutate {
         rename => { "@timestamp" => "read_timestamp" }
       }
       date {
         match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
         remove_field => "[nginx][error][time]"
       }
     }
}
#=======================根据日志事件类型的不同输出到不同elasticsearch索引中====================
output {
  if [fileset][name] == "access" {
    elasticsearch {
      id => "logstash_kong_access_log"
      hosts => ["elasticsearch.elk.svc"]
      index => "kong-accesslog-%{+YYYY.MM.dd}"
      document_type => "_doc"
      http_compression => true
      template_name => "logstash-logger"
      user => "logstash-user"
      password => "logstash-password"
    }
  }else if [fileset][name] == "error"{
    elasticsearch {
      id => "logstash_kong_error_log"
      hosts => ["elasticsearch.elk.svc"]
      index => "kong-errorlog-%{+YYYY.MM.dd}"
      document_type => "_doc"
      http_compression => true
      template_name => "logstash-curiouser"
      user => "logstash-user"
      password => "logstash-password"
     }
  }
}

4、收集tomcat日志

# vim /etc/tomcat/server.xml
		<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log." suffix=".txt"
			   pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}"/>
# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/tomcat/localhost_access_log.*.txt
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["tomcat"]    
  
output.elasticsearch:
  hosts: ["192.168.1.10:9200"]
  index: "tomcat_access-%{[beat.version]}-%{+yyyy.MM}"

setup.template.name: "tomcat"
setup.template.pattern: "tomcat_*"
setup.template.enabled: false
setup.template.overwrite: true
# systemctl restart filebeat

5、收集java多行匹配模式

# vim /etc/tomcat/server.xml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/elasticsearch/elasticsearch.log
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after

output.elasticsearch:
  hosts: ["192.168.1.10:9200"]
  index: "es-%{[beat.version]}-%{+yyyy.MM}"
setup.template.name: "es"
setup.template.pattern: "es-*"
setup.template.enabled: false
setup.template.overwrite: true
# systemctl restart filebeat

6、收集docker日志

# mkdir /opt/{nginx,mysql}       						# 日志挂载目录(仅参考)
# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log 
  enabled: true
  paths:
    - /var/lib/docker/containers/*/*-json.log
  json.keys_under_root: true
  json.overwrite_keys: true

output.elasticsearch:
  hosts: ["192.168.1.10:9200"]
  indices:
    - index: "docker-nginx-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        stream: "stdout"
        attrs.service: "nginx"
    - index: "docker-nginx-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        stream: "stderr"
        attrs.service: "nginx"
    - index: "docker-db-access-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        stream: "stdout"
        attrs.service: "db"
    - index: "docker-db-error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        stream: "stderr"
        attrs.service: "db"

setup.template.name: "docker"
setup.template.pattern: "docker-*"
setup.template.enabled: false
setup.template.overwrite: true
# systemctl restart filebeat

7、模块收集mysql慢日志

# vim /etc/my.cnf
[mysqld]
user=mysql
basedir=/usr/local/mysql
datadir=/data/mysql/data
socket=/tmp/mysql.sock
server_id=6
log_bin=/data/binlog/mysql-bin
binlog_format=row
port=3306
log_error=/tmp/mysql3306.log
gtid-mode=on
enforce-gtid-consistency=true
#开启慢日志
slow_query_log=1 
#文件位置及名字(提前创建路径及赋权 )
slow_query_log_file=/data/mysql/slow.log
#设定慢查询时间
long_query_time=0.1
#没走索引的语句也记录
log_queries_not_using_indexes
[mysql]
socket=/tmp/mysql.sock
# systemctl restart mysqld
# systemctl restart mysql 
# filebeat module enable mysql
# vim /etc/filebeat/modules.d/mysql.yml
module: mysql
error:
  enabled: true
  var.paths: ["/data/mysql/slow.log"]

slowlog:
  enabled: true 
  var.paths: ["/data/mysql/slow.log"]
# vim /etc/filebeat/filebeat.yml
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

output.elasticsearch:
  hosts: ["192.168.1.10:9200"]
  indices:
    - index: "mysql_slowlog-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        fileset.module: "mysql"
        fileset.name: "slowlog"
    - index: "mysql_error-%{[beat.version]}-%{+yyyy.MM}"
      when.contains:
        fileset.module: "mysql"
        fileset.name: "error"

setup.template.name: "mysql"
setup.template.pattern: "mysql_*"
setup.template.enabled: false
setup.template.overwrite: true
# systemctl restart filebeat

8、实现日志的多行合并功能

系统应用中的日志一般都是以特定格式进行打印的,属于同一条日志的数据可能分多行进行打印,那么在使用ELK收集日志的时候就需要将属于同一条日志的多行数据进行合并。

解决方案:使用 Filebeat 或 Logstash 中的 multiline 多行合并插件来实现。

在使用 multiline 多行合并插件的时候需要注意,不同的 ELK 部署架构可能 multiline 的使用方式也不同,如果是Logstash作为日志收集器,那么 multiline 需要在 Logstash 中配置使用,如果是Filebeat作为日志收集器,那么 multiline 需要在 Filebeat 中配置使用,无需再在Logstash 中配置 multiline。

1、multiline 在 Filebeat 中的配置方式

filebeat.prospectors:
    -
       paths:
          - /home/project/elk/logs/test.log
       input_type: log 
       multiline:
            pattern: '^\['
            negate: true
            match: after
output:
   logstash:
      hosts: ["localhost:5044"]
  • pattern:正则表达式;
  • negate:默认为false,表示匹配pattern的行合并到上一行;true表示不匹配pattern的行合并到上一行;
  • match:after表示合并到上一行的末尾,before表示合并到上一行的行首。

如:

pattern: ‘[‘
negate: true
match: after

该配置表示将不匹配pattern模式的行合并到上一行的末尾

2、multiline在Logstash中的配置方式

input {
  beats {
    port => 5044
  }
}

filter {
  multiline {
    pattern => "%{LOGLEVEL}\s*\]"
    negate => true
    what => "previous"
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
  }
}

(1)Logstash 中配置的 what 属性值为 previous,相当于 Filebeat 中的 after,Logstash 中配置的 what 属性值为 next,相当于 Filebeat 中的 before。

(2)pattern => “%{LOGLEVEL}\s*]“ 中的LOGLEVEL是Logstash预制的正则匹配模式,预制的还有好多常用的正则匹配模式,详细请看:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

四、构建filebeat+redis+logstash+es+kibana架构(大型)

1、安装redis

# mkdir -p /opt/redis_cluster/redis_6379/{conf,logs,pid}
# cd /data/soft/
# wget http://download.redis.io/releases/redis-5.0.7.tar.gz
# tar xf redis-5.0.7.tar.gz -C /opt/redis_cluster/
# ln -s /opt/redis_cluster/redis-5.0.7  /opt/redis_cluster/redis
# cd /opt/redis_cluster/redis
# make && make install 
# vim /opt/redis_cluster/redis_6379/conf/6379.conf
bind 127.0.0.1 192.168.1.10
port 6379
daemonize yes
pidfile /opt/redis_cluster/redis_6379/pid/redis_6379.pid
logfile /opt/redis_cluster/redis_6379/logs/redis_6379.log
databases 16
dbfilename redis.rdb
dir /opt/redis_cluster/redis_6379
# redis-server /opt/redis_cluster/redis_6379/conf/6379.conf

2、修改filebeat配置文件,output给redis

# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

setup.template.settings:
  index.number_of_shards: 3

setup.kibana:

output.redis:
  hosts: ["192.168.1.10"]
  key: "filebeat"
  db: 0
  timeout: 5
# systemctl restart filebeat
# ab -n 100 -c 20 http://192.168.1.10/             # 使用ab压力测试工具测试访问
# redis-cli              # 登录
127.0.0.1:6379> keys *             # 列出所有键
1) "filebeat"
127.0.0.1:6379> type filebeat              # filebeat为键值名
list
127.0.0.1:6379> LLEN filebeat             # 查看list长度
(integer) 100
127.0.0.1:6379> LRANGE filebeat 0 -1             # 查看list所有内容
  1) "{\"@timestamp\":\"2020-08-05T13:53:57.104Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"doc\",\"version\":\"6.6.0\"},\"up_resp_time\":\"-\",\"source\":\"/var/log/nginx/access.log\",\"error\":{\"message\":\"@timestamp not overwritten (parse error on 05/Aug/2020:21:53:49 +0800)\",\"type\":\"json\"},\"input\":{\"type\":\"log\"},\"host\":{\"name\":\"node-1\"},\"agent\":\"ApacheBench/2.3\",\"up_host\":\"-\",\"remote_addr\":\"192.168.1.10\",\"bytes\":4833,\"prospector\":{\"type\":\"log\"},\"log\":{\"file\":{\"path\":\"/var/log/nginx/access.log\"}},\"request\":\"GET / HTTP/1.0\",\"request_time\":\"0.000\",\"tags\":[\"access\"],\"beat\":{\"version\":\"6.6.0\",\"name\":\"node-1\",\"hostname\":\"node-1\"},\"up_addr\":\"-\",\"x_forwarded\":\"-\",\"offset\":82800,\"referer\":\"-\",\"status\":200}"
......

3、安装logstash,收集redis的日志,提交给es

# cd /data/soft/
# rpm -ivh logstash-6.6.0.rpm 
# vim /etc/logstash/conf.d/redis.conf             # 实现access和error日志分离
input {
  redis {
    host => "192.168.1.10"
    port => "6379"
    db => "0"
    key => "filebeat"
    data_type => "list"
  }
}

filter {
  mutate {
    convert => ["upstream_time","float"]
    convert => ["request_time","float"]
  }
}

output {
  stdout {}
   if "access" in [tags] {
    elasticsearch {
      hosts => ["http://192.168.1.10:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
      manage_template => false
    }
   }
   if "error" in [tags] {
    elasticsearch {
      hosts => ["http://192.168.1.10:9200"]
      index => "nginx_error-%{+YYYY.MM.dd}"
      manage_template => false
    }
   }
}
# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf             # 启动logstash

4、安装nginx+keepalived对redis实现负载均衡

# yum -y install nginx
# systemctl start nginx
# vim /etc/nginx/nginx.conf
stream {
  upstream redis {
     server 192.168.1.20:6379 max_fails=2 fail_timeout=10s;
     server 192.168.1.30:6379 max_fails=2 fail_timeout=10s;
     }

  server {
        listen 6379;
        proxy_connect_timeout 1s;
        proxy_timeout 3s;
        proxy_pass redis;
  }
}
# yum -y install keepalived
# vim /etc/keepalived/keepalived.conf 
global_defs {
   router_id lb1
}

vrrp_instance VI_1 {
    state MASTER
    interface ens33
    virtual_router_id 51
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    virtual_ipaddress {
        192.168.1.254
    }
}
# systemctl restart keepalived

五、kibana使用

访问:http://IP:5601

0

评论区