随着业务的不断增长,日常运维中的Nginx配置文件往往会变得愈发复杂。手动梳理散落在各个文件中的 server 块、location 路由规则以及 upstream 负载均衡配置不仅耗时费力,还极易出现疏漏和错误。本文将介绍一款使用 Python 编写的实用脚本,它能够自动解析 Nginx 配置文件,并将其丰富的配置信息结构化为清晰的 Excel 表格,极大地方便了配置审计、团队协作和安全管理工作。
脚本核心功能
配置解析能力
- upstream块解析:自动识别负载均衡配置,提取后端服务器地址及负载均衡策略。
- server块解析:提取监听端口、域名(server_name)、SSL状态、日志路径等关键信息。
- location块解析:详细解析每个路由规则,包括代理目标(
proxy_pass)、静态文件路径(root/alias)、访问控制、重写规则等。
数据结构化输出
脚本会将所有配置信息整理成包含以下字段的结构化表格(Excel):
| 字段类别 |
字段名 |
说明 |
示例 |
| 基础信息 |
配置文件路径 |
原始配置文件来源 |
/etc/nginx/nginx.conf |
| Server块信息 |
服务器块编号 |
唯一标识每个server块 |
server_1 |
|
域名/服务 |
server_name 配置 |
example.com |
|
监听端口 |
端口号及SSL标识 |
443 ssl |
|
SSL状态 |
是否启用SSL加密 |
启用 |
| Location块增强 |
路由-location |
location 匹配路径 |
/api/ |
|
匹配模式 |
location匹配规则类型 |
前缀匹配、正则匹配等 |
|
目标/路径 |
代理目标或文件根目录 |
http://backend/ 或 /var/www/html |
|
负载均衡 |
upstream服务器地址列表 |
192.168.1.10:8080, 192.168.1.11:8080 |
|
负载均衡策略 |
upstream负载均衡算法 |
ip_hash, least_conn |
|
HTTP方法限制 |
location允许的HTTP方法 |
GET, POST |
|
访问控制 |
IP白名单/黑名单规则 |
allow 192.168.1.0/24; deny all; |
|
重写规则 |
rewrite规则配置 |
rewrite ^/old /new permanent; |
|
超时设置 |
代理相关超时时间 |
proxy_connect_timeout 60s; |
|
客户端最大Body大小 |
请求体大小限制 |
client_max_body_size 10m; |
| SSL/TLS配置 |
SSL证书 |
SSL证书文件路径 |
/path/to/cert.crt |
|
SSL密钥 |
SSL私钥文件路径 |
/path/to/cert.key |
| 日志与错误页面 |
访问日志 |
access_log 路径 |
/var/log/nginx/access.log |
|
错误日志 |
error_log 路径 |
/var/log/nginx/error.log |
|
错误页面 |
error_page 配置 |
404 -> /404.html |
完整增强版代码实现
import re
import pandas as pd
import sys
from collections import OrderedDict
def parse_nginx_config(config_content, file_path=''):
"""
解析Nginx配置内容,提取upstream和server信息。
:param config_content: Nginx配置文件的内容字符串
:param file_path: 配置文件路径
:return: 包含服务器信息的列表
"""
servers = []
upstreams = {}
# 提取所有 upstream 块及其负载均衡策略
upstream_blocks = re.findall(r'upstream\s+(\w+)\s*\{\s*([^}]*?)\s*\}', config_content, re.DOTALL)
for upstream_name, upstream_body in upstream_blocks:
upstream_servers = re.findall(r'server\s+([^\s;]+);', upstream_body)
# 提取负载均衡策略
strategy_match = re.search(r'(ip_hash|least_conn|fair|hash\s+\w+)', upstream_body)
strategy = strategy_match.group(1) if strategy_match else '轮询'
upstreams[upstream_name] = {
'servers': ', '.join(upstream_servers),
'strategy': strategy
}
# 拆分所有 server 块
server_blocks = re.split(r'\n\s*server\s*\{', config_content)
for i, block in enumerate(server_blocks):
if not block.strip():
continue
server_info = OrderedDict()
server_info['配置文件路径'] = file_path
server_info['服务器块编号'] = f'server_{i}'
# 提取监听端口和SSL状态
listen_matches = re.findall(r'listen\s+(\d+)(?:\s+ssl)?\s*;', block)
ssl_listen = re.search(r'listen\s+\d+\s+ssl;', block)
server_info['SSL状态'] = '启用' if ssl_listen else '未启用'
server_info['监听端口'] = ', '.join([match[0] if isinstance(match, tuple) else match for match in listen_matches]) if listen_matches else '80'
# 提取SSL证书信息
ssl_cert_match = re.search(r'ssl_certificate\s+([^\s;]+);', block)
ssl_key_match = re.search(r'ssl_certificate_key\s+([^\s;]+);', block)
server_info['SSL证书'] = ssl_cert_match.group(1) if ssl_cert_match else '无'
server_info['SSL密钥'] = ssl_key_match.group(1) if ssl_key_match else '无'
# 提取 server_name
server_name_match = re.search(r'server_name\s+([^\s;]+(?:\s+[^\s;]+)*);', block)
if server_name_match:
server_info['域名/服务'] = ', '.join(server_name_match.group(1).split())
else:
server_info['域名/服务'] = "默认服务器"
# 提取 error_page 配置
error_pages = re.findall(r'error_page\s+(\d+)\s+([^\s;]+);', block)
server_info['错误页面'] = '; '.join([f'{code}->{page}' for code, page in error_pages]) if error_pages else '默认'
# 提取 access_log 和 error_log
access_log_match = re.search(r'access_log\s+([^\s;]+);', block)
error_log_match = re.search(r'error_log\s+([^\s;]+);', block)
server_info['访问日志'] = access_log_match.group(1) if access_log_match else '默认'
server_info['错误日志'] = error_log_match.group(1) if error_log_match else '默认'
# 提取 location 块
locations = []
location_blocks = re.findall(r'location\s+([^\s{]+)\s*\{\s*([^}]*?)\s*\}', block, re.DOTALL)
for loc_path, loc_content in location_blocks:
location_info = OrderedDict()
location_info['路由-location'] = loc_path
# 分析location匹配模式
if loc_path.startswith('='):
location_info['匹配模式'] = '精确匹配'
elif loc_path.startswith('~*'):
location_info['匹配模式'] = '正则匹配(忽略大小写)'
elif loc_path.startswith('~'):
location_info['匹配模式'] = '正则匹配'
elif loc_path.startswith('^~'):
location_info['匹配模式'] = '前缀优先匹配'
else:
location_info['匹配模式'] = '前缀匹配'
# 提取HTTP方法限制
limit_except_match = re.search(r'limit_except\s+(\w+)\s*\{', loc_content)
location_info['HTTP方法限制'] = limit_except_match.group(1) if limit_except_match else '无限制'
# 检查 proxy_pass
proxy_pass_match = re.search(r'proxy_pass\s+(http[s]?://[^\s;/]+)[^\s;]*;', loc_content)
if proxy_pass_match:
proxy_target = proxy_pass_match.group(1)
location_info['目标/路径'] = proxy_target
# 检查是否为 upstream
upstream_found = False
for upstream_name, upstream_data in upstreams.items():
if upstream_name in proxy_target:
location_info['负载均衡'] = upstream_data['servers']
location_info['负载均衡策略'] = upstream_data['strategy']
upstream_found = True
break
if not upstream_found:
location_info['负载均衡'] = "无"
location_info['负载均衡策略'] = "无"
# 提取代理超时设置
timeouts = {
'proxy_connect_timeout': re.search(r'proxy_connect_timeout\s+([^\s;]+);', loc_content),
'proxy_send_timeout': re.search(r'proxy_send_timeout\s+([^\s;]+);', loc_content),
'proxy_read_timeout': re.search(r'proxy_read_timeout\s+([^\s;]+);', loc_content)
}
timeout_info = []
for key, match in timeouts.items():
if match:
timeout_info.append(f'{key}: {match.group(1)}')
location_info['超时设置'] = '\n'.join(timeout_info) if timeout_info else '默认'
else:
# 检查静态文件路径
root_match = re.search(r'root\s+([^\s;]+);', loc_content)
if root_match:
location_info['目标/路径'] = root_match.group(1)
location_info['负载均衡'] = "无"
location_info['负载均衡策略'] = "无"
else:
alias_match = re.search(r'alias\s+([^\s;]+);', loc_content)
if alias_match:
location_info['目标/路径'] = alias_match.group(1)
location_info['负载均衡'] = "无"
location_info['负载均衡策略'] = "无"
else:
location_info['目标/路径'] = "其他配置"
location_info['负载均衡'] = "无"
location_info['负载均衡策略'] = "无"
# 提取访问控制规则
deny_rules = re.findall(r'deny\s+([^\s;]+);', loc_content)
allow_rules = re.findall(r'allow\s+([^\s;]+);', loc_content)
access_control = []
if allow_rules:
access_control.append(f'允许: {", ".join(allow_rules)}')
if deny_rules:
access_control.append(f'拒绝: {", ".join(deny_rules)}')
location_info['访问控制'] = '\n'.join(access_control) if access_control else '无限制'
# 提取重写规则
rewrite_rules = re.findall(r'rewrite\s+([^;]+);', loc_content)
location_info['重写规则'] = '\n'.join(rewrite_rules) if rewrite_rules else '无'
# 提取其他重要配置参数
location_info['客户端最大Body大小'] = re.search(r'client_max_body_size\s+([^\s;]+);', loc_content).group(1) if re.search(r'client_max_body_size\s+([^\s;]+);', loc_content) else '默认'
# 保留原始配置行作为备注,但过滤掉已解析的字段
basic_lines = []
for line in loc_content.strip().split('\n'):
line = line.strip()
if line and not any(keyword in line for keyword in
['proxy_pass', 'root', 'alias', 'location', 'deny', 'allow',
'rewrite', 'client_max_body_size', 'proxy_connect_timeout',
'proxy_send_timeout', 'proxy_read_timeout', 'limit_except']):
basic_lines.append(line)
location_info['其他参数'] = '\n'.join(basic_lines) if basic_lines else '无'
locations.append(location_info)
server_info['locations'] = locations
servers.append(server_info)
return servers
def generate_table(servers):
"""将解析结果转为 Pandas DataFrame"""
rows = []
for server in servers:
for loc in server.get('locations', []):
row = OrderedDict()
# Server级别字段
row['配置文件路径'] = server.get('配置文件路径', '')
row['服务器块编号'] = server.get('服务器块编号', '')
row['域名/服务'] = server.get('域名/服务', '')
row['监听端口'] = server.get('监听端口', '')
row['SSL状态'] = server.get('SSL状态', '')
row['SSL证书'] = server.get('SSL证书', '')
row['SSL密钥'] = server.get('SSL密钥', '')
row['错误页面'] = server.get('错误页面', '')
row['访问日志'] = server.get('访问日志', '')
row['错误日志'] = server.get('错误日志', '')
# Location级别字段
row['路由-location'] = loc.get('路由-location', '')
row['匹配模式'] = loc.get('匹配模式', '')
row['目标/路径'] = loc.get('目标/路径', '')
row['负载均衡'] = loc.get('负载均衡', '')
row['负载均衡策略'] = loc.get('负载均衡策略', '')
row['HTTP方法限制'] = loc.get('HTTP方法限制', '')
row['超时设置'] = loc.get('超时设置', '')
row['访问控制'] = loc.get('访问控制', '')
row['重写规则'] = loc.get('重写规则', '')
row['客户端最大Body大小'] = loc.get('客户端最大Body大小', '')
row['其他参数'] = loc.get('其他参数', '')
rows.append(row)
return pd.DataFrame(rows)
def main():
# 检查依赖
try:
import pandas as pd
except ImportError:
print("错误: 未找到pandas库,请先安装。可以使用命令:pip install pandas openpyxl")
return
# 检查参数
if len(sys.argv) != 2:
print("用法: python3 parse_nginx.py <nginx.conf>")
print("示例: python3 parse_nginx.py /etc/nginx/nginx.conf")
return
file_path = sys.argv[1]
try:
with open(file_path, 'r', encoding='utf-8') as f:
config_content = f.read()
except FileNotFoundError:
print(f"错误: 文件 {file_path} 未找到,请检查路径是否正确。")
return
except Exception as e:
print(f"读取文件发生错误: {e}")
return
# 解析并生成表格
print("正在解析Nginx配置文件...")
servers = parse_nginx_config(config_content, file_path)
table = generate_table(servers)
# 打印预览
print("\n解析完成!前5行数据预览:")
print(table.head())
print(f"\n总共解析出 {len(table)} 个location配置")
# 保存为 Excel
excel_file_path = 'nginx_config_table.xlsx'
try:
table.to_excel(excel_file_path, index=False, engine='openpyxl')
print(f"\nNginx配置解析结果已保存至 {excel_file_path}")
print(f"Excel文件包含以下字段:")
for i, col in enumerate(table.columns, 1):
print(f"{i}. {col}")
except Exception as e:
print(f"保存Excel文件时发生错误: {e}")
print("请确保已安装openpyxl: pip install openpyxl")
if __name__ == '__main__':
main()
功能说明与使用场景
- 结构化数据存储:使用
OrderedDict 保持字段顺序,使生成的Excel表格结构清晰、易于查阅与分析。
- SSL/TLS配置解析:新增SSL状态检测与证书路径提取功能,极大地便利了安全审计和证书生命周期管理。
- 负载均衡策略识别:不仅能提取
upstream 中的服务器地址列表,还能自动识别其使用的负载均衡算法(如轮询、ip_hash、least_conn等),这对于数据库/中间件的性能调优与架构理解至关重要。
- 访问控制规则提取:自动识别
location 块中的 allow/deny 规则,便于快速进行安全策略审查与合规性检查。
- 超时设置与性能参数:提取代理连接、发送、读取的超时设置以及客户端请求体大小限制,为运维/DevOps团队的性能瓶颈分析与优化提供数据支持。
使用方式
# 1. 安装必要的Python依赖库
pip install pandas openpyxl
# 2. 运行脚本,指定需要解析的Nginx配置文件路径
python3 parse_nginx.py /etc/nginx/nginx.conf
运行成功后,脚本会在当前目录下生成名为 nginx_config_table.xlsx 的Excel文件。该表格涵盖了从基础监听信息到详细的 location 路由规则在内的全面配置信息。
典型应用场景
- 安全审计:集中审查SSL配置状态、证书路径以及IP访问控制规则。
- 性能分析:统一查看各服务的超时设置、负载均衡策略与请求大小限制。
- 运维管理:快速归档和对比不同环境的日志路径、错误页面配置。
- 开发与协作:为开发人员提供清晰的路由规则和重写配置参考,便于前后端联调。