找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2390

积分

0

好友

337

主题
发表于 昨天 10:37 | 查看: 4| 回复: 0

一、概述

1.1 背景介绍

Nginx 日志堪称运维人员的金矿,每条访问记录都蕴含着宝贵信息:用户来源、访问页面、响应时长以及错误情况。然而,若缺乏有效管理,这座金矿可能迅速演变为灾难。例如,日志文件膨胀至 100GB 撑爆磁盘,或因未切割日志导致历史记录丢失,使故障无法追溯。

2022 年某项目中,客户反馈系统间歇性变慢,持续两周却找不到原因。后来部署 ELK 分析 Nginx 日志,发现每天下午 3 点左右有爬虫疯狂抓取,每秒数百请求,直接将后端打垮。此类问题靠人工查看日志难以发现,但在 Kibana 中却一目了然。

管理日志看似简单:切割、存储、分析。但要真正做到位,需考虑诸多细节:切割策略如何制定、日志格式如何设计、ELK 如何部署、告警如何配置。本文将逐一详解这些问题。

1.2 技术特点

日志切割方案对比

方案 优点 缺点 适用场景
logrotate 系统自带、稳定可靠 功能单一 中小规模、传统部署
Nginx 自带 配置简单 需要信号控制 简单场景
cronolog 按时间自动切割 额外进程开销 需要实时切割
Filebeat 功能强大、与 ELK 集成 资源占用 大规模、需要分析

ELK Stack 架构

Nginx -> Filebeat -> Logstash -> Elasticsearch -> Kibana
   日志        采集        解析          存储          展示
  • Filebeat:轻量级日志采集器,部署在 Nginx 服务器上
  • Logstash:日志解析和转换,支持丰富的过滤器
  • Elasticsearch:分布式搜索引擎,存储和索引日志
  • Kibana:可视化平台,用于图表和仪表盘

1.3 适用场景

日志切割适用于

  • 所有生产环境的 Nginx 服务器
  • 需要保留历史日志的场景
  • 磁盘空间有限的环境

ELK 分析适用于

  • 多台 Nginx 服务器集中管理
  • 需要实时监控和告警
  • 安全审计和合规要求
  • 性能分析和故障排查
  • 用户行为分析

1.4 环境要求

组件 版本要求 最低配置 推荐配置
操作系统 Rocky Linux 9 / Ubuntu 24.04 - -
Nginx 1.26.x / 1.27.x - -
Elasticsearch 8.x (8.12+) 4GB 内存 16GB 内存
Logstash 8.x 2GB 内存 4GB 内存
Kibana 8.x 1GB 内存 2GB 内存
Filebeat 8.x 256MB 内存 512MB 内存
Java OpenJDK 17+ - -
磁盘 SSD 推荐 50GB 500GB+

二、详细步骤

2.1 准备工作

优化 Nginx 日志格式

默认的 combined 格式信息有限,生产环境建议使用自定义格式:

# /etc/nginx/nginx.conf

http {
    # JSON 格式日志(推荐,便于 ELK 解析)
    log_format json_combined escape=json
        '{'
            '"time_local":"$time_local",'
            '"time_iso8601":"$time_iso8601",'
            '"remote_addr":"$remote_addr",'
            '"remote_user":"$remote_user",'
            '"request_method":"$request_method",'
            '"request_uri":"$request_uri",'
            '"uri":"$uri",'
            '"args":"$args",'
            '"server_protocol":"$server_protocol",'
            '"status":$status,'
            '"body_bytes_sent":$body_bytes_sent,'
            '"request_time":$request_time,'
            '"upstream_response_time":"$upstream_response_time",'
            '"upstream_connect_time":"$upstream_connect_time",'
            '"upstream_header_time":"$upstream_header_time",'
            '"upstream_addr":"$upstream_addr",'
            '"http_referer":"$http_referer",'
            '"http_user_agent":"$http_user_agent",'
            '"http_x_forwarded_for":"$http_x_forwarded_for",'
            '"http_host":"$http_host",'
            '"server_name":"$server_name",'
            '"request_length":$request_length,'
            '"ssl_protocol":"$ssl_protocol",'
            '"ssl_cipher":"$ssl_cipher",'
            '"gzip_ratio":"$gzip_ratio"'
        '}';

    # 传统格式日志(兼容老工具)
    log_format detailed '$remote_addr - $remote_user [$time_local] '
                        '"$request" $status $body_bytes_sent '
                        '"$http_referer" "$http_user_agent" '
                        'rt=$request_time uct="$upstream_connect_time" '
                        'uht="$upstream_header_time" urt="$upstream_response_time" '
                        'ua="$upstream_addr" us="$upstream_status" '
                        'cs=$upstream_cache_status';

    # 使用 JSON 格式
    access_log /var/log/nginx/access.log json_combined;
    error_log /var/log/nginx/error.log warn;
}

日志字段说明

字段 说明 示例
$time_iso8601 ISO8601 时间格式 2025-01-07T10:30:00+08:00
$remote_addr 客户端 IP 192.168.1.100
$request_method 请求方法 GET/POST
$request_uri 完整请求 URI(含参数) /api/user?id=1
$status HTTP 状态码 200/404/500
$body_bytes_sent 响应体大小 1234
$request_time 请求处理时间(秒) 0.052
$upstream_response_time 后端响应时间 0.048
$http_user_agent 用户代理 Mozilla/5.0...
$http_x_forwarded_for 真实客户端 IP(经过代理) 10.0.0.1

创建日志目录

# 创建日志目录
mkdir -p /var/log/nginx/{access,error,archive}
mkdir -p /data/logs/nginx

# 设置权限
chown -R nginx:nginx /var/log/nginx
chmod -R 755 /var/log/nginx

# 按站点分离日志(可选)
mkdir -p /var/log/nginx/sites/{www,api,admin}

2.2 核心配置

方案一:logrotate 日志切割(推荐)

# /etc/logrotate.d/nginx
/var/log/nginx/*.log
/var/log/nginx/*/*.log {
    # 每天切割
    daily

    # 保留 90 天
    rotate 90

    # 压缩旧日志
    compress

    # 延迟压缩(保留最近一个不压缩,便于实时分析)
    delaycompress

    # 日志文件不存在不报错
    missingok

    # 空日志不切割
    notifempty

    # 使用日期作为后缀
    dateext
    dateformat -%Y%m%d

    # 创建新日志文件的权限
    create 0644 nginx nginx

    # 切割后执行的脚本
    sharedscripts
    postrotate
        # 发送 USR1 信号让 Nginx 重新打开日志文件
        if [ -f /run/nginx.pid ]; then
            kill -USR1 $(cat /run/nginx.pid)
        fi
    endscript
}
# 测试配置
logrotate -d /etc/logrotate.d/nginx

# 手动执行切割
logrotate -f /etc/logrotate.d/nginx

# 查看切割结果
ls -la /var/log/nginx/
# access.log
# access.log-20250106.gz
# access.log-20250105.gz

方案二:按小时切割(高流量场景)

# /etc/logrotate.d/nginx-hourly
/var/log/nginx/*.log {
    hourly
    rotate 168  # 保留 7 天(7*24=168 小时)
    compress
    delaycompress
    missingok
    notifempty
    dateext
    dateformat -%Y%m%d%H
    create 0644 nginx nginx
    sharedscripts
    postrotate
        if [ -f /run/nginx.pid ]; then
            kill -USR1 $(cat /run/nginx.pid)
        fi
    endscript
}
# 添加 hourly 定时任务
# 创建 hourly 配置
cat > /etc/cron.hourly/nginx-logrotate << 'EOF'
#!/bin/bash
/usr/sbin/logrotate /etc/logrotate.d/nginx-hourly
EOF

chmod +x /etc/cron.hourly/nginx-logrotate

方案三:Nginx 变量动态日志路径

# 按日期自动切换日志文件
# 需要借助 map 或 if 实现

http {
    # 使用 map 提取日期
    map $time_iso8601 $logdate {
        ~^(?<ymd>\d{4}-\d{2}-\d{2}) $ymd;
        default 'unknown';
    }

    server {
        # 按日期写入不同文件(注意:这种方式会产生很多文件句柄)
        access_log /var/log/nginx/access-$logdate.log json_combined;

        # 更推荐的做法:配合 open_log_file_cache
        open_log_file_cache max=100 inactive=20s valid=1m min_uses=2;
    }
}

2.3 启动和验证

验证日志切割

# 查看当前日志状态
ls -la /var/log/nginx/

# 检查 logrotate 状态
cat /var/lib/logrotate/status | grep nginx

# 模拟切割
logrotate -df /etc/logrotate.d/nginx

# 实际执行切割
logrotate -f /etc/logrotate.d/nginx

# 验证 Nginx 重新打开日志
lsof -p $(cat /run/nginx.pid) | grep log

验证 JSON 日志格式

# 发送测试请求
curl -I http://localhost/

# 查看日志
tail -1 /var/log/nginx/access.log | jq .

# 输出示例
{
  "time_local": "07/Jan/2025:10:30:00 +0800",
  "time_iso8601": "2025-01-07T10:30:00+08:00",
  "remote_addr": "127.0.0.1",
  "request_method": "HEAD",
  "request_uri": "/",
  "status": 200,
  "body_bytes_sent": 0,
  "request_time": 0.001,
  "http_user_agent": "curl/8.5.0"
}

三、示例代码和配置

3.1 完整配置示例

ELK Stack 部署(Docker Compose 方式)

# docker-compose.yml
# ELK Stack 8.x 部署配置

version: '3.8'

services:
  # Elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2
    container_name: elasticsearch
    environment:
      - node.name=es01
      - cluster.name=nginx-logs
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - xpack.security.enabled=true
      - xpack.security.enrollment.enabled=true
      - ELASTIC_PASSWORD=YourStrongPassword123!
      - "ES_JAVA_OPTS=-Xms4g -Xmx4g"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks:
      - elk
    healthcheck:
      test: ["CMD-SHELL", "curl -s --cacert /usr/share/elasticsearch/config/certs/http_ca.crt https://localhost:9200 | grep -q 'cluster_name'"]
      interval: 30s
      timeout: 10s
      retries: 5

  # Logstash
  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.2
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xms2g -Xmx2g"
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    ports:
      - "5044:5044"  # Beats input
      - "5000:5000"  # TCP input
      - "9600:9600"  # Monitoring API
    networks:
      - elk
    depends_on:
      elasticsearch:
        condition: service_healthy

  # Kibana
  kibana:
    image: docker.elastic.co/kibana/kibana:8.12.2
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=https://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=YourKibanaPassword123!
      - ELASTICSEARCH_SSL_VERIFICATIONMODE=none
    ports:
      - "5601:5601"
    networks:
      - elk
    depends_on:
      elasticsearch:
        condition: service_healthy

volumes:
  elasticsearch_data:
    driver: local

networks:
  elk:
    driver: bridge

Logstash 配置文件

# logstash/pipeline/nginx.conf
# Logstash Nginx 日志解析配置

input {
  # 接收 Filebeat 数据
  beats {
    port => 5044
    ssl => false
  }

  # 备用:TCP 输入(测试用)
  tcp {
    port => 5000
    codec => json
  }
}

filter {
  # 如果是 JSON 格式日志,直接解析
  if [message] =~ /^{/ {
    json {
      source => "message"
      target => "nginx"
      remove_field => ["message"]
    }
  }
  # 如果是传统格式日志,使用 grok 解析
  else {
    grok {
      match => { "message" => '%{IPORHOST:nginx.remote_addr} - %{DATA:nginx.remote_user} \[%{HTTPDATE:nginx.time_local}\] "%{WORD:nginx.request_method} %{URIPATHPARAM:nginx.request_uri} HTTP/%{NUMBER:nginx.http_version}" %{NUMBER:nginx.status:int} %{NUMBER:nginx.body_bytes_sent:int} "%{DATA:nginx.http_referer}" "%{DATA:nginx.http_user_agent}"' }
      remove_field => ["message"]
    }
  }

  # 解析时间戳
  date {
    match => ["[nginx][time_iso8601]", "ISO8601"]
    target => "@timestamp"
    remove_field => ["[nginx][time_iso8601]", "[nginx][time_local]"]
  }

  # 转换数据类型
  mutate {
    convert => {
      "[nginx][status]" => "integer"
      "[nginx][body_bytes_sent]" => "integer"
      "[nginx][request_length]" => "integer"
      "[nginx][request_time]" => "float"
    }
  }

  # 解析 upstream_response_time(可能是多个值)
  if [nginx][upstream_response_time] and [nginx][upstream_response_time] != "-" {
    ruby {
      code => '
        urt = event.get("[nginx][upstream_response_time]")
        if urt.is_a?(String) && urt.include?(",")
          times = urt.split(",").map { |t| t.strip.to_f }
          event.set("[nginx][upstream_response_time]", times.last)
          event.set("[nginx][upstream_response_times]", times)
        elsif urt.is_a?(String)
          event.set("[nginx][upstream_response_time]", urt.to_f)
        end
      '
    }
  }

  # GeoIP 地理位置解析
  if [nginx][remote_addr] {
    geoip {
      source => "[nginx][remote_addr]"
      target => "[nginx][geoip]"
      database => "/usr/share/logstash/GeoLite2-City.mmdb"
      add_field => ["[nginx][geoip][coordinates]", "%{[nginx][geoip][longitude]}"]
      add_field => ["[nginx][geoip][coordinates]", "%{[nginx][geoip][latitude]}"]
    }

    mutate {
      convert => { "[nginx][geoip][coordinates]" => "float" }
    }
  }

  # User-Agent 解析
  if [nginx][http_user_agent] {
    useragent {
      source => "[nginx][http_user_agent]"
      target => "[nginx][user_agent]"
    }
  }

  # 添加元数据
  mutate {
    add_field => { "[nginx][log_type]" => "access" }

    # 移除不需要的字段
    remove_field => ["agent", "ecs", "host", "input", "log"]
  }

  # 标记慢请求
  if [nginx][request_time] and [nginx][request_time] > 1 {
    mutate {
      add_tag => ["slow_request"]
    }
  }

  # 标记错误请求
  if [nginx][status] >= 400 {
    mutate {
      add_tag => ["error_request"]
    }
  }

  if [nginx][status] >= 500 {
    mutate {
      add_tag => ["server_error"]
    }
  }
}

output {
  # 输出到 Elasticsearch
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    user => "elastic"
    password => "YourStrongPassword123!"
    ssl_certificate_verification => false
    index => "nginx-access-%{+YYYY.MM.dd}"
    template_name => "nginx-access"
    template => "/usr/share/logstash/templates/nginx-access.json"
    template_overwrite => true
  }

  # 调试输出(生产环境注释掉)
  # stdout {
  #   codec => rubydebug
  # }
}

Elasticsearch 索引模板

{
  "index_patterns": ["nginx-access-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.lifecycle.name": "nginx-logs-policy",
      "index.lifecycle.rollover_alias": "nginx-access"
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "nginx": {
          "properties": {
            "remote_addr": {
              "type": "ip"
            },
            "request_method": {
              "type": "keyword"
            },
            "request_uri": {
              "type": "keyword",
              "fields": {
                "text": {
                  "type": "text"
                }
              }
            },
            "status": {
              "type": "integer"
            },
            "body_bytes_sent": {
              "type": "long"
            },
            "request_time": {
              "type": "float"
            },
            "upstream_response_time": {
              "type": "float"
            },
            "http_referer": {
              "type": "keyword"
            },
            "http_user_agent": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 512
                }
              }
            },
            "server_name": {
              "type": "keyword"
            },
            "geoip": {
              "properties": {
                "location": {
                  "type": "geo_point"
                },
                "country_name": {
                  "type": "keyword"
                },
                "city_name": {
                  "type": "keyword"
                }
              }
            }
          }
        }
      }
    }
  }
}

Filebeat 配置

# /etc/filebeat/filebeat.yml
# Filebeat 8.x Nginx 日志采集配置

filebeat.inputs:
  # Nginx 访问日志
  - type: log
    id: nginx-access
    enabled: true
    paths:
      - /var/log/nginx/access.log
      - /var/log/nginx/*/access.log
    fields:
      log_type: nginx_access
      env: production
    fields_under_root: true
    json.keys_under_root: true
    json.add_error_key: true
    json.message_key: message

  # Nginx 错误日志
  - type: log
    id: nginx-error
    enabled: true
    paths:
      - /var/log/nginx/error.log
      - /var/log/nginx/*/error.log
    fields:
      log_type: nginx_error
      env: production
    fields_under_root: true
    multiline:
      pattern: '^\d{4}/\d{2}/\d{2}'
      negate: true
      match: after

# 处理器
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - drop_fields:
      fields: ["agent.ephemeral_id", "agent.id", "agent.name", "ecs.version"]
      ignore_missing: true

# 输出到 Logstash
output.logstash:
  hosts: ["logstash:5044"]
  ssl.enabled: false
  bulk_max_size: 2048
  worker: 2

# 日志
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0640

# 监控
monitoring.enabled: true
monitoring.cluster_uuid: "nginx-logs"

3.2 实际应用案例

案例一:实时流量监控仪表盘

Kibana Dashboard 配置(通过 DevTools 导入):

POST _dashboards/dashboard/nginx-overview
{
  "title": "Nginx 访问概览",
  "panels": [
    {
      "type": "metric",
      "title": "总请求数",
      "query": {
        "index": "nginx-access-*",
        "agg": "count"
      }
    },
    {
      "type": "metric",
      "title": "平均响应时间",
      "query": {
        "index": "nginx-access-*",
        "agg": "avg",
        "field": "nginx.request_time"
      }
    },
    {
      "type": "pie",
      "title": "状态码分布",
      "query": {
        "index": "nginx-access-*",
        "agg": "terms",
        "field": "nginx.status"
      }
    },
    {
      "type": "line",
      "title": "请求量趋势",
      "query": {
        "index": "nginx-access-*",
        "agg": "date_histogram",
        "interval": "1m"
      }
    }
  ]
}

常用 Kibana 查询

# 查找 500 错误
nginx.status: 500

# 查找慢请求(超过 2 秒)
nginx.request_time: >2

# 查找特定 IP 的访问
nginx.remote_addr: "192.168.1.100"

# 查找特定 URI 的访问
nginx.request_uri: "/api/users*"

# 组合查询:生产环境的 500 错误
env: production AND nginx.status: 500

# 时间范围+条件
nginx.status: >=400 AND @timestamp: [now-1h TO now]

案例二:异常访问告警配置

# Elasticsearch Watcher 告警规则
PUT _watcher/watch/nginx_5xx_alert
{
  "trigger": {
    "schedule": {
      "interval": "5m"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["nginx-access-*"],
        "body": {
          "query": {
            "bool": {
              "must": [
                {"range": {"@timestamp": {"gte": "now-5m"}}},
                {"range": {"nginx.status": {"gte": 500}}}
              ]
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total.value": {
        "gt": 10
      }
    }
  },
  "actions": {
    "send_email": {
      "email": {
        "to": "ops@example.com",
        "subject": "Nginx 5xx 错误告警",
        "body": {
          "text": "过去 5 分钟内发生了 {{ctx.payload.hits.total.value}} 次 5xx 错误,请及时处理!"
        }
      }
    },
    "webhook": {
      "webhook": {
        "scheme": "https",
        "host": "hooks.slack.com",
        "port": 443,
        "method": "post",
        "path": "/services/xxx/yyy/zzz",
        "body": "{\"text\": \"Nginx 告警: 5 分钟内 {{ctx.payload.hits.total.value}} 次 5xx 错误\"}"
      }
    }
  }
}

案例三:自动化日志清理脚本

#!/bin/bash
# cleanup_logs.sh - 自动化日志清理和归档

# 配置
LOG_DIR="/var/log/nginx"
ARCHIVE_DIR="/data/archive/nginx"
RETENTION_DAYS=90
ARCHIVE_DAYS=30

# 日志函数
log() {
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

log "开始执行日志清理..."

# 创建归档目录
mkdir -p "$ARCHIVE_DIR"

# 压缩超过 1 天的未压缩日志
log "压缩未压缩的旧日志..."
find "$LOG_DIR" -name "*.log-*" ! -name "*.gz" -mtime +1 -exec gzip {} \;

# 归档超过 30 天的日志到归档目录
log "归档超过 ${ARCHIVE_DAYS} 天的日志..."
find "$LOG_DIR" -name "*.log-*.gz" -mtime +$ARCHIVE_DAYS -exec mv {} "$ARCHIVE_DIR/" \;

# 删除超过 90 天的归档日志
log "删除超过 ${RETENTION_DAYS} 天的归档日志..."
find "$ARCHIVE_DIR" -name "*.gz" -mtime +$RETENTION_DAYS -delete

# 清理空目录
find "$LOG_DIR" -type d -empty -delete 2>/dev/null
find "$ARCHIVE_DIR" -type d -empty -delete 2>/dev/null

# 统计
log "清理完成!"
log "当前日志目录大小: $(du -sh $LOG_DIR | cut -f1)"
log "归档目录大小: $(du -sh $ARCHIVE_DIR | cut -f1)"

# 检查磁盘空间
DISK_USAGE=$(df -h "$LOG_DIR" | awk 'NR==2 {print $5}' | tr -d '%')
if [ "$DISK_USAGE" -gt 80 ]; then
  log "警告: 磁盘使用率超过 80%!当前: ${DISK_USAGE}%"
fi
# 添加定时任务
crontab -e

# 每天凌晨 3 点执行
0 3 * * * /usr/local/bin/cleanup_logs.sh >> /var/log/cleanup_logs.log 2>&1

案例四:实时日志分析脚本

#!/bin/bash
# realtime_analysis.sh - Nginx 日志实时分析

LOG_FILE=${1:-/var/log/nginx/access.log}
INTERVAL=${2:-60}

echo "=== Nginx 实时日志分析 ==="
echo "日志文件: $LOG_FILE"
echo "统计间隔: ${INTERVAL} 秒"
echo ""

while true; do
    clear
    echo "====== $(date '+%Y-%m-%d %H:%M:%S') ======"

    # 最近 N 秒的日志
    START_TIME=$(date -d "-${INTERVAL} seconds" '+%d/%b/%Y:%H:%M:%S')

    echo ""
    echo "--- 请求量统计 ---"
    # 如果是 JSON 格式日志
    if head -1 "$LOG_FILE" | grep -q "^{"; then
        # JSON 格式处理
        tail -10000 "$LOG_FILE" | jq -r '.status' 2>/dev/null | sort | uniq -c | sort -rn | head -10
    else
        # 传统格式处理
        awk -v start="$START_TIME" '
            $4 >= "["start {print $9}
        ' "$LOG_FILE" | sort | uniq -c | sort -rn | head -10
    fi

    echo ""
    echo "--- Top 10 慢请求 ---"
    if head -1 "$LOG_FILE" | grep -q "^{"; then
        tail -10000 "$LOG_FILE" | jq -r 'select(.request_time > 1) | "\(.request_time)s \(.request_uri)"' 2>/dev/null | sort -rn | head -10
    else
        tail -10000 "$LOG_FILE" | awk -F'rt=' '{print $2}' | sort -rn | head -10
    fi

    echo ""
    echo "--- Top 10 访问 IP ---"
    if head -1 "$LOG_FILE" | grep -q "^{"; then
        tail -10000 "$LOG_FILE" | jq -r '.remote_addr' 2>/dev/null | sort | uniq -c | sort -rn | head -10
    else
        tail -10000 "$LOG_FILE" | awk '{print $1}' | sort | uniq -c | sort -rn | head -10
    fi

    echo ""
    echo "--- 实时 QPS ---"
    CURRENT_TIME=$(date +%s)
    QPS=$(awk -v ct="$CURRENT_TIME" -v interval="$INTERVAL" '
        BEGIN { count=0 }
        {
            # 简化计算,统计最近的行数
            count++
        }
        END {
            if (count > 10000) count = 10000
            printf "%.1f", count / interval
        }
    ' <<< "$(tail -10000 "$LOG_FILE")")
    echo "当前 QPS: $QPS"

    echo ""
    echo "按 Ctrl+C 退出..."
    sleep $INTERVAL
done

四、最佳实践和注意事项

4.1 最佳实践

1. 日志格式标准化

# 推荐使用 JSON 格式,便于解析
# 字段命名统一,便于跨项目复用

# 好的实践:统一的字段命名
log_format json_combined escape=json
    '{'
        '"@timestamp":"$time_iso8601",'  # 统一使用 @timestamp
        '"client_ip":"$remote_addr",'     # 清晰的字段名
        '"method":"$request_method",'
        '"uri":"$request_uri",'
        '"status":$status,'
        '"latency_ms":$request_time'      # 明确单位
    '}';

2. 日志分级存储

# 热数据:最近 7 天,SSD 存储,Elasticsearch
# 温数据:7-30 天,HDD 存储,Elasticsearch
# 冷数据:30 天以上,对象存储/归档
# 删除:90 天以上

# ILM 策略配置
PUT _ilm/policy/nginx-logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "allocate": { "require": { "data": "cold" } }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

3. 采样日志记录

# 高流量场景下的日志采样
# 只记录 1% 的正常请求,但记录所有错误

# 方案一:使用 split_clients
split_clients "${remote_addr}${request_uri}" $log_sample {
    1%    1;
    *     0;
}

# 正常请求采样记录
access_log /var/log/nginx/access.log json_combined if=$log_sample;

# 错误请求全量记录
map $status $log_error {
    ~^[45] 1;
    default 0;
}
access_log /var/log/nginx/error_requests.log json_combined if=$log_error;

4. 敏感信息脱敏

# 不要记录敏感参数
# 使用 map 过滤敏感信息

map $request_uri $clean_uri {
    ~^(?<path>[^?]+)\?.*password=.*$ "$path?password=***";
    ~^(?<path>[^?]+)\?.*token=.*$ "$path?token=***";
    default $request_uri;
}

log_format safe_json escape=json
    '{'
        '"uri":"$clean_uri"'  # 使用脱敏后的 URI
    '}';

4.2 注意事项

常见错误表

错误 原因 解决方案
日志不切割 logrotate 配置错误或权限问题 检查配置语法和权限
切割后日志为空 没有发送 USR1 信号 确保 postrotate 正确执行
Filebeat CPU 高 日志量过大或正则复杂 优化配置、增加资源
ES 索引膨胀 没有配置 ILM 配置生命周期管理
Logstash 解析失败 Grok 模式不匹配 使用 JSON 格式避免解析
日志延迟大 网络或处理瓶颈 增加 buffer、worker

性能优化建议

# 1. Filebeat 优化
# 增加批量大小
bulk_max_size: 4096

# 增加 worker 数
output.logstash:
  worker: 4

# 2. Logstash 优化
# 增加 pipeline worker
pipeline.workers: 4
pipeline.batch.size: 500

# 3. Elasticsearch 优化
# 调整刷新间隔
PUT nginx-access-*/_settings
{
  "index.refresh_interval": "30s"
}

# 调整副本数(写入时减少)
PUT nginx-access-*/_settings
{
  "index.number_of_replicas": 0
}

磁盘空间管理

# 预估日志大小
# 假设:每条日志 500 字节,日均 1000 万请求
# 原始日志:500 * 10,000,000 = 5GB/天
# Gzip 压缩后:约 500MB/天
# ES 存储(含索引):约 2GB/天

# 保留 90 天需要:
# 压缩日志:45GB
# ES 存储:180GB
# 建议预留:300GB

# 监控磁盘使用
df -h /var/log
df -h /var/lib/elasticsearch

五、故障排查和监控

5.1 故障排查

问题一:日志不写入

# 排查步骤

# 1. 检查 Nginx 配置
nginx -T | grep access_log

# 2. 检查日志目录权限
ls -la /var/log/nginx/
namei -l /var/log/nginx/access.log

# 3. 检查磁盘空间
df -h /var/log

# 4. 检查 SELinux
getenforce
ausearch -m avc -ts recent

# 5. 检查 Nginx 错误日志
tail -100 /var/log/nginx/error.log

# 6. 测试写入
sudo -u nginx touch /var/log/nginx/test

问题二:Filebeat 无法采集

# 1. 检查 Filebeat 状态
systemctl status filebeat
journalctl -u filebeat -f

# 2. 检查配置
filebeat test config
filebeat test output

# 3. 检查日志文件权限
ls -la /var/log/nginx/
# Filebeat 用户需要读取权限

# 4. 检查 registry
cat /var/lib/filebeat/registry/filebeat/log.json | jq .

# 5. 手动测试
filebeat -e -c /etc/filebeat/filebeat.yml

问题三:Logstash 解析失败

# 1. 启用调试输出
# 在 output 中添加:
output {
  stdout { codec => rubydebug }
}

# 2. 测试 Grok 模式
# 使用 Kibana 的 Grok Debugger
# 或在线工具:https://grokdebugger.com/

# 3. 查看 Logstash 日志
tail -f /var/log/logstash/logstash-plain.log

# 4. 检查 tag
# 解析失败的日志会有 _grokparsefailure 标签
GET nginx-access-*/_search
{
  "query": {
    "term": { "tags": "_grokparsefailure" }
  }
}

问题四:Elasticsearch 索引异常

# 1. 检查集群健康
curl -s localhost:9200/_cluster/health?pretty

# 2. 检查索引状态
curl -s localhost:9200/_cat/indices/nginx-*?v

# 3. 检查分片分配
curl -s localhost:9200/_cat/shards/nginx-*?v

# 4. 查看未分配分片原因
curl -s localhost:9200/_cluster/allocation/explain?pretty

# 5. 检查磁盘空间
curl -s localhost:9200/_cat/allocation?v

5.2 性能监控

Filebeat 监控

# 查看 Filebeat 指标
curl -s localhost:5066/stats | jq .

# 关键指标
# - filebeat.events.active: 正在处理的事件数
# - filebeat.events.done: 已完成的事件数
# - libbeat.output.events.acked: 已确认的事件数
# - libbeat.output.events.dropped: 丢弃的事件数

Logstash 监控

# 查看 Logstash 指标
curl -s localhost:9600/_node/stats?pretty

# 关键指标
# - jvm.mem.heap_used_percent: JVM 堆使用率
# - process.cpu.percent: CPU 使用率
# - pipeline.events.in: 输入事件数
# - pipeline.events.out: 输出事件数
# - pipeline.events.filtered: 过滤事件数

Elasticsearch 监控

#!/bin/bash
# monitor_elk.sh

ES_HOST="localhost:9200"

echo "=== Elasticsearch 集群监控 ==="
echo ""

echo "--- 集群健康 ---"
curl -s "$ES_HOST/_cluster/health?pretty"

echo ""
echo "--- 节点状态 ---"
curl -s "$ES_HOST/_cat/nodes?v"

echo ""
echo "--- 索引大小 ---"
curl -s "$ES_HOST/_cat/indices/nginx-*?v&s=store.size:desc" | head -20

echo ""
echo "--- 分片分布 ---"
curl -s "$ES_HOST/_cat/shards/nginx-*?v" | head -20

echo ""
echo "--- JVM 内存 ---"
curl -s "$ES_HOST/_nodes/stats/jvm?pretty" | grep -A5 "heap_used"

5.3 备份与恢复

Elasticsearch 快照备份

# 1. 注册快照仓库
PUT _snapshot/nginx_backup
{
  "type": "fs",
  "settings": {
    "location": "/backup/elasticsearch",
    "compress": true
  }
}

# 2. 创建快照
PUT _snapshot/nginx_backup/snapshot_20250107?wait_for_completion=true
{
  "indices": "nginx-access-*",
  "ignore_unavailable": true,
  "include_global_state": false
}

# 3. 查看快照
GET _snapshot/nginx_backup/_all

# 4. 恢复快照
POST _snapshot/nginx_backup/snapshot_20250107/_restore
{
  "indices": "nginx-access-2025.01.06",
  "rename_pattern": "nginx-(.+)",
  "rename_replacement": "restored_nginx-$1"
}

自动化备份脚本

#!/bin/bash
# backup_elk.sh - ELK 备份脚本

ES_HOST="localhost:9200"
BACKUP_REPO="nginx_backup"
DATE=$(date +%Y%m%d)
RETENTION_DAYS=30

# 创建快照
echo "创建快照: snapshot_$DATE"
curl -X PUT "$ES_HOST/_snapshot/$BACKUP_REPO/snapshot_$DATE?wait_for_completion=true" \
  -H 'Content-Type: application/json' \
  -d '{
    "indices": "nginx-access-*,nginx-error-*",
    "ignore_unavailable": true,
    "include_global_state": false
  }'

# 删除旧快照
echo "清理 ${RETENTION_DAYS} 天前的快照..."
OLD_DATE=$(date -d "-${RETENTION_DAYS} days" +%Y%m%d)
curl -s "$ES_HOST/_snapshot/$BACKUP_REPO/_all" | \
  jq -r ".snapshots[].snapshot" | \
  while read snapshot; do
    SNAP_DATE=$(echo $snapshot | grep -oP '\d{8}')
    if [[ "$SNAP_DATE" < "$OLD_DATE" ]]; then
      echo "删除快照: $snapshot"
      curl -X DELETE "$ES_HOST/_snapshot/$BACKUP_REPO/$snapshot"
    fi
  done

echo "备份完成!"

六、总结

6.1 技术要点回顾

日志管理的核心就是四个字:采、存、析、用

  1. 采集:使用 JSON 格式日志,Filebeat 轻量采集
  2. 存储:logrotate 本地切割,ES 分布式存储
  3. 分析:Logstash 解析转换,Kibana 可视化
  4. 应用:实时监控告警,问题快速定位

关键配置清单:

  • Nginx 使用 JSON 格式日志(便于解析)
  • logrotate 配置(daily + compress + 90 天保留)
  • Filebeat 采集配置(JSON 解析 + 多路径)
  • Logstash 过滤配置(时间解析 + 类型转换 + GeoIP)
  • ES 索引模板(字段类型 + ILM 策略)

6.2 进阶学习方向

  • 日志安全:日志脱敏、访问控制、审计合规
  • 智能分析:机器学习异常检测、根因分析
  • 分布式追踪:结合 Jaeger/Zipkin 实现全链路追踪
  • 云原生日志:Loki、Fluentd、云厂商日志服务

6.3 参考资料

欢迎到 云栈社区 交流更多关于 ELK StackNginx 的技术话题。

附录

A. 命令速查表

命令 说明
nginx -s reopen 重新打开日志文件
kill -USR1 $(cat /run/nginx.pid) 发送 USR1 信号
logrotate -f /etc/logrotate.d/nginx 强制执行日志切割
logrotate -d /etc/logrotate.d/nginx 测试配置(不执行)
filebeat test config 测试 Filebeat 配置
filebeat test output 测试 Filebeat 输出
curl localhost:9200/_cat/indices 查看 ES 索引
curl localhost:9600/_node/stats 查看 Logstash 状态

B. 配置参数详解

参数 模块 说明
log_format Nginx 定义日志格式
access_log Nginx 指定访问日志路径和格式
open_log_file_cache Nginx 日志文件描述符缓存
daily/weekly/monthly logrotate 切割周期
rotate N logrotate 保留份数
compress logrotate 压缩旧日志
sharedscripts logrotate 脚本只执行一次
bulk_max_size Filebeat 批量发送大小
pipeline.workers Logstash 处理线程数

C. 术语表

术语 解释
ELK Stack Elasticsearch + Logstash + Kibana 的日志解决方案
Filebeat 轻量级日志采集器
Grok Logstash 的正则解析插件
ILM Index Lifecycle Management,索引生命周期管理
Shard Elasticsearch 的数据分片
Replica Elasticsearch 的副本分片
USR1 Nginx 信号,用于重新打开日志文件
logrotate Linux 日志切割工具



上一篇:Linux系统监控与性能调优实战:运维工程师必会的核心命令与技巧解析
下一篇:使用AME Wizard工具一键部署ReviOS:Win10/Win11系统精简与优化教程
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-16 00:35 , Processed in 0.208508 second(s), 38 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表