找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1007

积分

0

好友

145

主题
发表于 3 天前 | 查看: 11| 回复: 0

随着业务的不断增长,日常运维中的Nginx配置文件往往会变得愈发复杂。手动梳理散落在各个文件中的 server 块、location 路由规则以及 upstream 负载均衡配置不仅耗时费力,还极易出现疏漏和错误。本文将介绍一款使用 Python 编写的实用脚本,它能够自动解析 Nginx 配置文件,并将其丰富的配置信息结构化为清晰的 Excel 表格,极大地方便了配置审计、团队协作和安全管理工作。

脚本核心功能

配置解析能力

  • upstream块解析:自动识别负载均衡配置,提取后端服务器地址及负载均衡策略。
  • server块解析:提取监听端口、域名(server_name)、SSL状态、日志路径等关键信息。
  • location块解析:详细解析每个路由规则,包括代理目标(proxy_pass)、静态文件路径(root/alias)、访问控制、重写规则等。

数据结构化输出

脚本会将所有配置信息整理成包含以下字段的结构化表格(Excel):

字段类别 字段名 说明 示例
基础信息 配置文件路径 原始配置文件来源 /etc/nginx/nginx.conf
Server块信息 服务器块编号 唯一标识每个server块 server_1
域名/服务 server_name 配置 example.com
监听端口 端口号及SSL标识 443 ssl
SSL状态 是否启用SSL加密 启用
Location块增强 路由-location location 匹配路径 /api/
匹配模式 location匹配规则类型 前缀匹配、正则匹配等
目标/路径 代理目标或文件根目录 http://backend/ 或 /var/www/html
负载均衡 upstream服务器地址列表 192.168.1.10:8080, 192.168.1.11:8080
负载均衡策略 upstream负载均衡算法 ip_hash, least_conn
HTTP方法限制 location允许的HTTP方法 GET, POST
访问控制 IP白名单/黑名单规则 allow 192.168.1.0/24; deny all;
重写规则 rewrite规则配置 rewrite ^/old /new permanent;
超时设置 代理相关超时时间 proxy_connect_timeout 60s;
客户端最大Body大小 请求体大小限制 client_max_body_size 10m;
SSL/TLS配置 SSL证书 SSL证书文件路径 /path/to/cert.crt
SSL密钥 SSL私钥文件路径 /path/to/cert.key
日志与错误页面 访问日志 access_log 路径 /var/log/nginx/access.log
错误日志 error_log 路径 /var/log/nginx/error.log
错误页面 error_page 配置 404 -> /404.html

完整增强版代码实现

import re
import pandas as pd
import sys
from collections import OrderedDict

def parse_nginx_config(config_content, file_path=''):
    """
    解析Nginx配置内容,提取upstream和server信息。
    :param config_content: Nginx配置文件的内容字符串
    :param file_path: 配置文件路径
    :return: 包含服务器信息的列表
    """
    servers = []
    upstreams = {}

    # 提取所有 upstream 块及其负载均衡策略
    upstream_blocks = re.findall(r'upstream\s+(\w+)\s*\{\s*([^}]*?)\s*\}', config_content, re.DOTALL)
    for upstream_name, upstream_body in upstream_blocks:
        upstream_servers = re.findall(r'server\s+([^\s;]+);', upstream_body)

        # 提取负载均衡策略
        strategy_match = re.search(r'(ip_hash|least_conn|fair|hash\s+\w+)', upstream_body)
        strategy = strategy_match.group(1) if strategy_match else '轮询'

        upstreams[upstream_name] = {
            'servers': ', '.join(upstream_servers),
            'strategy': strategy
        }

    # 拆分所有 server 块
    server_blocks = re.split(r'\n\s*server\s*\{', config_content)

    for i, block in enumerate(server_blocks):
        if not block.strip():
            continue

        server_info = OrderedDict()
        server_info['配置文件路径'] = file_path
        server_info['服务器块编号'] = f'server_{i}'

        # 提取监听端口和SSL状态
        listen_matches = re.findall(r'listen\s+(\d+)(?:\s+ssl)?\s*;', block)
        ssl_listen = re.search(r'listen\s+\d+\s+ssl;', block)
        server_info['SSL状态'] = '启用' if ssl_listen else '未启用'
        server_info['监听端口'] = ', '.join([match[0] if isinstance(match, tuple) else match for match in listen_matches]) if listen_matches else '80'

        # 提取SSL证书信息
        ssl_cert_match = re.search(r'ssl_certificate\s+([^\s;]+);', block)
        ssl_key_match = re.search(r'ssl_certificate_key\s+([^\s;]+);', block)
        server_info['SSL证书'] = ssl_cert_match.group(1) if ssl_cert_match else '无'
        server_info['SSL密钥'] = ssl_key_match.group(1) if ssl_key_match else '无'

        # 提取 server_name
        server_name_match = re.search(r'server_name\s+([^\s;]+(?:\s+[^\s;]+)*);', block)
        if server_name_match:
            server_info['域名/服务'] = ', '.join(server_name_match.group(1).split())
        else:
            server_info['域名/服务'] = "默认服务器"

        # 提取 error_page 配置
        error_pages = re.findall(r'error_page\s+(\d+)\s+([^\s;]+);', block)
        server_info['错误页面'] = '; '.join([f'{code}->{page}' for code, page in error_pages]) if error_pages else '默认'

        # 提取 access_log 和 error_log
        access_log_match = re.search(r'access_log\s+([^\s;]+);', block)
        error_log_match = re.search(r'error_log\s+([^\s;]+);', block)
        server_info['访问日志'] = access_log_match.group(1) if access_log_match else '默认'
        server_info['错误日志'] = error_log_match.group(1) if error_log_match else '默认'

        # 提取 location 块
        locations = []
        location_blocks = re.findall(r'location\s+([^\s{]+)\s*\{\s*([^}]*?)\s*\}', block, re.DOTALL)

        for loc_path, loc_content in location_blocks:
            location_info = OrderedDict()
            location_info['路由-location'] = loc_path

            # 分析location匹配模式
            if loc_path.startswith('='):
                location_info['匹配模式'] = '精确匹配'
            elif loc_path.startswith('~*'):
                location_info['匹配模式'] = '正则匹配(忽略大小写)'
            elif loc_path.startswith('~'):
                location_info['匹配模式'] = '正则匹配'
            elif loc_path.startswith('^~'):
                location_info['匹配模式'] = '前缀优先匹配'
            else:
                location_info['匹配模式'] = '前缀匹配'

            # 提取HTTP方法限制
            limit_except_match = re.search(r'limit_except\s+(\w+)\s*\{', loc_content)
            location_info['HTTP方法限制'] = limit_except_match.group(1) if limit_except_match else '无限制'

            # 检查 proxy_pass
            proxy_pass_match = re.search(r'proxy_pass\s+(http[s]?://[^\s;/]+)[^\s;]*;', loc_content)
            if proxy_pass_match:
                proxy_target = proxy_pass_match.group(1)
                location_info['目标/路径'] = proxy_target

                # 检查是否为 upstream
                upstream_found = False
                for upstream_name, upstream_data in upstreams.items():
                    if upstream_name in proxy_target:
                        location_info['负载均衡'] = upstream_data['servers']
                        location_info['负载均衡策略'] = upstream_data['strategy']
                        upstream_found = True
                        break

                if not upstream_found:
                    location_info['负载均衡'] = "无"
                    location_info['负载均衡策略'] = "无"

                # 提取代理超时设置
                timeouts = {
                    'proxy_connect_timeout': re.search(r'proxy_connect_timeout\s+([^\s;]+);', loc_content),
                    'proxy_send_timeout': re.search(r'proxy_send_timeout\s+([^\s;]+);', loc_content),
                    'proxy_read_timeout': re.search(r'proxy_read_timeout\s+([^\s;]+);', loc_content)
                }

                timeout_info = []
                for key, match in timeouts.items():
                    if match:
                        timeout_info.append(f'{key}: {match.group(1)}')

                location_info['超时设置'] = '\n'.join(timeout_info) if timeout_info else '默认'

            else:
                # 检查静态文件路径
                root_match = re.search(r'root\s+([^\s;]+);', loc_content)
                if root_match:
                    location_info['目标/路径'] = root_match.group(1)
                    location_info['负载均衡'] = "无"
                    location_info['负载均衡策略'] = "无"
                else:
                    alias_match = re.search(r'alias\s+([^\s;]+);', loc_content)
                    if alias_match:
                        location_info['目标/路径'] = alias_match.group(1)
                        location_info['负载均衡'] = "无"
                        location_info['负载均衡策略'] = "无"
                    else:
                        location_info['目标/路径'] = "其他配置"
                        location_info['负载均衡'] = "无"
                        location_info['负载均衡策略'] = "无"

            # 提取访问控制规则
            deny_rules = re.findall(r'deny\s+([^\s;]+);', loc_content)
            allow_rules = re.findall(r'allow\s+([^\s;]+);', loc_content)
            access_control = []
            if allow_rules:
                access_control.append(f'允许: {", ".join(allow_rules)}')
            if deny_rules:
                access_control.append(f'拒绝: {", ".join(deny_rules)}')
            location_info['访问控制'] = '\n'.join(access_control) if access_control else '无限制'

            # 提取重写规则
            rewrite_rules = re.findall(r'rewrite\s+([^;]+);', loc_content)
            location_info['重写规则'] = '\n'.join(rewrite_rules) if rewrite_rules else '无'

            # 提取其他重要配置参数
            location_info['客户端最大Body大小'] = re.search(r'client_max_body_size\s+([^\s;]+);', loc_content).group(1) if re.search(r'client_max_body_size\s+([^\s;]+);', loc_content) else '默认'

            # 保留原始配置行作为备注,但过滤掉已解析的字段
            basic_lines = []
            for line in loc_content.strip().split('\n'):
                line = line.strip()
                if line and not any(keyword in line for keyword in
                                   ['proxy_pass', 'root', 'alias', 'location', 'deny', 'allow',
                                    'rewrite', 'client_max_body_size', 'proxy_connect_timeout',
                                    'proxy_send_timeout', 'proxy_read_timeout', 'limit_except']):
                    basic_lines.append(line)

            location_info['其他参数'] = '\n'.join(basic_lines) if basic_lines else '无'

            locations.append(location_info)

        server_info['locations'] = locations
        servers.append(server_info)

    return servers

def generate_table(servers):
    """将解析结果转为 Pandas DataFrame"""
    rows = []
    for server in servers:
        for loc in server.get('locations', []):
            row = OrderedDict()

            # Server级别字段
            row['配置文件路径'] = server.get('配置文件路径', '')
            row['服务器块编号'] = server.get('服务器块编号', '')
            row['域名/服务'] = server.get('域名/服务', '')
            row['监听端口'] = server.get('监听端口', '')
            row['SSL状态'] = server.get('SSL状态', '')
            row['SSL证书'] = server.get('SSL证书', '')
            row['SSL密钥'] = server.get('SSL密钥', '')
            row['错误页面'] = server.get('错误页面', '')
            row['访问日志'] = server.get('访问日志', '')
            row['错误日志'] = server.get('错误日志', '')

            # Location级别字段
            row['路由-location'] = loc.get('路由-location', '')
            row['匹配模式'] = loc.get('匹配模式', '')
            row['目标/路径'] = loc.get('目标/路径', '')
            row['负载均衡'] = loc.get('负载均衡', '')
            row['负载均衡策略'] = loc.get('负载均衡策略', '')
            row['HTTP方法限制'] = loc.get('HTTP方法限制', '')
            row['超时设置'] = loc.get('超时设置', '')
            row['访问控制'] = loc.get('访问控制', '')
            row['重写规则'] = loc.get('重写规则', '')
            row['客户端最大Body大小'] = loc.get('客户端最大Body大小', '')
            row['其他参数'] = loc.get('其他参数', '')

            rows.append(row)

    return pd.DataFrame(rows)

def main():
    # 检查依赖
    try:
        import pandas as pd
    except ImportError:
        print("错误: 未找到pandas库,请先安装。可以使用命令:pip install pandas openpyxl")
        return

    # 检查参数
    if len(sys.argv) != 2:
        print("用法: python3 parse_nginx.py <nginx.conf>")
        print("示例: python3 parse_nginx.py /etc/nginx/nginx.conf")
        return

    file_path = sys.argv[1]
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            config_content = f.read()
    except FileNotFoundError:
        print(f"错误: 文件 {file_path} 未找到,请检查路径是否正确。")
        return
    except Exception as e:
        print(f"读取文件发生错误: {e}")
        return

    # 解析并生成表格
    print("正在解析Nginx配置文件...")
    servers = parse_nginx_config(config_content, file_path)
    table = generate_table(servers)

    # 打印预览
    print("\n解析完成!前5行数据预览:")
    print(table.head())
    print(f"\n总共解析出 {len(table)} 个location配置")

    # 保存为 Excel
    excel_file_path = 'nginx_config_table.xlsx'
    try:
        table.to_excel(excel_file_path, index=False, engine='openpyxl')
        print(f"\nNginx配置解析结果已保存至 {excel_file_path}")
        print(f"Excel文件包含以下字段:")
        for i, col in enumerate(table.columns, 1):
            print(f"{i}. {col}")
    except Exception as e:
        print(f"保存Excel文件时发生错误: {e}")
        print("请确保已安装openpyxl: pip install openpyxl")

if __name__ == '__main__':
    main()

功能说明与使用场景

  1. 结构化数据存储:使用 OrderedDict 保持字段顺序,使生成的Excel表格结构清晰、易于查阅与分析。
  2. SSL/TLS配置解析:新增SSL状态检测与证书路径提取功能,极大地便利了安全审计和证书生命周期管理。
  3. 负载均衡策略识别:不仅能提取 upstream 中的服务器地址列表,还能自动识别其使用的负载均衡算法(如轮询、ip_hash、least_conn等),这对于数据库/中间件的性能调优与架构理解至关重要。
  4. 访问控制规则提取:自动识别 location 块中的 allow/deny 规则,便于快速进行安全策略审查与合规性检查。
  5. 超时设置与性能参数:提取代理连接、发送、读取的超时设置以及客户端请求体大小限制,为运维/DevOps团队的性能瓶颈分析与优化提供数据支持。

使用方式

# 1. 安装必要的Python依赖库
pip install pandas openpyxl

# 2. 运行脚本,指定需要解析的Nginx配置文件路径
python3 parse_nginx.py /etc/nginx/nginx.conf

运行成功后,脚本会在当前目录下生成名为 nginx_config_table.xlsx 的Excel文件。该表格涵盖了从基础监听信息到详细的 location 路由规则在内的全面配置信息。

典型应用场景

  • 安全审计:集中审查SSL配置状态、证书路径以及IP访问控制规则。
  • 性能分析:统一查看各服务的超时设置、负载均衡策略与请求大小限制。
  • 运维管理:快速归档和对比不同环境的日志路径、错误页面配置。
  • 开发与协作:为开发人员提供清晰的路由规则和重写配置参考,便于前后端联调。



上一篇:AI编程的认知差异:为何工具更擅长后端,舆论却更担忧前端?
下一篇:2023年3月GESP C++二级编程认证真题完整解析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-17 18:47 , Processed in 0.112077 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表