一、概述
1.1 背景介绍
Nginx 日志堪称运维人员的金矿,每条访问记录都蕴含着宝贵信息:用户来源、访问页面、响应时长以及错误情况。然而,若缺乏有效管理,这座金矿可能迅速演变为灾难。例如,日志文件膨胀至 100GB 撑爆磁盘,或因未切割日志导致历史记录丢失,使故障无法追溯。
2022 年某项目中,客户反馈系统间歇性变慢,持续两周却找不到原因。后来部署 ELK 分析 Nginx 日志,发现每天下午 3 点左右有爬虫疯狂抓取,每秒数百请求,直接将后端打垮。此类问题靠人工查看日志难以发现,但在 Kibana 中却一目了然。
管理日志看似简单:切割、存储、分析。但要真正做到位,需考虑诸多细节:切割策略如何制定、日志格式如何设计、ELK 如何部署、告警如何配置。本文将逐一详解这些问题。
1.2 技术特点
日志切割方案对比
| 方案 |
优点 |
缺点 |
适用场景 |
| logrotate |
系统自带、稳定可靠 |
功能单一 |
中小规模、传统部署 |
| Nginx 自带 |
配置简单 |
需要信号控制 |
简单场景 |
| cronolog |
按时间自动切割 |
额外进程开销 |
需要实时切割 |
| Filebeat |
功能强大、与 ELK 集成 |
资源占用 |
大规模、需要分析 |
ELK Stack 架构
Nginx -> Filebeat -> Logstash -> Elasticsearch -> Kibana
日志 采集 解析 存储 展示
- Filebeat:轻量级日志采集器,部署在 Nginx 服务器上
- Logstash:日志解析和转换,支持丰富的过滤器
- Elasticsearch:分布式搜索引擎,存储和索引日志
- Kibana:可视化平台,用于图表和仪表盘
1.3 适用场景
日志切割适用于
- 所有生产环境的 Nginx 服务器
- 需要保留历史日志的场景
- 磁盘空间有限的环境
ELK 分析适用于
- 多台 Nginx 服务器集中管理
- 需要实时监控和告警
- 安全审计和合规要求
- 性能分析和故障排查
- 用户行为分析
1.4 环境要求
| 组件 |
版本要求 |
最低配置 |
推荐配置 |
| 操作系统 |
Rocky Linux 9 / Ubuntu 24.04 |
- |
- |
| Nginx |
1.26.x / 1.27.x |
- |
- |
| Elasticsearch |
8.x (8.12+) |
4GB 内存 |
16GB 内存 |
| Logstash |
8.x |
2GB 内存 |
4GB 内存 |
| Kibana |
8.x |
1GB 内存 |
2GB 内存 |
| Filebeat |
8.x |
256MB 内存 |
512MB 内存 |
| Java |
OpenJDK 17+ |
- |
- |
| 磁盘 |
SSD 推荐 |
50GB |
500GB+ |
二、详细步骤
2.1 准备工作
优化 Nginx 日志格式
默认的 combined 格式信息有限,生产环境建议使用自定义格式:
# /etc/nginx/nginx.conf
http {
# JSON 格式日志(推荐,便于 ELK 解析)
log_format json_combined escape=json
'{'
'"time_local":"$time_local",'
'"time_iso8601":"$time_iso8601",'
'"remote_addr":"$remote_addr",'
'"remote_user":"$remote_user",'
'"request_method":"$request_method",'
'"request_uri":"$request_uri",'
'"uri":"$uri",'
'"args":"$args",'
'"server_protocol":"$server_protocol",'
'"status":$status,'
'"body_bytes_sent":$body_bytes_sent,'
'"request_time":$request_time,'
'"upstream_response_time":"$upstream_response_time",'
'"upstream_connect_time":"$upstream_connect_time",'
'"upstream_header_time":"$upstream_header_time",'
'"upstream_addr":"$upstream_addr",'
'"http_referer":"$http_referer",'
'"http_user_agent":"$http_user_agent",'
'"http_x_forwarded_for":"$http_x_forwarded_for",'
'"http_host":"$http_host",'
'"server_name":"$server_name",'
'"request_length":$request_length,'
'"ssl_protocol":"$ssl_protocol",'
'"ssl_cipher":"$ssl_cipher",'
'"gzip_ratio":"$gzip_ratio"'
'}';
# 传统格式日志(兼容老工具)
log_format detailed '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent" '
'rt=$request_time uct="$upstream_connect_time" '
'uht="$upstream_header_time" urt="$upstream_response_time" '
'ua="$upstream_addr" us="$upstream_status" '
'cs=$upstream_cache_status';
# 使用 JSON 格式
access_log /var/log/nginx/access.log json_combined;
error_log /var/log/nginx/error.log warn;
}
日志字段说明
| 字段 |
说明 |
示例 |
| $time_iso8601 |
ISO8601 时间格式 |
2025-01-07T10:30:00+08:00 |
| $remote_addr |
客户端 IP |
192.168.1.100 |
| $request_method |
请求方法 |
GET/POST |
| $request_uri |
完整请求 URI(含参数) |
/api/user?id=1 |
| $status |
HTTP 状态码 |
200/404/500 |
| $body_bytes_sent |
响应体大小 |
1234 |
| $request_time |
请求处理时间(秒) |
0.052 |
| $upstream_response_time |
后端响应时间 |
0.048 |
| $http_user_agent |
用户代理 |
Mozilla/5.0... |
| $http_x_forwarded_for |
真实客户端 IP(经过代理) |
10.0.0.1 |
创建日志目录
# 创建日志目录
mkdir -p /var/log/nginx/{access,error,archive}
mkdir -p /data/logs/nginx
# 设置权限
chown -R nginx:nginx /var/log/nginx
chmod -R 755 /var/log/nginx
# 按站点分离日志(可选)
mkdir -p /var/log/nginx/sites/{www,api,admin}
2.2 核心配置
方案一:logrotate 日志切割(推荐)
# /etc/logrotate.d/nginx
/var/log/nginx/*.log
/var/log/nginx/*/*.log {
# 每天切割
daily
# 保留 90 天
rotate 90
# 压缩旧日志
compress
# 延迟压缩(保留最近一个不压缩,便于实时分析)
delaycompress
# 日志文件不存在不报错
missingok
# 空日志不切割
notifempty
# 使用日期作为后缀
dateext
dateformat -%Y%m%d
# 创建新日志文件的权限
create 0644 nginx nginx
# 切割后执行的脚本
sharedscripts
postrotate
# 发送 USR1 信号让 Nginx 重新打开日志文件
if [ -f /run/nginx.pid ]; then
kill -USR1 $(cat /run/nginx.pid)
fi
endscript
}
# 测试配置
logrotate -d /etc/logrotate.d/nginx
# 手动执行切割
logrotate -f /etc/logrotate.d/nginx
# 查看切割结果
ls -la /var/log/nginx/
# access.log
# access.log-20250106.gz
# access.log-20250105.gz
方案二:按小时切割(高流量场景)
# /etc/logrotate.d/nginx-hourly
/var/log/nginx/*.log {
hourly
rotate 168 # 保留 7 天(7*24=168 小时)
compress
delaycompress
missingok
notifempty
dateext
dateformat -%Y%m%d%H
create 0644 nginx nginx
sharedscripts
postrotate
if [ -f /run/nginx.pid ]; then
kill -USR1 $(cat /run/nginx.pid)
fi
endscript
}
# 添加 hourly 定时任务
# 创建 hourly 配置
cat > /etc/cron.hourly/nginx-logrotate << 'EOF'
#!/bin/bash
/usr/sbin/logrotate /etc/logrotate.d/nginx-hourly
EOF
chmod +x /etc/cron.hourly/nginx-logrotate
方案三:Nginx 变量动态日志路径
# 按日期自动切换日志文件
# 需要借助 map 或 if 实现
http {
# 使用 map 提取日期
map $time_iso8601 $logdate {
~^(?<ymd>\d{4}-\d{2}-\d{2}) $ymd;
default 'unknown';
}
server {
# 按日期写入不同文件(注意:这种方式会产生很多文件句柄)
access_log /var/log/nginx/access-$logdate.log json_combined;
# 更推荐的做法:配合 open_log_file_cache
open_log_file_cache max=100 inactive=20s valid=1m min_uses=2;
}
}
2.3 启动和验证
验证日志切割
# 查看当前日志状态
ls -la /var/log/nginx/
# 检查 logrotate 状态
cat /var/lib/logrotate/status | grep nginx
# 模拟切割
logrotate -df /etc/logrotate.d/nginx
# 实际执行切割
logrotate -f /etc/logrotate.d/nginx
# 验证 Nginx 重新打开日志
lsof -p $(cat /run/nginx.pid) | grep log
验证 JSON 日志格式
# 发送测试请求
curl -I http://localhost/
# 查看日志
tail -1 /var/log/nginx/access.log | jq .
# 输出示例
{
"time_local": "07/Jan/2025:10:30:00 +0800",
"time_iso8601": "2025-01-07T10:30:00+08:00",
"remote_addr": "127.0.0.1",
"request_method": "HEAD",
"request_uri": "/",
"status": 200,
"body_bytes_sent": 0,
"request_time": 0.001,
"http_user_agent": "curl/8.5.0"
}
三、示例代码和配置
3.1 完整配置示例
ELK Stack 部署(Docker Compose 方式)
# docker-compose.yml
# ELK Stack 8.x 部署配置
version: '3.8'
services:
# Elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.2
container_name: elasticsearch
environment:
- node.name=es01
- cluster.name=nginx-logs
- discovery.type=single-node
- bootstrap.memory_lock=true
- xpack.security.enabled=true
- xpack.security.enrollment.enabled=true
- ELASTIC_PASSWORD=YourStrongPassword123!
- "ES_JAVA_OPTS=-Xms4g -Xmx4g"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
ports:
- "9200:9200"
networks:
- elk
healthcheck:
test: ["CMD-SHELL", "curl -s --cacert /usr/share/elasticsearch/config/certs/http_ca.crt https://localhost:9200 | grep -q 'cluster_name'"]
interval: 30s
timeout: 10s
retries: 5
# Logstash
logstash:
image: docker.elastic.co/logstash/logstash:8.12.2
container_name: logstash
environment:
- "LS_JAVA_OPTS=-Xms2g -Xmx2g"
volumes:
- ./logstash/pipeline:/usr/share/logstash/pipeline:ro
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
ports:
- "5044:5044" # Beats input
- "5000:5000" # TCP input
- "9600:9600" # Monitoring API
networks:
- elk
depends_on:
elasticsearch:
condition: service_healthy
# Kibana
kibana:
image: docker.elastic.co/kibana/kibana:8.12.2
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=https://elasticsearch:9200
- ELASTICSEARCH_USERNAME=kibana_system
- ELASTICSEARCH_PASSWORD=YourKibanaPassword123!
- ELASTICSEARCH_SSL_VERIFICATIONMODE=none
ports:
- "5601:5601"
networks:
- elk
depends_on:
elasticsearch:
condition: service_healthy
volumes:
elasticsearch_data:
driver: local
networks:
elk:
driver: bridge
Logstash 配置文件
# logstash/pipeline/nginx.conf
# Logstash Nginx 日志解析配置
input {
# 接收 Filebeat 数据
beats {
port => 5044
ssl => false
}
# 备用:TCP 输入(测试用)
tcp {
port => 5000
codec => json
}
}
filter {
# 如果是 JSON 格式日志,直接解析
if [message] =~ /^{/ {
json {
source => "message"
target => "nginx"
remove_field => ["message"]
}
}
# 如果是传统格式日志,使用 grok 解析
else {
grok {
match => { "message" => '%{IPORHOST:nginx.remote_addr} - %{DATA:nginx.remote_user} \[%{HTTPDATE:nginx.time_local}\] "%{WORD:nginx.request_method} %{URIPATHPARAM:nginx.request_uri} HTTP/%{NUMBER:nginx.http_version}" %{NUMBER:nginx.status:int} %{NUMBER:nginx.body_bytes_sent:int} "%{DATA:nginx.http_referer}" "%{DATA:nginx.http_user_agent}"' }
remove_field => ["message"]
}
}
# 解析时间戳
date {
match => ["[nginx][time_iso8601]", "ISO8601"]
target => "@timestamp"
remove_field => ["[nginx][time_iso8601]", "[nginx][time_local]"]
}
# 转换数据类型
mutate {
convert => {
"[nginx][status]" => "integer"
"[nginx][body_bytes_sent]" => "integer"
"[nginx][request_length]" => "integer"
"[nginx][request_time]" => "float"
}
}
# 解析 upstream_response_time(可能是多个值)
if [nginx][upstream_response_time] and [nginx][upstream_response_time] != "-" {
ruby {
code => '
urt = event.get("[nginx][upstream_response_time]")
if urt.is_a?(String) && urt.include?(",")
times = urt.split(",").map { |t| t.strip.to_f }
event.set("[nginx][upstream_response_time]", times.last)
event.set("[nginx][upstream_response_times]", times)
elsif urt.is_a?(String)
event.set("[nginx][upstream_response_time]", urt.to_f)
end
'
}
}
# GeoIP 地理位置解析
if [nginx][remote_addr] {
geoip {
source => "[nginx][remote_addr]"
target => "[nginx][geoip]"
database => "/usr/share/logstash/GeoLite2-City.mmdb"
add_field => ["[nginx][geoip][coordinates]", "%{[nginx][geoip][longitude]}"]
add_field => ["[nginx][geoip][coordinates]", "%{[nginx][geoip][latitude]}"]
}
mutate {
convert => { "[nginx][geoip][coordinates]" => "float" }
}
}
# User-Agent 解析
if [nginx][http_user_agent] {
useragent {
source => "[nginx][http_user_agent]"
target => "[nginx][user_agent]"
}
}
# 添加元数据
mutate {
add_field => { "[nginx][log_type]" => "access" }
# 移除不需要的字段
remove_field => ["agent", "ecs", "host", "input", "log"]
}
# 标记慢请求
if [nginx][request_time] and [nginx][request_time] > 1 {
mutate {
add_tag => ["slow_request"]
}
}
# 标记错误请求
if [nginx][status] >= 400 {
mutate {
add_tag => ["error_request"]
}
}
if [nginx][status] >= 500 {
mutate {
add_tag => ["server_error"]
}
}
}
output {
# 输出到 Elasticsearch
elasticsearch {
hosts => ["https://elasticsearch:9200"]
user => "elastic"
password => "YourStrongPassword123!"
ssl_certificate_verification => false
index => "nginx-access-%{+YYYY.MM.dd}"
template_name => "nginx-access"
template => "/usr/share/logstash/templates/nginx-access.json"
template_overwrite => true
}
# 调试输出(生产环境注释掉)
# stdout {
# codec => rubydebug
# }
}
Elasticsearch 索引模板
{
"index_patterns": ["nginx-access-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name": "nginx-logs-policy",
"index.lifecycle.rollover_alias": "nginx-access"
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"nginx": {
"properties": {
"remote_addr": {
"type": "ip"
},
"request_method": {
"type": "keyword"
},
"request_uri": {
"type": "keyword",
"fields": {
"text": {
"type": "text"
}
}
},
"status": {
"type": "integer"
},
"body_bytes_sent": {
"type": "long"
},
"request_time": {
"type": "float"
},
"upstream_response_time": {
"type": "float"
},
"http_referer": {
"type": "keyword"
},
"http_user_agent": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 512
}
}
},
"server_name": {
"type": "keyword"
},
"geoip": {
"properties": {
"location": {
"type": "geo_point"
},
"country_name": {
"type": "keyword"
},
"city_name": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
Filebeat 配置
# /etc/filebeat/filebeat.yml
# Filebeat 8.x Nginx 日志采集配置
filebeat.inputs:
# Nginx 访问日志
- type: log
id: nginx-access
enabled: true
paths:
- /var/log/nginx/access.log
- /var/log/nginx/*/access.log
fields:
log_type: nginx_access
env: production
fields_under_root: true
json.keys_under_root: true
json.add_error_key: true
json.message_key: message
# Nginx 错误日志
- type: log
id: nginx-error
enabled: true
paths:
- /var/log/nginx/error.log
- /var/log/nginx/*/error.log
fields:
log_type: nginx_error
env: production
fields_under_root: true
multiline:
pattern: '^\d{4}/\d{2}/\d{2}'
negate: true
match: after
# 处理器
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- drop_fields:
fields: ["agent.ephemeral_id", "agent.id", "agent.name", "ecs.version"]
ignore_missing: true
# 输出到 Logstash
output.logstash:
hosts: ["logstash:5044"]
ssl.enabled: false
bulk_max_size: 2048
worker: 2
# 日志
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0640
# 监控
monitoring.enabled: true
monitoring.cluster_uuid: "nginx-logs"
3.2 实际应用案例
案例一:实时流量监控仪表盘
Kibana Dashboard 配置(通过 DevTools 导入):
POST _dashboards/dashboard/nginx-overview
{
"title": "Nginx 访问概览",
"panels": [
{
"type": "metric",
"title": "总请求数",
"query": {
"index": "nginx-access-*",
"agg": "count"
}
},
{
"type": "metric",
"title": "平均响应时间",
"query": {
"index": "nginx-access-*",
"agg": "avg",
"field": "nginx.request_time"
}
},
{
"type": "pie",
"title": "状态码分布",
"query": {
"index": "nginx-access-*",
"agg": "terms",
"field": "nginx.status"
}
},
{
"type": "line",
"title": "请求量趋势",
"query": {
"index": "nginx-access-*",
"agg": "date_histogram",
"interval": "1m"
}
}
]
}
常用 Kibana 查询
# 查找 500 错误
nginx.status: 500
# 查找慢请求(超过 2 秒)
nginx.request_time: >2
# 查找特定 IP 的访问
nginx.remote_addr: "192.168.1.100"
# 查找特定 URI 的访问
nginx.request_uri: "/api/users*"
# 组合查询:生产环境的 500 错误
env: production AND nginx.status: 500
# 时间范围+条件
nginx.status: >=400 AND @timestamp: [now-1h TO now]
案例二:异常访问告警配置
# Elasticsearch Watcher 告警规则
PUT _watcher/watch/nginx_5xx_alert
{
"trigger": {
"schedule": {
"interval": "5m"
}
},
"input": {
"search": {
"request": {
"indices": ["nginx-access-*"],
"body": {
"query": {
"bool": {
"must": [
{"range": {"@timestamp": {"gte": "now-5m"}}},
{"range": {"nginx.status": {"gte": 500}}}
]
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total.value": {
"gt": 10
}
}
},
"actions": {
"send_email": {
"email": {
"to": "ops@example.com",
"subject": "Nginx 5xx 错误告警",
"body": {
"text": "过去 5 分钟内发生了 {{ctx.payload.hits.total.value}} 次 5xx 错误,请及时处理!"
}
}
},
"webhook": {
"webhook": {
"scheme": "https",
"host": "hooks.slack.com",
"port": 443,
"method": "post",
"path": "/services/xxx/yyy/zzz",
"body": "{\"text\": \"Nginx 告警: 5 分钟内 {{ctx.payload.hits.total.value}} 次 5xx 错误\"}"
}
}
}
}
案例三:自动化日志清理脚本
#!/bin/bash
# cleanup_logs.sh - 自动化日志清理和归档
# 配置
LOG_DIR="/var/log/nginx"
ARCHIVE_DIR="/data/archive/nginx"
RETENTION_DAYS=90
ARCHIVE_DAYS=30
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}
log "开始执行日志清理..."
# 创建归档目录
mkdir -p "$ARCHIVE_DIR"
# 压缩超过 1 天的未压缩日志
log "压缩未压缩的旧日志..."
find "$LOG_DIR" -name "*.log-*" ! -name "*.gz" -mtime +1 -exec gzip {} \;
# 归档超过 30 天的日志到归档目录
log "归档超过 ${ARCHIVE_DAYS} 天的日志..."
find "$LOG_DIR" -name "*.log-*.gz" -mtime +$ARCHIVE_DAYS -exec mv {} "$ARCHIVE_DIR/" \;
# 删除超过 90 天的归档日志
log "删除超过 ${RETENTION_DAYS} 天的归档日志..."
find "$ARCHIVE_DIR" -name "*.gz" -mtime +$RETENTION_DAYS -delete
# 清理空目录
find "$LOG_DIR" -type d -empty -delete 2>/dev/null
find "$ARCHIVE_DIR" -type d -empty -delete 2>/dev/null
# 统计
log "清理完成!"
log "当前日志目录大小: $(du -sh $LOG_DIR | cut -f1)"
log "归档目录大小: $(du -sh $ARCHIVE_DIR | cut -f1)"
# 检查磁盘空间
DISK_USAGE=$(df -h "$LOG_DIR" | awk 'NR==2 {print $5}' | tr -d '%')
if [ "$DISK_USAGE" -gt 80 ]; then
log "警告: 磁盘使用率超过 80%!当前: ${DISK_USAGE}%"
fi
# 添加定时任务
crontab -e
# 每天凌晨 3 点执行
0 3 * * * /usr/local/bin/cleanup_logs.sh >> /var/log/cleanup_logs.log 2>&1
案例四:实时日志分析脚本
#!/bin/bash
# realtime_analysis.sh - Nginx 日志实时分析
LOG_FILE=${1:-/var/log/nginx/access.log}
INTERVAL=${2:-60}
echo "=== Nginx 实时日志分析 ==="
echo "日志文件: $LOG_FILE"
echo "统计间隔: ${INTERVAL} 秒"
echo ""
while true; do
clear
echo "====== $(date '+%Y-%m-%d %H:%M:%S') ======"
# 最近 N 秒的日志
START_TIME=$(date -d "-${INTERVAL} seconds" '+%d/%b/%Y:%H:%M:%S')
echo ""
echo "--- 请求量统计 ---"
# 如果是 JSON 格式日志
if head -1 "$LOG_FILE" | grep -q "^{"; then
# JSON 格式处理
tail -10000 "$LOG_FILE" | jq -r '.status' 2>/dev/null | sort | uniq -c | sort -rn | head -10
else
# 传统格式处理
awk -v start="$START_TIME" '
$4 >= "["start {print $9}
' "$LOG_FILE" | sort | uniq -c | sort -rn | head -10
fi
echo ""
echo "--- Top 10 慢请求 ---"
if head -1 "$LOG_FILE" | grep -q "^{"; then
tail -10000 "$LOG_FILE" | jq -r 'select(.request_time > 1) | "\(.request_time)s \(.request_uri)"' 2>/dev/null | sort -rn | head -10
else
tail -10000 "$LOG_FILE" | awk -F'rt=' '{print $2}' | sort -rn | head -10
fi
echo ""
echo "--- Top 10 访问 IP ---"
if head -1 "$LOG_FILE" | grep -q "^{"; then
tail -10000 "$LOG_FILE" | jq -r '.remote_addr' 2>/dev/null | sort | uniq -c | sort -rn | head -10
else
tail -10000 "$LOG_FILE" | awk '{print $1}' | sort | uniq -c | sort -rn | head -10
fi
echo ""
echo "--- 实时 QPS ---"
CURRENT_TIME=$(date +%s)
QPS=$(awk -v ct="$CURRENT_TIME" -v interval="$INTERVAL" '
BEGIN { count=0 }
{
# 简化计算,统计最近的行数
count++
}
END {
if (count > 10000) count = 10000
printf "%.1f", count / interval
}
' <<< "$(tail -10000 "$LOG_FILE")")
echo "当前 QPS: $QPS"
echo ""
echo "按 Ctrl+C 退出..."
sleep $INTERVAL
done
四、最佳实践和注意事项
4.1 最佳实践
1. 日志格式标准化
# 推荐使用 JSON 格式,便于解析
# 字段命名统一,便于跨项目复用
# 好的实践:统一的字段命名
log_format json_combined escape=json
'{'
'"@timestamp":"$time_iso8601",' # 统一使用 @timestamp
'"client_ip":"$remote_addr",' # 清晰的字段名
'"method":"$request_method",'
'"uri":"$request_uri",'
'"status":$status,'
'"latency_ms":$request_time' # 明确单位
'}';
2. 日志分级存储
# 热数据:最近 7 天,SSD 存储,Elasticsearch
# 温数据:7-30 天,HDD 存储,Elasticsearch
# 冷数据:30 天以上,对象存储/归档
# 删除:90 天以上
# ILM 策略配置
PUT _ilm/policy/nginx-logs-policy
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "1d"
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": { "number_of_shards": 1 },
"forcemerge": { "max_num_segments": 1 }
}
},
"cold": {
"min_age": "30d",
"actions": {
"allocate": { "require": { "data": "cold" } }
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
3. 采样日志记录
# 高流量场景下的日志采样
# 只记录 1% 的正常请求,但记录所有错误
# 方案一:使用 split_clients
split_clients "${remote_addr}${request_uri}" $log_sample {
1% 1;
* 0;
}
# 正常请求采样记录
access_log /var/log/nginx/access.log json_combined if=$log_sample;
# 错误请求全量记录
map $status $log_error {
~^[45] 1;
default 0;
}
access_log /var/log/nginx/error_requests.log json_combined if=$log_error;
4. 敏感信息脱敏
# 不要记录敏感参数
# 使用 map 过滤敏感信息
map $request_uri $clean_uri {
~^(?<path>[^?]+)\?.*password=.*$ "$path?password=***";
~^(?<path>[^?]+)\?.*token=.*$ "$path?token=***";
default $request_uri;
}
log_format safe_json escape=json
'{'
'"uri":"$clean_uri"' # 使用脱敏后的 URI
'}';
4.2 注意事项
常见错误表
| 错误 |
原因 |
解决方案 |
| 日志不切割 |
logrotate 配置错误或权限问题 |
检查配置语法和权限 |
| 切割后日志为空 |
没有发送 USR1 信号 |
确保 postrotate 正确执行 |
| Filebeat CPU 高 |
日志量过大或正则复杂 |
优化配置、增加资源 |
| ES 索引膨胀 |
没有配置 ILM |
配置生命周期管理 |
| Logstash 解析失败 |
Grok 模式不匹配 |
使用 JSON 格式避免解析 |
| 日志延迟大 |
网络或处理瓶颈 |
增加 buffer、worker |
性能优化建议
# 1. Filebeat 优化
# 增加批量大小
bulk_max_size: 4096
# 增加 worker 数
output.logstash:
worker: 4
# 2. Logstash 优化
# 增加 pipeline worker
pipeline.workers: 4
pipeline.batch.size: 500
# 3. Elasticsearch 优化
# 调整刷新间隔
PUT nginx-access-*/_settings
{
"index.refresh_interval": "30s"
}
# 调整副本数(写入时减少)
PUT nginx-access-*/_settings
{
"index.number_of_replicas": 0
}
磁盘空间管理
# 预估日志大小
# 假设:每条日志 500 字节,日均 1000 万请求
# 原始日志:500 * 10,000,000 = 5GB/天
# Gzip 压缩后:约 500MB/天
# ES 存储(含索引):约 2GB/天
# 保留 90 天需要:
# 压缩日志:45GB
# ES 存储:180GB
# 建议预留:300GB
# 监控磁盘使用
df -h /var/log
df -h /var/lib/elasticsearch
五、故障排查和监控
5.1 故障排查
问题一:日志不写入
# 排查步骤
# 1. 检查 Nginx 配置
nginx -T | grep access_log
# 2. 检查日志目录权限
ls -la /var/log/nginx/
namei -l /var/log/nginx/access.log
# 3. 检查磁盘空间
df -h /var/log
# 4. 检查 SELinux
getenforce
ausearch -m avc -ts recent
# 5. 检查 Nginx 错误日志
tail -100 /var/log/nginx/error.log
# 6. 测试写入
sudo -u nginx touch /var/log/nginx/test
问题二:Filebeat 无法采集
# 1. 检查 Filebeat 状态
systemctl status filebeat
journalctl -u filebeat -f
# 2. 检查配置
filebeat test config
filebeat test output
# 3. 检查日志文件权限
ls -la /var/log/nginx/
# Filebeat 用户需要读取权限
# 4. 检查 registry
cat /var/lib/filebeat/registry/filebeat/log.json | jq .
# 5. 手动测试
filebeat -e -c /etc/filebeat/filebeat.yml
问题三:Logstash 解析失败
# 1. 启用调试输出
# 在 output 中添加:
output {
stdout { codec => rubydebug }
}
# 2. 测试 Grok 模式
# 使用 Kibana 的 Grok Debugger
# 或在线工具:https://grokdebugger.com/
# 3. 查看 Logstash 日志
tail -f /var/log/logstash/logstash-plain.log
# 4. 检查 tag
# 解析失败的日志会有 _grokparsefailure 标签
GET nginx-access-*/_search
{
"query": {
"term": { "tags": "_grokparsefailure" }
}
}
问题四:Elasticsearch 索引异常
# 1. 检查集群健康
curl -s localhost:9200/_cluster/health?pretty
# 2. 检查索引状态
curl -s localhost:9200/_cat/indices/nginx-*?v
# 3. 检查分片分配
curl -s localhost:9200/_cat/shards/nginx-*?v
# 4. 查看未分配分片原因
curl -s localhost:9200/_cluster/allocation/explain?pretty
# 5. 检查磁盘空间
curl -s localhost:9200/_cat/allocation?v
5.2 性能监控
Filebeat 监控
# 查看 Filebeat 指标
curl -s localhost:5066/stats | jq .
# 关键指标
# - filebeat.events.active: 正在处理的事件数
# - filebeat.events.done: 已完成的事件数
# - libbeat.output.events.acked: 已确认的事件数
# - libbeat.output.events.dropped: 丢弃的事件数
Logstash 监控
# 查看 Logstash 指标
curl -s localhost:9600/_node/stats?pretty
# 关键指标
# - jvm.mem.heap_used_percent: JVM 堆使用率
# - process.cpu.percent: CPU 使用率
# - pipeline.events.in: 输入事件数
# - pipeline.events.out: 输出事件数
# - pipeline.events.filtered: 过滤事件数
Elasticsearch 监控
#!/bin/bash
# monitor_elk.sh
ES_HOST="localhost:9200"
echo "=== Elasticsearch 集群监控 ==="
echo ""
echo "--- 集群健康 ---"
curl -s "$ES_HOST/_cluster/health?pretty"
echo ""
echo "--- 节点状态 ---"
curl -s "$ES_HOST/_cat/nodes?v"
echo ""
echo "--- 索引大小 ---"
curl -s "$ES_HOST/_cat/indices/nginx-*?v&s=store.size:desc" | head -20
echo ""
echo "--- 分片分布 ---"
curl -s "$ES_HOST/_cat/shards/nginx-*?v" | head -20
echo ""
echo "--- JVM 内存 ---"
curl -s "$ES_HOST/_nodes/stats/jvm?pretty" | grep -A5 "heap_used"
5.3 备份与恢复
Elasticsearch 快照备份
# 1. 注册快照仓库
PUT _snapshot/nginx_backup
{
"type": "fs",
"settings": {
"location": "/backup/elasticsearch",
"compress": true
}
}
# 2. 创建快照
PUT _snapshot/nginx_backup/snapshot_20250107?wait_for_completion=true
{
"indices": "nginx-access-*",
"ignore_unavailable": true,
"include_global_state": false
}
# 3. 查看快照
GET _snapshot/nginx_backup/_all
# 4. 恢复快照
POST _snapshot/nginx_backup/snapshot_20250107/_restore
{
"indices": "nginx-access-2025.01.06",
"rename_pattern": "nginx-(.+)",
"rename_replacement": "restored_nginx-$1"
}
自动化备份脚本
#!/bin/bash
# backup_elk.sh - ELK 备份脚本
ES_HOST="localhost:9200"
BACKUP_REPO="nginx_backup"
DATE=$(date +%Y%m%d)
RETENTION_DAYS=30
# 创建快照
echo "创建快照: snapshot_$DATE"
curl -X PUT "$ES_HOST/_snapshot/$BACKUP_REPO/snapshot_$DATE?wait_for_completion=true" \
-H 'Content-Type: application/json' \
-d '{
"indices": "nginx-access-*,nginx-error-*",
"ignore_unavailable": true,
"include_global_state": false
}'
# 删除旧快照
echo "清理 ${RETENTION_DAYS} 天前的快照..."
OLD_DATE=$(date -d "-${RETENTION_DAYS} days" +%Y%m%d)
curl -s "$ES_HOST/_snapshot/$BACKUP_REPO/_all" | \
jq -r ".snapshots[].snapshot" | \
while read snapshot; do
SNAP_DATE=$(echo $snapshot | grep -oP '\d{8}')
if [[ "$SNAP_DATE" < "$OLD_DATE" ]]; then
echo "删除快照: $snapshot"
curl -X DELETE "$ES_HOST/_snapshot/$BACKUP_REPO/$snapshot"
fi
done
echo "备份完成!"
六、总结
6.1 技术要点回顾
日志管理的核心就是四个字:采、存、析、用。
- 采集:使用 JSON 格式日志,Filebeat 轻量采集
- 存储:logrotate 本地切割,ES 分布式存储
- 分析:Logstash 解析转换,Kibana 可视化
- 应用:实时监控告警,问题快速定位
关键配置清单:
- Nginx 使用 JSON 格式日志(便于解析)
- logrotate 配置(daily + compress + 90 天保留)
- Filebeat 采集配置(JSON 解析 + 多路径)
- Logstash 过滤配置(时间解析 + 类型转换 + GeoIP)
- ES 索引模板(字段类型 + ILM 策略)
6.2 进阶学习方向
- 日志安全:日志脱敏、访问控制、审计合规
- 智能分析:机器学习异常检测、根因分析
- 分布式追踪:结合 Jaeger/Zipkin 实现全链路追踪
- 云原生日志:Loki、Fluentd、云厂商日志服务
6.3 参考资料
欢迎到 云栈社区 交流更多关于 ELK Stack 和 Nginx 的技术话题。
附录
A. 命令速查表
| 命令 |
说明 |
nginx -s reopen |
重新打开日志文件 |
kill -USR1 $(cat /run/nginx.pid) |
发送 USR1 信号 |
logrotate -f /etc/logrotate.d/nginx |
强制执行日志切割 |
logrotate -d /etc/logrotate.d/nginx |
测试配置(不执行) |
filebeat test config |
测试 Filebeat 配置 |
filebeat test output |
测试 Filebeat 输出 |
curl localhost:9200/_cat/indices |
查看 ES 索引 |
curl localhost:9600/_node/stats |
查看 Logstash 状态 |
B. 配置参数详解
| 参数 |
模块 |
说明 |
| log_format |
Nginx |
定义日志格式 |
| access_log |
Nginx |
指定访问日志路径和格式 |
| open_log_file_cache |
Nginx |
日志文件描述符缓存 |
| daily/weekly/monthly |
logrotate |
切割周期 |
| rotate N |
logrotate |
保留份数 |
| compress |
logrotate |
压缩旧日志 |
| sharedscripts |
logrotate |
脚本只执行一次 |
| bulk_max_size |
Filebeat |
批量发送大小 |
| pipeline.workers |
Logstash |
处理线程数 |
C. 术语表
| 术语 |
解释 |
| ELK Stack |
Elasticsearch + Logstash + Kibana 的日志解决方案 |
| Filebeat |
轻量级日志采集器 |
| Grok |
Logstash 的正则解析插件 |
| ILM |
Index Lifecycle Management,索引生命周期管理 |
| Shard |
Elasticsearch 的数据分片 |
| Replica |
Elasticsearch 的副本分片 |
| USR1 |
Nginx 信号,用于重新打开日志文件 |
| logrotate |
Linux 日志切割工具 |