I. Overview

1.1 Background

In day-to-day operations work, repetitive tasks take up 60%-80% of an engineer's time: log analysis, batch operations, monitoring and alerting, resource cleanup, and so on. These tasks are simple, but doing them by hand is slow and error-prone. With its concise syntax, rich standard library, and mature third-party modules, Python has become the language of choice for ops automation. This article shares 10 production-tested Python scripts to help ops engineers free themselves from repetitive work.

1.2 Technical Highlights

  • High development efficiency: Python's concise syntax cuts development time to roughly 1/3 of an equivalent Shell script, making it well suited to quickly implementing ops requirements
  • Mature ecosystem: proven ops libraries such as paramiko, requests, and psutil avoid reinventing the wheel
  • Cross-platform: the same script runs on Linux, Windows, and macOS, reducing maintenance cost
  • Easy to maintain and extend: readable code makes team collaboration and iteration easier

1.3 Typical Scenarios

  • Scenario 1: ops teams managing 100+ servers in bulk, e.g. configuration distribution, command execution, file synchronization
  • Scenario 2: business systems processing GB-scale logs per day that need automated anomaly analysis, traffic statistics, and reporting
  • Scenario 3: multi-cloud resource management, including automated cleanup and cost optimization of VMs, containers, and storage
  • Scenario 4: 24x7 monitoring with automated health checks, alert handling, and self-healing

1.4 Environment Requirements

| Component | Version Requirement | Notes |
|-----------|---------------------|-------|
| OS | CentOS 7+ / Ubuntu 18.04+ | An LTS release is recommended |
| Python | 3.8+ | 3.10+ recommended for the latest syntax features |
| pip | 20.0+ | Used to install dependencies |
| Hardware | 2 CPU cores / 4 GB RAM or more | Adjust to actual load |

II. Detailed Steps

2.1 Preparation

◆ 2.1.1 System Checks
# Check the Python version
python3 --version
# Check the pip version
pip3 --version
# Check system resources
free -h
df -h
◆ 2.1.2 Install Dependencies
# Upgrade pip
pip3 install --upgrade pip
# Install common ops libraries
pip3 install paramiko requests psutil schedule pymysql redis elasticsearch prometheus-client pyyaml
# Verify the installation
pip3 list | grep -E "paramiko|requests|psutil"

2.2 Core Configuration

◆ 2.2.1 Configure SSH Key Authentication
# Generate an SSH key pair
ssh-keygen -t rsa -b 4096 -f ~/.ssh/ops_rsa -N ""
# Distribute the public key to target servers (example)
ssh-copy-id -i ~/.ssh/ops_rsa.pub root@192.168.1.100

Note: key-based authentication replaces password logins, improving security and enabling batch operations. Create a dedicated key pair for ops scripts to simplify permission management and auditing.

◆ 2.2.2 Configuration File Example
# Configuration file: config.yml
servers:
  - host: 192.168.1.100
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa
  - host: 192.168.1.101
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa

mysql:
  host: 192.168.1.200
  port: 3306
  user: monitor
  password: your_password
  database: ops

redis:
  host: 192.168.1.201
  port: 6379
  password: your_redis_password
  db: 0

log:
  level: INFO
  file: /var/log/ops/automation.log
  max_size: 100  # MB
  backup_count: 10

Parameter notes

  • servers: list of target servers, used for batch operations
  • mysql/redis: database connection details, used to store execution results and state
  • log: logging configuration; use rotation to keep logs from filling the disk
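The scripts in section III read this file with yaml.safe_load. A minimal loader sketch follows; the load_config helper and the tilde expansion of key_file are illustrative additions, assuming PyYAML is installed:

#!/usr/bin/env python3
# config_loader.py -- illustrative sketch, not part of the scripts below
import os
import yaml

def load_config(path="config.yml"):
    """Read config.yml and expand ~ in key_file paths so paramiko can use them."""
    with open(path) as f:
        config = yaml.safe_load(f)
    for server in config.get("servers", []):
        server["key_file"] = os.path.expanduser(server["key_file"])
    return config

if __name__ == "__main__":
    cfg = load_config()
    print([s["host"] for s in cfg["servers"]])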
◆ 2.2.3 Logging Configuration
# logging_config.py
import logging
from logging.handlers import RotatingFileHandler

def setup_logger(log_file='/var/log/ops/automation.log', level=logging.INFO):
    logger = logging.getLogger('ops_automation')
    logger.setLevel(level)

    # Avoid adding duplicate handlers if setup_logger() is called more than once
    if logger.handlers:
        return logger

    # Rotating file handler
    handler = RotatingFileHandler(
        log_file,
        maxBytes=100*1024*1024,  # 100MB
        backupCount=10
    )
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    return logger
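For reference, the scripts in section III use this helper as sketched below; creating /var/log/ops up front is an assumption (RotatingFileHandler does not create missing directories):

# Usage sketch for logging_config.setup_logger
import os
from logging_config import setup_logger

os.makedirs('/var/log/ops', exist_ok=True)  # assumed: the log directory may not exist yet
logger = setup_logger()
logger.info("automation task started")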

2.3 Start-up and Verification

◆ 2.3.1 Basic Tests
# Test that paramiko imports
python3 -c "import paramiko; print('paramiko OK')"
# Test reading the configuration file
python3 -c "import yaml; print(yaml.safe_load(open('config.yml')))"
◆ 2.3.2 Functional Verification
# Verify batch SSH execution (example script 1)
python3 batch_ssh_executor.py "uptime"
# Expected output
# [192.168.1.100] SUCCESS: 10:30:23 up 45 days, 2:15, 1 user, load average: 0.15, 0.10, 0.08
# [192.168.1.101] SUCCESS: 10:30:24 up 30 days, 5:20, 1 user, load average: 0.25, 0.20, 0.18

III. Example Code and Configuration

3.1 Complete Examples

◆ 3.1.1 Script 1: Batch SSH Command Executor
#!/usr/bin/env python3
# File: batch_ssh_executor.py
"""
Batch SSH command executor
Supports concurrent execution, result collection, and error handling
"""
import os
import paramiko
import yaml
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging_config import setup_logger

logger = setup_logger()

class SSHExecutor:
    def __init__(self, config_file='config.yml'):
        with open(config_file) as f:
            self.config = yaml.safe_load(f)
        self.servers = self.config['servers']

    def execute_on_host(self, server, command, timeout=30):
        """在单个主机上执行命令"""
        host = server['host']
        try:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # Key-based authentication; expanduser handles paths like ~/.ssh/ops_rsa
            key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(server['key_file']))
            client.connect(
                hostname=host,
                port=server['port'],
                username=server['user'],
                pkey=key,
                timeout=10
            )

            stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
            exit_code = stdout.channel.recv_exit_status()
            result = {
                'host': host,
                'success': exit_code == 0,
                'stdout': stdout.read().decode('utf-8', errors='ignore').strip(),
                'stderr': stderr.read().decode('utf-8', errors='ignore').strip(),
                'exit_code': exit_code
            }
            client.close()
            logger.info(f"[{host}] Command executed, exit_code={exit_code}")
            return result

        except Exception as e:
            logger.error(f"[{host}] Error: {str(e)}")
            return {
                'host': host,
                'success': False,
                'stdout': '',
                'stderr': str(e),
                'exit_code': -1
            }

    def execute_parallel(self, command, max_workers=10):
        """并发执行命令"""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.execute_on_host, server, command): server
                for server in self.servers
            }
            for future in as_completed(futures):
                results.append(future.result())
        return results

    def print_results(self, results):
        """格式化输出结果"""
        success_count = sum(1 for r in results if r['success'])
        print(f"\n执行完成: 成功 {success_count}/{len(results)}\n")

        for result in sorted(results, key=lambda x: x['host']):
            status = "SUCCESS" if result['success'] else "FAILED"
            print(f"[{result['host']}] {status}")
            if result['stdout']:
                print(f"  输出: {result['stdout']}")
            if result['stderr']:
                print(f"  错误: {result['stderr']}")
            print()

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("用法: python3 batch_ssh_executor.py '<command>'")
        sys.exit(1)

    command = sys.argv[1]
    executor = SSHExecutor()
    results = executor.execute_parallel(command)
    executor.print_results(results)
◆ 3.1.2 Script 2: Log Analysis and Alerting
#!/usr/bin/env python3
# File: log_analyzer.py
"""
Log analysis tool
Error statistics, anomaly detection, and automatic alerting
"""
import re
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import requests
from logging_config import setup_logger

logger = setup_logger()

class LogAnalyzer:
    def __init__(self, log_file):
        self.log_file = log_file
        self.error_patterns = {
            'http_5xx': r'HTTP/\d\.\d"\s5\d{2}',
            'exception': r'(Exception|Error|Fatal)',
            'timeout': r'(timeout|timed out)',
            'connection_refused': r'Connection refused',
            'out_of_memory': r'(OutOfMemory|OOM|Cannot allocate memory)'
        }

    def parse_nginx_log(self, line):
        """解析Nginx日志格式"""
        pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\d+) "(.*?)" "(.*?)"'
        match = re.match(pattern, line)
        if match:
            return {
                'ip': match.group(1),
                'time': match.group(2),
                'request': match.group(3),
                'status': int(match.group(4)),
                'size': int(match.group(5)),
                'referer': match.group(6),
                'user_agent': match.group(7)
            }
        return None

    def analyze(self, time_window=60):
        """分析最近N分钟的日志"""
        now = datetime.now()
        cutoff_time = now - timedelta(minutes=time_window)

        stats = {
            'total_requests': 0,
            'error_count': defaultdict(int),
            'status_codes': Counter(),
            'top_ips': Counter(),
            'slow_requests': []
        }

        with open(self.log_file, 'r') as f:
            for line in f:
                entry = self.parse_nginx_log(line)
                if not entry:
                    continue

                # 时间过滤
                log_time = datetime.strptime(entry['time'], '%d/%b/%Y:%H:%M:%S %z')
                if log_time.replace(tzinfo=None) < cutoff_time:
                    continue

                stats['total_requests'] += 1
                stats['status_codes'][entry['status']] += 1
                stats['top_ips'][entry['ip']] += 1

                # 错误检测
                for error_type, pattern in self.error_patterns.items():
                    if re.search(pattern, line):
                        stats['error_count'][error_type] += 1

                # 5xx错误记录
                if 500 <= entry['status'] < 600:
                    stats['slow_requests'].append({
                        'time': entry['time'],
                        'request': entry['request'],
                        'status': entry['status']
                    })

        return stats

    def check_alert_conditions(self, stats):
        """检查告警条件"""
        alerts = []

        # 5xx错误率超过5%
        if stats['total_requests'] > 0:
            error_5xx = sum(count for code, count in stats['status_codes'].items() 
                           if 500 <= code < 600)
            error_rate = error_5xx / stats['total_requests']
            if error_rate > 0.05:
                alerts.append({
                    'level': 'critical',
                    'message': f'5xx错误率: {error_rate*100:.2f}% ({error_5xx}/{stats["total_requests"]})'
                })

        # OOM错误
        if stats['error_count']['out_of_memory'] > 0:
            alerts.append({
                'level': 'critical',
                'message': f'检测到OOM错误: {stats["error_count"]["out_of_memory"]}次'
            })

        # 连接超时
        if stats['error_count']['timeout'] > 100:
            alerts.append({
                'level': 'warning',
                'message': f'超时错误异常: {stats["error_count"]["timeout"]}次'
            })

        return alerts

    def send_alert(self, alerts, webhook_url):
        """发送告警到企业微信/钉钉"""
        if not alerts:
            return

        message = "【日志告警】\n" + "\n".join(
            f"[{a['level'].upper()}] {a['message']}" for a in alerts
        )

        payload = {
            "msgtype": "text",
            "text": {"content": message}
        }

        try:
            response = requests.post(webhook_url, json=payload, timeout=5)
            if response.status_code == 200:
                logger.info("告警发送成功")
            else:
                logger.error(f"告警发送失败: {response.status_code}")
        except Exception as e:
            logger.error(f"告警发送异常: {str(e)}")

if __name__ == '__main__':
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    stats = analyzer.analyze(time_window=5)

    print(f"总请求数: {stats['total_requests']}")
    print(f"状态码分布: {dict(stats['status_codes'])}")
    print(f"Top 10 IP: {stats['top_ips'].most_common(10)}")
    print(f"错误统计: {dict(stats['error_count'])}")

    alerts = analyzer.check_alert_conditions(stats)
    if alerts:
        print("\n触发告警:")
        for alert in alerts:
            print(f"  [{alert['level']}] {alert['message']}")

        # 发送告警(替换为实际webhook地址)
        # analyzer.send_alert(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')
◆ 3.1.3 Script 3: System Resource Monitoring
#!/usr/bin/env python3
# File: system_monitor.py
"""
System resource monitor
Monitors CPU, memory, disk, and network; supports Prometheus integration
"""
import psutil
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from logging_config import setup_logger

logger = setup_logger()

class SystemMonitor:
    def __init__(self, pushgateway_url='localhost:9091', job_name='system_monitor'):
        self.pushgateway_url = pushgateway_url
        self.job_name = job_name
        self.registry = CollectorRegistry()

        # 定义指标
        self.cpu_gauge = Gauge('system_cpu_percent', 'CPU使用率', registry=self.registry)
        self.memory_gauge = Gauge('system_memory_percent', '内存使用率', registry=self.registry)
        self.disk_gauge = Gauge('system_disk_percent', '磁盘使用率',
                               ['mountpoint'], registry=self.registry)
        self.network_gauge = Gauge('system_network_bytes', '网络流量',
                                  ['interface', 'direction'], registry=self.registry)

    def collect_metrics(self):
        """采集系统指标"""
        metrics = {}

        # CPU
        cpu_percent = psutil.cpu_percent(interval=1)
        metrics['cpu'] = cpu_percent
        self.cpu_gauge.set(cpu_percent)

        # 内存
        memory = psutil.virtual_memory()
        metrics['memory'] = {
            'percent': memory.percent,
            'total': memory.total,
            'available': memory.available,
            'used': memory.used
        }
        self.memory_gauge.set(memory.percent)

        # 磁盘
        metrics['disk'] = {}
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                metrics['disk'][partition.mountpoint] = {
                    'percent': usage.percent,
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free
                }
                self.disk_gauge.labels(mountpoint=partition.mountpoint).set(usage.percent)
            except PermissionError:
                continue

        # 网络
        net_io = psutil.net_io_counters(pernic=True)
        metrics['network'] = {}
        for interface, stats in net_io.items():
            metrics['network'][interface] = {
                'bytes_sent': stats.bytes_sent,
                'bytes_recv': stats.bytes_recv
            }
            self.network_gauge.labels(interface=interface, direction='sent').set(stats.bytes_sent)
            self.network_gauge.labels(interface=interface, direction='recv').set(stats.bytes_recv)

        return metrics

    def check_thresholds(self, metrics):
        """检查阈值告警"""
        alerts = []

        if metrics['cpu'] > 80:
            alerts.append(f"CPU使用率过高: {metrics['cpu']:.1f}%")

        if metrics['memory']['percent'] > 85:
            alerts.append(f"内存使用率过高: {metrics['memory']['percent']:.1f}%")

        for mount, stats in metrics['disk'].items():
            if stats['percent'] > 90:
                alerts.append(f"磁盘空间不足: {mount} ({stats['percent']:.1f}%)")

        return alerts

    def push_metrics(self):
        """推送指标到Pushgateway"""
        try:
            push_to_gateway(self.pushgateway_url, job=self.job_name, registry=self.registry)
            logger.info("指标推送成功")
        except Exception as e:
            logger.error(f"指标推送失败: {str(e)}")

    def run(self, interval=60):
        """持续监控"""
        logger.info(f"开始监控,采集间隔: {interval}秒")

        while True:
            try:
                metrics = self.collect_metrics()
                alerts = self.check_thresholds(metrics)

                if alerts:
                    logger.warning("触发告警: " + "; ".join(alerts))

                self.push_metrics()
                time.sleep(interval)

            except KeyboardInterrupt:
                logger.info("监控停止")
                break
            except Exception as e:
                logger.error(f"监控异常: {str(e)}")
                time.sleep(interval)

if __name__ == '__main__':
    monitor = SystemMonitor()

    # 单次采集
    metrics = monitor.collect_metrics()
    print(f"CPU: {metrics['cpu']:.1f}%")
    print(f"内存: {metrics['memory']['percent']:.1f}%")
    print("磁盘:")
    for mount, stats in metrics['disk'].items():
        print(f"  {mount}: {stats['percent']:.1f}%")

    # 持续监控(取消注释启用)
    # monitor.run(interval=60)
◆ 3.1.4 Script 4: MySQL Slow Query Analysis
#!/usr/bin/env python3
# File: mysql_slow_query_analyzer.py
"""
MySQL slow query analyzer
Parses the slow query log and generates optimization hints
"""
import re
import pymysql
from collections import defaultdict
from logging_config import setup_logger

logger = setup_logger()

class SlowQueryAnalyzer:
    def __init__(self, slow_log_file, db_config):
        self.slow_log_file = slow_log_file
        self.db_config = db_config
        self.queries = []

    def parse_slow_log(self):
        """解析慢查询日志"""
        current_query = {}

        with open(self.slow_log_file, 'r') as f:
            for line in f:
                # Time行
                if line.startswith('# Time:'):
                    if current_query:
                        self.queries.append(current_query)
                    current_query = {'time': line.split(':', 1)[1].strip()}

                # User@Host行
                elif line.startswith('# User@Host:'):
                    match = re.search(r'(\w+)\[(\w+)\] @ (\S+)', line)
                    if match:
                        current_query['user'] = match.group(1)
                        current_query['host'] = match.group(3)

                # Query_time行
                elif line.startswith('# Query_time:'):
                    match = re.search(
                        r'Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+Rows_sent: (\d+)\s+Rows_examined: (\d+)',
                        line
                    )
                    if match:
                        current_query['query_time'] = float(match.group(1))
                        current_query['lock_time'] = float(match.group(2))
                        current_query['rows_sent'] = int(match.group(3))
                        current_query['rows_examined'] = int(match.group(4))

                # SQL语句
                elif not line.startswith('#') and line.strip():
                    current_query['sql'] = current_query.get('sql', '') + line.strip() + ' '

        if current_query:
            self.queries.append(current_query)

        logger.info(f"解析完成,共 {len(self.queries)} 条慢查询")

    def analyze(self):
        """分析慢查询"""
        stats = {
            'total': len(self.queries),
            'avg_query_time': 0,
            'max_query_time': 0,
            'top_queries': [],
            'table_scan': []
        }

        if not self.queries:
            return stats

        # 基础统计
        total_time = sum(q['query_time'] for q in self.queries)
        stats['avg_query_time'] = total_time / len(self.queries)
        stats['max_query_time'] = max(q['query_time'] for q in self.queries)

        # Top 10耗时查询
        sorted_queries = sorted(self.queries, key=lambda x: x['query_time'], reverse=True)
        stats['top_queries'] = sorted_queries[:10]

        # 全表扫描检测(rows_examined > 10000)
        stats['table_scan'] = [
            q for q in self.queries if q.get('rows_examined', 0) > 10000
        ]

        return stats

    def get_explain_plan(self, sql):
        """获取EXPLAIN执行计划"""
        try:
            conn = pymysql.connect(**self.db_config)
            cursor = conn.cursor()
            cursor.execute(f"EXPLAIN {sql}")
            result = cursor.fetchall()
            cursor.close()
            conn.close()
            return result
        except Exception as e:
            logger.error(f"EXPLAIN失败: {str(e)}")
            return None

    def generate_report(self, stats):
        """生成分析报告"""
        report = []
        report.append("=" * 80)
        report.append("MySQL慢查询分析报告")
        report.append("=" * 80)
        report.append(f"总慢查询数: {stats['total']}")
        report.append(f"平均查询时间: {stats['avg_query_time']:.2f}秒")
        report.append(f"最大查询时间: {stats['max_query_time']:.2f}秒")
        report.append("")

        report.append("Top 10耗时查询:")
        for i, query in enumerate(stats['top_queries'], 1):
            report.append(f"\n{i}. 查询时间: {query['query_time']:.2f}秒")
            report.append(f"   扫描行数: {query.get('rows_examined', 0)}")
            report.append(f"   SQL: {query.get('sql', '')[:200]}")

        if stats['table_scan']:
            report.append(f"\n发现 {len(stats['table_scan'])} 个全表扫描查询")
            for query in stats['table_scan'][:5]:
                report.append(f"  - {query.get('sql', '')[:100]}")

        return "\n".join(report)

if __name__ == '__main__':
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'your_password',
        'database': 'test'
    }

    analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
    analyzer.parse_slow_log()
    stats = analyzer.analyze()
    print(analyzer.generate_report(stats))
◆ 3.1.5 Script 5: File Synchronization Tool
#!/usr/bin/env python3
# File: file_sync.py
"""
File synchronization tool
Incremental sync with MD5-based change detection
"""
import os
import hashlib
import paramiko
from pathlib import Path
from logging_config import setup_logger

logger = setup_logger()

class FileSync:
    def __init__(self, ssh_config):
        self.ssh_config = ssh_config
        self.client = None
        self.sftp = None

    def connect(self):
        """建立SSH连接"""
        try:
            self.client = paramiko.SSHClient()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(self.ssh_config['key_file']))
            self.client.connect(
                hostname=self.ssh_config['host'],
                port=self.ssh_config['port'],
                username=self.ssh_config['user'],
                pkey=key
            )
            self.sftp = self.client.open_sftp()
            logger.info(f"连接成功: {self.ssh_config['host']}")
        except Exception as e:
            logger.error(f"连接失败: {str(e)}")
            raise

    def disconnect(self):
        """关闭连接"""
        if self.sftp:
            self.sftp.close()
        if self.client:
            self.client.close()

    def calculate_md5(self, file_path):
        """计算文件MD5"""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def remote_file_exists(self, remote_path):
        """检查远程文件是否存在"""
        try:
            self.sftp.stat(remote_path)
            return True
        except FileNotFoundError:
            return False

    def sync_file(self, local_path, remote_path, check_md5=True):
        """同步单个文件"""
        try:
            # 确保远程目录存在
            remote_dir = os.path.dirname(remote_path)
            try:
                self.sftp.stat(remote_dir)
            except FileNotFoundError:
                self._create_remote_dir(remote_dir)

            # MD5校验
            need_upload = True
            if check_md5 and self.remote_file_exists(remote_path):
                local_md5 = self.calculate_md5(local_path)
                # 远程MD5计算(需要执行命令)
                stdin, stdout, stderr = self.client.exec_command(f"md5sum {remote_path}")
                remote_md5 = stdout.read().decode().split()[0]

                if local_md5 == remote_md5:
                    logger.info(f"文件未变化,跳过: {local_path}")
                    need_upload = False

            if need_upload:
                self.sftp.put(local_path, remote_path)
                logger.info(f"上传成功: {local_path} -> {remote_path}")
                return True

            return False

        except Exception as e:
            logger.error(f"同步失败 {local_path}: {str(e)}")
            return False

    def _create_remote_dir(self, remote_dir):
        """递归创建远程目录"""
        dirs = []
        while remote_dir != '/':
            dirs.append(remote_dir)
            remote_dir = os.path.dirname(remote_dir)

        for dir_path in reversed(dirs):
            try:
                self.sftp.stat(dir_path)
            except FileNotFoundError:
                self.sftp.mkdir(dir_path)
                logger.info(f"创建目录: {dir_path}")

    def sync_directory(self, local_dir, remote_dir, exclude_patterns=None):
        """同步整个目录"""
        exclude_patterns = exclude_patterns or []
        synced_count = 0
        skipped_count = 0

        for root, dirs, files in os.walk(local_dir):
            # 计算相对路径
            rel_path = os.path.relpath(root, local_dir)
            remote_root = os.path.join(remote_dir, rel_path).replace('\\', '/')

            for file in files:
                # 排除规则
                if any(pattern in file for pattern in exclude_patterns):
                    continue

                local_file = os.path.join(root, file)
                remote_file = os.path.join(remote_root, file).replace('\\', '/')

                if self.sync_file(local_file, remote_file):
                    synced_count += 1
                else:
                    skipped_count += 1

        logger.info(f"同步完成: 上传{synced_count}个文件,跳过{skipped_count}个")

if __name__ == '__main__':
    ssh_config = {
        'host': '192.168.1.100',
        'port': 22,
        'user': 'root',
        'key_file': '~/.ssh/ops_rsa'
    }

    sync = FileSync(ssh_config)
    sync.connect()

    # 同步单个文件
    # sync.sync_file('/local/config.yml', '/remote/config.yml')

    # 同步目录
    sync.sync_directory('/local/app', '/remote/app',
        exclude_patterns=['.git', '.pyc', '__pycache__']
    )

    sync.disconnect()

3.2 Real-World Use Cases

◆ Case 1: Automated SSL Certificate Expiry Checks

Scenario: you manage SSL certificates for 100+ domains and need to spot certificates expiring within 30 days and raise an alert.

Implementation

#!/usr/bin/env python3
# File: ssl_cert_checker.py
import ssl
import socket
from datetime import datetime, timedelta
import requests

class SSLCertChecker:
    def __init__(self, domains, alert_days=30):
        self.domains = domains
        self.alert_days = alert_days

    def check_cert_expiry(self, domain, port=443):
        """检查证书过期时间"""
        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()

            # 解析过期时间
            expire_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
            days_left = (expire_date - datetime.now()).days

            return {
                'domain': domain,
                'expire_date': expire_date,
                'days_left': days_left,
                'issuer': dict(x[0] for x in cert['issuer'])
            }
        except Exception as e:
            return {
                'domain': domain,
                'error': str(e)
            }

    def check_all(self):
        """检查所有域名"""
        results = []
        alerts = []

        for domain in self.domains:
            result = self.check_cert_expiry(domain)
            results.append(result)

            if 'days_left' in result and result['days_left'] < self.alert_days:
                alerts.append(f"{domain} certificate expires in {result['days_left']} days")

        return results, alerts

# Usage example
domains = ['example.com', 'api.example.com', 'www.example.com']
checker = SSLCertChecker(domains)
results, alerts = checker.check_all()

for result in results:
    if 'days_left' in result:
        print(f"{result['domain']}: {result['days_left']} days remaining")
    else:
        print(f"{result['domain']}: check failed - {result['error']}")

if alerts:
    print("\nAlerts:")
    for alert in alerts:
        print(f"  - {alert}")

Sample output

example.com: 85 days remaining
api.example.com: 12 days remaining
www.example.com: 45 days remaining

Alerts:
  - api.example.com certificate expires in 12 days
◆ Case 2: Docker Container and Resource Cleanup

Scenario: periodically remove containers that have been stopped for more than 7 days, plus unused images and volumes, to reclaim disk space.

Implementation

#!/usr/bin/env python3
# File: docker_cleanup.py
import subprocess
import json
from datetime import datetime, timedelta

class DockerCleaner:
    def __init__(self, dry_run=True):
        self.dry_run = dry_run

    def get_stopped_containers(self, days=7):
        """获取停止超过N天的容器"""
        cutoff_time = datetime.now() - timedelta(days=days)
        cmd = "docker ps -a --format '{{json .}}'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        stopped_containers = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue

            container = json.loads(line)
            if container['State'] != 'exited':
                continue

            # 获取容器详细信息
            inspect_cmd = f"docker inspect {container['ID']}"
            inspect_result = subprocess.run(inspect_cmd, shell=True, capture_output=True, text=True)
            detail = json.loads(inspect_result.stdout)[0]

            finished_at = datetime.fromisoformat(detail['State']['FinishedAt'].split('.')[0])
            if finished_at < cutoff_time:
                stopped_containers.append({
                    'id': container['ID'],
                    'name': container['Names'],
                    'finished_at': finished_at
                })

        return stopped_containers

    def remove_containers(self, containers):
        """删除容器"""
        for container in containers:
            cmd = f"docker rm {container['id']}"
            if self.dry_run:
                print(f"[DRY RUN] {cmd}")
            else:
                subprocess.run(cmd, shell=True)
                print(f"已删除容器: {container['name']}")

    def prune_images(self):
        """清理未使用的镜像"""
        cmd = "docker image prune -a -f"
        if self.dry_run:
            print(f"[DRY RUN] {cmd}")
        else:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            print(result.stdout)

    def prune_volumes(self):
        """清理未使用的volume"""
        cmd = "docker volume prune -f"
        if self.dry_run:
            print(f"[DRY RUN] {cmd}")
        else:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            print(result.stdout)

    def cleanup(self, container_days=7):
        """执行清理"""
        print(f"开始清理(DRY RUN: {self.dry_run})")

        # 清理容器
        containers = self.get_stopped_containers(container_days)
        print(f"\n发现 {len(containers)} 个停止超过{container_days}天的容器")
        self.remove_containers(containers)

        # 清理镜像
        print("\n清理未使用的镜像...")
        self.prune_images()

        # 清理volume
        print("\n清理未使用的volume...")
        self.prune_volumes()

# 使用示例
cleaner = DockerCleaner(dry_run=False)
cleaner.cleanup(container_days=7)
◆ Case 3: Scheduled Database Backups

Scenario: back up MySQL databases automatically at 02:00 every day, keep the last 30 days of backups, and clean up expired files.

Steps

  1. Create the backup script
#!/usr/bin/env python3
# File: mysql_backup.py
import os
import subprocess
from datetime import datetime, timedelta
import gzip
import shutil

class MySQLBackup:
    def __init__(self, config):
        self.host = config['host']
        self.user = config['user']
        self.password = config['password']
        self.databases = config['databases']
        self.backup_dir = config['backup_dir']
        self.retention_days = config.get('retention_days', 30)

    def backup_database(self, database):
        """备份单个数据库"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"{self.backup_dir}/{database}_{timestamp}.sql"

        # mysqldump命令
        cmd = [
            'mysqldump',
            f'--host={self.host}',
            f'--user={self.user}',
            f'--password={self.password}',
            '--single-transaction',
            '--routines',
            '--triggers',
            '--events',
            database
        ]

        try:
            with open(backup_file, 'w') as f:
                subprocess.run(cmd, stdout=f, check=True)

            # 压缩
            with open(backup_file, 'rb') as f_in:
                with gzip.open(f"{backup_file}.gz", 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

            os.remove(backup_file)
            print(f"备份成功: {database} -> {backup_file}.gz")
            return f"{backup_file}.gz"

        except subprocess.CalledProcessError as e:
            print(f"备份失败: {database} - {str(e)}")
            return None

    def cleanup_old_backups(self):
        """清理过期备份"""
        cutoff_time = datetime.now() - timedelta(days=self.retention_days)

        for filename in os.listdir(self.backup_dir):
            if not filename.endswith('.sql.gz'):
                continue

            file_path = os.path.join(self.backup_dir, filename)
            file_time = datetime.fromtimestamp(os.path.getmtime(file_path))

            if file_time < cutoff_time:
                os.remove(file_path)
                print(f"删除过期备份: {filename}")

    def run(self):
        """执行备份"""
        print(f"开始备份,时间: {datetime.now()}")

        for database in self.databases:
            self.backup_database(database)

        self.cleanup_old_backups()
        print("备份完成")

# 配置
config = {
    'host': 'localhost',
    'user': 'backup',
    'password': 'your_password',
    'databases': ['app_db', 'user_db'],
    'backup_dir': '/data/mysql_backups',
    'retention_days': 30
}

backup = MySQLBackup(config)
backup.run()
  2. Set up the crontab entry
# Edit the crontab
crontab -e

# Add the scheduled job (runs daily at 02:00)
0 2 * * * /usr/bin/python3 /opt/scripts/mysql_backup.py >> /var/log/mysql_backup.log 2>&1
  3. Verify the backups
# List the backup files
ls -lh /data/mysql_backups/

# Test a restore
gunzip < app_db_20250115_020001.sql.gz | mysql -u root -p app_db_test

IV. Best Practices and Caveats

4.1 Best Practices

◆ 4.1.1 Performance
  • Concurrency control: when using ThreadPoolExecutor, choose max_workers carefully so that excess concurrency does not overload the system

    # Scale with the number of CPU cores
    import os
    max_workers = min(32, (os.cpu_count() or 1) * 4)
  • Connection pooling: for MySQL, Redis, and similar services, use a connection pool to reduce connection setup overhead

    from dbutils.pooled_db import PooledDB
    import pymysql

    pool = PooledDB(
        creator=pymysql,
        maxconnections=10,
        host='localhost',
        user='root',
        password='password'
    )
  • Batch operations: batch database writes and API calls to cut down on network round trips

    # Bulk insert
    cursor.executemany(
        "INSERT INTO logs (message, level) VALUES (%s, %s)",
        [(msg, level) for msg, level in log_entries]
    )


◆ 4.1.2 Security Hardening

  • Secrets management: store passwords in environment variables or a secrets manager (e.g. HashiCorp Vault)

    import os
    from dotenv import load_dotenv

    load_dotenv()
    DB_PASSWORD = os.getenv('DB_PASSWORD')
  • SSH key permissions: make sure private key files are mode 600 so other users cannot read them

    chmod 600 ~/.ssh/ops_rsa
  • Input validation: strictly validate user input to prevent command injection

    import shlex

    # Safely quote command arguments
    safe_command = shlex.quote(user_input)


◆ 4.1.3 High Availability

  • Retry on failure: add retry logic to network operations to make scripts more robust

    from tenacity import retry, stop_after_attempt, wait_exponential

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    def api_call():
        response = requests.get('https://api.example.com')
        response.raise_for_status()
        return response.json()
  • Health checks: long-running scripts should periodically verify that the services they depend on are healthy (see the sketch after this list)
  • Back up before changing: back up before risky operations such as editing configuration files or deleting data

    import shutil
    shutil.copy2('/etc/nginx/nginx.conf', '/etc/nginx/nginx.conf.backup')
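A minimal health-check helper along these lines could be called from a long-running loop such as system_monitor.py; the hosts, ports, and the Pushgateway readiness URL below are illustrative assumptions:

#!/usr/bin/env python3
# health_check.py -- illustrative sketch; endpoints and ports are assumptions
import socket

import requests


def tcp_alive(host, port, timeout=3):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def http_alive(url, timeout=5):
    """Return True if the URL answers with a non-error status."""
    try:
        return requests.get(url, timeout=timeout).status_code < 400
    except requests.RequestException:
        return False


if __name__ == '__main__':
    checks = {
        'mysql': tcp_alive('192.168.1.200', 3306),
        'redis': tcp_alive('192.168.1.201', 6379),
        'pushgateway': http_alive('http://localhost:9091/-/ready'),
    }
    for name, ok in checks.items():
        print(f"{name}: {'OK' if ok else 'DOWN'}")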


4.2 Caveats

◆ 4.2.1 Configuration Caveats

  • Always validate scripts in a test environment before running them in production, especially anything that deletes or modifies data
  • Avoid running SSH operations as root; create a dedicated ops account with restricted sudo permissions
  • Configure log rotation so log files cannot fill the disk
  • Scripts run from cron should use absolute paths to avoid failures caused by differing environment variables

◆ 4.2.2 Common Errors

| Symptom | Cause | Fix |
|---------|-------|-----|
| paramiko authentication failure | Wrong private key permissions or path | Ensure the key file is mode 600 and use an absolute path |
| UnicodeDecodeError | Log file contains non-UTF-8 characters | Read with errors='ignore' |
| Database connection timeout | Blocked by a firewall or connection limit reached | Check firewall rules; raise max_connections |
| Cron job never runs | Missing environment variables | Set PATH explicitly in the script or use absolute paths |
| Script dies when the disk fills up | Disk usage not monitored | Add a free-space check and alert below a threshold |

◆ 4.2.3 Compatibility

  • Python version: f-strings require Python 3.6+ and the walrus operator 3.8+; write for the Python version of your target environment
  • Platform: handle paths with os.path or pathlib instead of hard-coding Windows or Linux separators (see the sketch below)
  • Dependencies: paramiko 2.7+ requires cryptography 3.0+; check dependency compatibility before upgrading
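As a small illustration of platform-neutral path handling with pathlib (the directory names are placeholders):

# pathlib keeps path handling platform-neutral; the paths below are placeholders
from pathlib import Path

backup_root = Path.home() / "backups" / "configs"
backup_root.mkdir(parents=True, exist_ok=True)

for conf in Path("configs").glob("*.yml"):
    target = backup_root / conf.name
    target.write_bytes(conf.read_bytes())
    print(f"copied {conf} -> {target}")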

V. Troubleshooting and Monitoring

5.1 Troubleshooting

◆ 5.1.1 Checking Logs

# Follow the script's own log
tail -f /var/log/ops/automation.log

# Check cron execution records
grep CRON /var/log/syslog | tail -20

# Find Python tracebacks
grep -A 20 "Traceback" /var/log/ops/automation.log


◆ 5.1.2 Common Problems

Problem 1: SSH connection timeouts

# Test the SSH connection verbosely
ssh -vvv -i ~/.ssh/ops_rsa root@192.168.1.100

# Check the firewall
sudo iptables -L -n | grep 22

Resolution:
1. Check that sshd is running on the target server: systemctl status sshd
2. Verify the firewall allows port 22: firewall-cmd --list-all
3. Confirm network connectivity: ping 192.168.1.100
4. Check /etc/hosts.allow and /etc/hosts.deny

Problem 2: memory usage keeps growing

# Monitor Python process memory
ps aux | grep python | sort -k4 -nr | head -5

# Profile with memory_profiler
pip3 install memory-profiler
python3 -m memory_profiler script.py

Resolution:
1. Check for reference cycles that keep objects alive
2. Read large files with generators or in chunks (see the sketch below)
3. Close database connections and file handles promptly
4. Use del to drop large objects explicitly
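A minimal sketch of point 2, reading a large file in fixed-size chunks with a generator (the log path is the example one used earlier):

# Stream a large file instead of loading it into memory at once
def read_in_chunks(path, chunk_size=1024 * 1024):
    """Yield the file in chunk_size blocks (1 MB by default)."""
    with open(path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

total = sum(len(chunk) for chunk in read_in_chunks('/var/log/nginx/access.log'))
print(f"scanned {total} bytes without loading the file into memory")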

Problem 3: scripts run slowly

  • Symptom: batch operations take far longer than expected while CPU usage stays low
  • Diagnosis: profile with cProfile to find the bottleneck

    import cProfile
    cProfile.run('main()', sort='cumulative')
  • Fix: optimize database queries (add indexes), use concurrency for I/O-bound work, and reduce unnecessary log output

◆ 5.1.3 Debug Mode

# Enable verbose logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Enable paramiko debug logging
paramiko.util.log_to_file('/tmp/paramiko.log', level=logging.DEBUG)

# Show HTTP request details for requests
import http.client
http.client.HTTPConnection.debuglevel = 1


5.2 Performance Monitoring

◆ 5.2.1 Key Metrics

Track how long scripts take to run:

import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        logger.info(f"{func.__name__} took {duration:.2f}s")
        return result
    return wrapper

@timing_decorator
def batch_operation():
    # batch operation logic
    pass

# Monitor the script's resource usage
top -p $(pgrep -f "python.*automation")

# Monitor network traffic
iftop -i eth0

# Monitor disk I/O
iostat -x 1


◆ 5.2.2 Metric Reference

| Metric | Normal Range | Alert Threshold | Notes |
|--------|--------------|-----------------|-------|
| Script execution time | <5 min | >10 min | Timeouts usually indicate network problems or deadlocks |
| Memory usage | <70% | >85% | Steady growth may indicate a memory leak |
| Error rate | <1% | >5% | SSH failures, failed API calls, etc. |
| Concurrency | 10-50 | >100 | Too much concurrency can overload target servers |
| Log volume | <100MB/day | >1GB/day | Abnormal log output warrants a code review |

◆ 5.2.3 Alerting Configuration

Example Prometheus alerting rules:

groups:
  - name: python_automation_alerts
    interval: 30s
    rules:
      - alert: ScriptExecutionTimeout
        expr: script_duration_seconds > 600
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Script execution timeout: {{ $labels.script_name }}"
          description: "Running for {{ $value }} seconds"

      - alert: HighErrorRate
        expr: rate(script_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Script error rate too high"
          description: "Error rate {{ $value | humanizePercentage }}"
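For these rules to fire, the scripts must export script_duration_seconds and script_errors_total somewhere Prometheus can scrape. A minimal sketch using the Pushgateway, as in script 3; the metric names match the rules above, while the job and script names are assumptions:

# push_script_metrics.py -- illustrative sketch using prometheus_client
import time

from prometheus_client import CollectorRegistry, Counter, Gauge, push_to_gateway

registry = CollectorRegistry()
duration = Gauge('script_duration_seconds', 'Script run time in seconds',
                 ['script_name'], registry=registry)
errors = Counter('script_errors_total', 'Total script errors',
                 ['script_name'], registry=registry)

start = time.time()
try:
    pass  # ... the actual work goes here ...
except Exception:
    errors.labels(script_name='batch_ssh_executor').inc()
finally:
    duration.labels(script_name='batch_ssh_executor').set(time.time() - start)
    push_to_gateway('localhost:9091', job='ops_scripts', registry=registry)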

5.3 Backup and Recovery

◆ 5.3.1 Backup Strategy
#!/bin/bash
# Back up the ops script directory
BACKUP_DIR="/data/backups/scripts"
SOURCE_DIR="/opt/ops_scripts"
DATE=$(date +%Y%m%d)

# Create the backup directory
mkdir -p $BACKUP_DIR

# Create a compressed archive
tar -czf $BACKUP_DIR/scripts_$DATE.tar.gz \
    -C $SOURCE_DIR \
    --exclude='*.pyc' \
    --exclude='__pycache__' \
    --exclude='.git' \
    .

# Keep the last 30 days of backups
find $BACKUP_DIR -name "scripts_*.tar.gz" -mtime +30 -delete

echo "Backup complete: $BACKUP_DIR/scripts_$DATE.tar.gz"
◆ 5.3.2 Recovery Procedure
  1. Stop related services:

    # Remove the crontab entries
    crontab -r
  2. Restore the data:

    # Extract the backup archive
    tar -xzf /data/backups/scripts/scripts_20250115.tar.gz -C /opt/ops_scripts_restore
  3. Verify integrity:

    # Compare against the current scripts
    diff -r /opt/ops_scripts /opt/ops_scripts_restore

    # Check that the scripts compile
    python3 -m py_compile /opt/ops_scripts_restore/*.py
  4. Restart services:

    # Restore the crontab
    crontab /data/backups/crontab_backup.txt

VI. Summary

6.1 Key Takeaways

  • The core value of Python ops automation is less repetitive work, faster execution, and fewer human errors
  • Choosing the right libraries matters: paramiko for SSH, psutil for system monitoring, schedule for timed jobs
  • Production scripts need solid logging, exception handling, and retry logic so failures are traceable and recoverable
  • Security cannot be an afterthought: key management, access control, and input validation are all mandatory

### 6.2 进阶学习方向

1. 异步编程优化:学习asyncio和aiohttp,将I/O密集型脚本改造为异步版本,大幅提升性能
   - 学习资源:Python官方asyncio文档
   - 实践建议:改造批量HTTP请求脚本,对比同步和异步版本的性能差异

2. 容器化部署:将运维脚本打包为Docker镜像,实现跨环境一致性执行
   - 学习资源:Docker最佳实践
   - 实践建议:使用Alpine Linux作为基础镜像,减小镜像体积

3. 可观测性增强:集成OpenTelemetry实现分布式追踪,结合Grafana可视化脚本执行情况
   - 学习资源:OpenTelemetry Python SDK
   - 实践建议:为关键脚本添加Trace和Metrics,建立运维自动化的观测体系
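A minimal async HTTP sketch along the lines of item 1; the URLs are placeholders and aiohttp is assumed to be installed (pip3 install aiohttp):

#!/usr/bin/env python3
# async_http_check.py -- illustrative sketch, URLs are placeholders
import asyncio

import aiohttp

URLS = [f"https://example.com/health?id={i}" for i in range(20)]  # placeholder URLs


async def fetch(session, url):
    """Fetch one URL and return its status code."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
        return url, resp.status


async def main():
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch(session, u) for u in URLS),
                                       return_exceptions=True)
    for item in results:
        print(item)


if __name__ == '__main__':
    asyncio.run(main())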

6.3 References

  • Python official documentation - authoritative reference for the standard library and language features
  • Paramiko documentation - the essential SSH automation library
  • psutil documentation - system monitoring and process management
  • Real Python ops tutorials - hands-on Python tutorials for operations work
  • Awesome Python - curated Python libraries and tools, including ops

Appendix

A. Command Cheat Sheet

# Python environment management
python3 -m venv /opt/venv                         # Create a virtual environment
source /opt/venv/bin/activate                     # Activate it
pip3 freeze > requirements.txt                    # Export dependencies
pip3 install -r requirements.txt                  # Install dependencies

# Common ops commands
python3 batch_ssh_executor.py "df -h"             # Check disks in bulk
python3 log_analyzer.py                           # Analyze logs
python3 system_monitor.py                         # System monitoring
python3 mysql_backup.py                           # Database backup

# Debugging and profiling
python3 -m pdb script.py                          # Debug a script
python3 -m cProfile -o profile.stats script.py    # Profile it
python3 -m trace --count script.py                # Line coverage

# Code quality
pylint script.py                                  # Lint
black script.py                                   # Format
mypy script.py                                    # Type-check

B. Configuration Parameter Reference

paramiko.SSHClient.connect parameters:
  • timeout: connection timeout in seconds; no timeout by default, 10-30s recommended
  • banner_timeout: SSH banner read timeout, 15s by default
  • auth_timeout: authentication timeout, 15s by default
  • allow_agent: whether to use the SSH agent, True by default
  • look_for_keys: whether to search ~/.ssh/ for keys, True by default

logging.RotatingFileHandler parameters:
  • maxBytes: maximum size of a single log file in bytes; 100MB recommended
  • backupCount: number of rotated files to keep; 10-30 recommended
  • encoding: log file encoding; utf-8 recommended
  • delay: delay file creation until the first write, False by default

ThreadPoolExecutor parameters:
  • max_workers: maximum number of threads; 2-4x the CPU core count is a reasonable start
  • thread_name_prefix: thread name prefix, useful for debugging
  • initializer: function run when each worker thread starts
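A small sketch showing those ThreadPoolExecutor parameters in use; the initializer and task are illustrative:

# ThreadPoolExecutor with named threads and a per-thread initializer (illustrative)
import threading
from concurrent.futures import ThreadPoolExecutor


def init_worker():
    # Runs once in each worker thread before it picks up tasks
    print(f"worker started: {threading.current_thread().name}")


def task(n):
    return n * n


with ThreadPoolExecutor(max_workers=4,
                        thread_name_prefix='ops-worker',
                        initializer=init_worker) as pool:
    print(list(pool.map(task, range(8))))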

C. Glossary

| Term | Explanation |
|------|-------------|
| Idempotence | Running the same operation multiple times produces the same result; essential for ops scripts |
| Connection pool | Pre-established, reused connections that reduce connection setup overhead |
| Asynchronous programming | Using coroutines to improve concurrency for I/O-bound tasks |
| Distributed tracing | Following a request across the full call chain of a distributed system |
| Circuit breaker | Automatically cutting off calls to a failing service to prevent cascading failures |
| SSH key authentication | Authentication with a public/private key pair; more secure than passwords |
| Slow query | A database query whose execution time exceeds a threshold |
| Full table scan | A query that reads every row of a table because no index is used |
| Log rotation | Periodically archiving and deleting old logs to keep the disk from filling |
| Graceful shutdown | Finishing in-flight work after receiving a termination signal before exiting |