一、概述
1.1 背景介绍
在运维工作中,日志分析、批量操作、监控告警、资源清理等重复性任务往往占据工程师相当大比例的工作时间(业内常见估计为60%-80%)。这些任务虽然简单,但手工执行效率低且易出错。Python凭借其简洁的语法、丰富的标准库和第三方模块,成为运维自动化的首选语言。本文将分享10个经过生产环境验证的Python脚本,帮助运维工程师从重复劳动中解放出来。
1.2 技术特点
- 开发效率高:Python语法简洁,实现同等功能的开发周期通常远短于Shell脚本,适合快速落地运维需求
- 生态系统完善:提供paramiko、requests、psutil等成熟运维库,避免重复造轮子
- 跨平台兼容:同一脚本可在Linux、Windows、macOS上运行,减少维护成本
- 易于维护扩展:代码可读性强,便于团队协作和功能迭代
1.3 适用场景
- 场景一:需要批量管理100台以上服务器的运维团队,如配置分发、命令执行、文件同步
- 场景二:日均处理GB级日志的业务系统,需要自动化分析异常、统计访问量、生成报表
- 场景三:多云环境资源管理,包括虚拟机、容器、存储的自动清理和成本优化
- 场景四:7x24小时监控场景,需要自动化健康检查、告警处理、故障自愈
1.4 环境要求
| 组件 | 版本要求 | 说明 |
|------|----------|------|
| 操作系统 | CentOS 7+ / Ubuntu 18.04+ | 建议使用LTS版本 |
| Python | 3.8+ | 推荐3.10+,支持最新语法特性 |
| pip | 20.0+ | 用于安装依赖包 |
| 硬件配置 | 2C4G+ | 根据实际负载调整 |
二、详细步骤
2.1 准备工作
◆ 2.1.1 系统检查
# 检查Python版本
python3 --version
# 检查pip版本
pip3 --version
# 检查系统资源
free -h
df -h
◆ 2.1.2 安装依赖
# 升级pip
pip3 install --upgrade pip
# 安装常用运维库
pip3 install paramiko requests psutil schedule pymysql redis elasticsearch prometheus-client
# 验证安装
pip3 list | grep -E "paramiko|requests|psutil"
2.2 核心配置
◆ 2.2.1 配置SSH密钥认证
# 生成SSH密钥对
ssh-keygen -t rsa -b 4096 -f ~/.ssh/ops_rsa -N ""
# 分发公钥到目标服务器(示例)
ssh-copy-id -i ~/.ssh/ops_rsa.pub root@192.168.1.100
说明:使用密钥认证替代密码登录,提高安全性并支持批量操作。建议为运维脚本单独创建密钥对,便于权限管理和审计。
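公钥分发完成后,可用下面的最小示例先验证密钥认证是否生效(IP、用户与密钥路径沿用上文示例,均为假设值):
# 文件名:test_ssh_key.py(密钥认证连通性验证示意)
import os
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
    hostname='192.168.1.100',
    port=22,
    username='root',
    key_filename=os.path.expanduser('~/.ssh/ops_rsa'),  # 指定私钥文件
    timeout=10
)
stdin, stdout, stderr = client.exec_command('hostname')
print(stdout.read().decode().strip())
client.close()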
◆ 2.2.2 配置文件示例
# 配置文件:config.yml
servers:
  - host: 192.168.1.100
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa
  - host: 192.168.1.101
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa
mysql:
  host: 192.168.1.200
  port: 3306
  user: monitor
  password: your_password
  database: ops
redis:
  host: 192.168.1.201
  port: 6379
  password: your_redis_password
  db: 0
log:
  level: INFO
  file: /var/log/ops/automation.log
  max_size: 100  # MB
  backup_count: 10
参数说明:
- servers:目标服务器列表,支持批量操作
- mysql/redis:数据库连接信息,用于存储执行结果和状态
- log:日志配置,建议使用轮转避免磁盘占满
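读取该配置文件的方式可参考下面的示意(文件名假设为 config.yml):
import yaml

with open('config.yml') as f:
    config = yaml.safe_load(f)

# 按层级访问各段配置
print(config['servers'][0]['host'])   # 192.168.1.100
print(config['mysql']['host'])        # 192.168.1.200
print(config['log']['file'])          # /var/log/ops/automation.log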
◆ 2.2.3 日志配置
# logging_config.py
import logging
from logging.handlers import RotatingFileHandler
def setup_logger(log_file='/var/log/ops/automation.log', level=logging.INFO):
    logger = logging.getLogger('ops_automation')
    logger.setLevel(level)
    # 避免被多个脚本重复调用时叠加处理器,导致日志重复输出
    if logger.handlers:
        return logger
    # 轮转文件处理器
    handler = RotatingFileHandler(
        log_file,
        maxBytes=100*1024*1024,  # 100MB
        backupCount=10
    )
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
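调用方式示意(假设 /var/log/ops 目录已存在且当前用户有写权限):
# 在各运维脚本中复用统一的日志配置
from logging_config import setup_logger

logger = setup_logger()
logger.info("脚本启动")
logger.warning("示例告警信息")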
2.3 启动和验证
◆ 2.3.1 基础测试
# 测试SSH连接
python3 -c "import paramiko; print('paramiko OK')"
# 测试配置文件读取
python3 -c "import yaml; print(yaml.safe_load(open('config.yml')))"
◆ 2.3.2 功能验证
# 验证SSH批量执行(示例脚本1)
python3 batch_ssh_executor.py "uptime"
# 预期输出
# [192.168.1.100] SUCCESS: 10:30:23 up 45 days, 2:15, 1 user, load average: 0.15, 0.10, 0.08
# [192.168.1.101] SUCCESS: 10:30:24 up 30 days, 5:20, 1 user, load average: 0.25, 0.20, 0.18
三、示例代码和配置
3.1 完整配置示例
◆ 3.1.1 脚本1:批量SSH命令执行器
#!/usr/bin/env python3
# 文件路径:batch_ssh_executor.py
"""
批量SSH命令执行器
支持并发执行、结果收集、异常处理
"""
import paramiko
import yaml
import sys
import os  # 用于展开密钥路径中的 ~
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging_config import setup_logger
logger = setup_logger()
class SSHExecutor:
def __init__(self, config_file='config.yml'):
with open(config_file) as f:
self.config = yaml.safe_load(f)
self.servers = self.config['servers']
def execute_on_host(self, server, command, timeout=30):
"""在单个主机上执行命令"""
host = server['host']
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 使用密钥认证
key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(server['key_file']))
client.connect(
hostname=host,
port=server['port'],
username=server['user'],
pkey=key,
timeout=10
)
stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
exit_code = stdout.channel.recv_exit_status()
result = {
'host': host,
'success': exit_code == 0,
'stdout': stdout.read().decode('utf-8', errors='ignore').strip(),
'stderr': stderr.read().decode('utf-8', errors='ignore').strip(),
'exit_code': exit_code
}
client.close()
logger.info(f"[{host}] Command executed, exit_code={exit_code}")
return result
except Exception as e:
logger.error(f"[{host}] Error: {str(e)}")
return {
'host': host,
'success': False,
'stdout': '',
'stderr': str(e),
'exit_code': -1
}
def execute_parallel(self, command, max_workers=10):
"""并发执行命令"""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(self.execute_on_host, server, command): server
for server in self.servers
}
for future in as_completed(futures):
results.append(future.result())
return results
def print_results(self, results):
"""格式化输出结果"""
success_count = sum(1 for r in results if r['success'])
print(f"\n执行完成: 成功 {success_count}/{len(results)}\n")
for result in sorted(results, key=lambda x: x['host']):
status = "SUCCESS" if result['success'] else "FAILED"
print(f"[{result['host']}] {status}")
if result['stdout']:
print(f" 输出: {result['stdout']}")
if result['stderr']:
print(f" 错误: {result['stderr']}")
print()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("用法: python3 batch_ssh_executor.py '<command>'")
sys.exit(1)
command = sys.argv[1]
executor = SSHExecutor()
results = executor.execute_parallel(command)
executor.print_results(results)
◆ 3.1.2 脚本2:日志分析与告警
#!/usr/bin/env python3
# 文件名:log_analyzer.py
"""
日志分析工具
功能:错误统计、异常检测、自动告警
"""
import re
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import requests
from logging_config import setup_logger
logger = setup_logger()
class LogAnalyzer:
def __init__(self, log_file):
self.log_file = log_file
self.error_patterns = {
'http_5xx': r'HTTP/\d\.\d"\s5\d{2}',
'exception': r'(Exception|Error|Fatal)',
'timeout': r'(timeout|timed out)',
'connection_refused': r'Connection refused',
'out_of_memory': r'(OutOfMemory|OOM|Cannot allocate memory)'
}
def parse_nginx_log(self, line):
"""解析Nginx日志格式"""
pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\d+) "(.*?)" "(.*?)"'
match = re.match(pattern, line)
if match:
return {
'ip': match.group(1),
'time': match.group(2),
'request': match.group(3),
'status': int(match.group(4)),
'size': int(match.group(5)),
'referer': match.group(6),
'user_agent': match.group(7)
}
return None
def analyze(self, time_window=60):
"""分析最近N分钟的日志"""
now = datetime.now()
cutoff_time = now - timedelta(minutes=time_window)
stats = {
'total_requests': 0,
'error_count': defaultdict(int),
'status_codes': Counter(),
'top_ips': Counter(),
'slow_requests': []
}
with open(self.log_file, 'r') as f:
for line in f:
entry = self.parse_nginx_log(line)
if not entry:
continue
# 时间过滤
log_time = datetime.strptime(entry['time'], '%d/%b/%Y:%H:%M:%S %z')
if log_time.replace(tzinfo=None) < cutoff_time:
continue
stats['total_requests'] += 1
stats['status_codes'][entry['status']] += 1
stats['top_ips'][entry['ip']] += 1
# 错误检测
for error_type, pattern in self.error_patterns.items():
if re.search(pattern, line):
stats['error_count'][error_type] += 1
# 5xx错误记录
if 500 <= entry['status'] < 600:
stats['slow_requests'].append({
'time': entry['time'],
'request': entry['request'],
'status': entry['status']
})
return stats
def check_alert_conditions(self, stats):
"""检查告警条件"""
alerts = []
# 5xx错误率超过5%
if stats['total_requests'] > 0:
error_5xx = sum(count for code, count in stats['status_codes'].items()
if 500 <= code < 600)
error_rate = error_5xx / stats['total_requests']
if error_rate > 0.05:
alerts.append({
'level': 'critical',
'message': f'5xx错误率: {error_rate*100:.2f}% ({error_5xx}/{stats["total_requests"]})'
})
# OOM错误
if stats['error_count']['out_of_memory'] > 0:
alerts.append({
'level': 'critical',
'message': f'检测到OOM错误: {stats["error_count"]["out_of_memory"]}次'
})
# 连接超时
if stats['error_count']['timeout'] > 100:
alerts.append({
'level': 'warning',
'message': f'超时错误异常: {stats["error_count"]["timeout"]}次'
})
return alerts
def send_alert(self, alerts, webhook_url):
"""发送告警到企业微信/钉钉"""
if not alerts:
return
message = "【日志告警】\n" + "\n".join(
f"[{a['level'].upper()}] {a['message']}" for a in alerts
)
payload = {
"msgtype": "text",
"text": {"content": message}
}
try:
response = requests.post(webhook_url, json=payload, timeout=5)
if response.status_code == 200:
logger.info("告警发送成功")
else:
logger.error(f"告警发送失败: {response.status_code}")
except Exception as e:
logger.error(f"告警发送异常: {str(e)}")
if __name__ == '__main__':
analyzer = LogAnalyzer('/var/log/nginx/access.log')
stats = analyzer.analyze(time_window=5)
print(f"总请求数: {stats['total_requests']}")
print(f"状态码分布: {dict(stats['status_codes'])}")
print(f"Top 10 IP: {stats['top_ips'].most_common(10)}")
print(f"错误统计: {dict(stats['error_count'])}")
alerts = analyzer.check_alert_conditions(stats)
if alerts:
print("\n触发告警:")
for alert in alerts:
print(f" [{alert['level']}] {alert['message']}")
# 发送告警(替换为实际webhook地址)
# analyzer.send_alert(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')
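若需周期性分析并自动告警,可通过 crontab 每5分钟执行一次(示意,脚本路径 /opt/scripts 为假设):
*/5 * * * * /usr/bin/python3 /opt/scripts/log_analyzer.py >> /var/log/ops/log_analyzer.log 2>&1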
◆ 3.1.3 脚本3:系统资源监控
#!/usr/bin/env python3
# 文件名:system_monitor.py
"""
系统资源监控
监控CPU、内存、磁盘、网络,支持Prometheus集成
"""
import psutil
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from logging_config import setup_logger
logger = setup_logger()
class SystemMonitor:
def __init__(self, pushgateway_url='localhost:9091', job_name='system_monitor'):
self.pushgateway_url = pushgateway_url
self.job_name = job_name
self.registry = CollectorRegistry()
# 定义指标
self.cpu_gauge = Gauge('system_cpu_percent', 'CPU使用率', registry=self.registry)
self.memory_gauge = Gauge('system_memory_percent', '内存使用率', registry=self.registry)
self.disk_gauge = Gauge('system_disk_percent', '磁盘使用率',
['mountpoint'], registry=self.registry)
self.network_gauge = Gauge('system_network_bytes', '网络流量',
['interface', 'direction'], registry=self.registry)
def collect_metrics(self):
"""采集系统指标"""
metrics = {}
# CPU
cpu_percent = psutil.cpu_percent(interval=1)
metrics['cpu'] = cpu_percent
self.cpu_gauge.set(cpu_percent)
# 内存
memory = psutil.virtual_memory()
metrics['memory'] = {
'percent': memory.percent,
'total': memory.total,
'available': memory.available,
'used': memory.used
}
self.memory_gauge.set(memory.percent)
# 磁盘
metrics['disk'] = {}
for partition in psutil.disk_partitions():
try:
usage = psutil.disk_usage(partition.mountpoint)
metrics['disk'][partition.mountpoint] = {
'percent': usage.percent,
'total': usage.total,
'used': usage.used,
'free': usage.free
}
self.disk_gauge.labels(mountpoint=partition.mountpoint).set(usage.percent)
except PermissionError:
continue
# 网络
net_io = psutil.net_io_counters(pernic=True)
metrics['network'] = {}
for interface, stats in net_io.items():
metrics['network'][interface] = {
'bytes_sent': stats.bytes_sent,
'bytes_recv': stats.bytes_recv
}
self.network_gauge.labels(interface=interface, direction='sent').set(stats.bytes_sent)
self.network_gauge.labels(interface=interface, direction='recv').set(stats.bytes_recv)
return metrics
def check_thresholds(self, metrics):
"""检查阈值告警"""
alerts = []
if metrics['cpu'] > 80:
alerts.append(f"CPU使用率过高: {metrics['cpu']:.1f}%")
if metrics['memory']['percent'] > 85:
alerts.append(f"内存使用率过高: {metrics['memory']['percent']:.1f}%")
for mount, stats in metrics['disk'].items():
if stats['percent'] > 90:
alerts.append(f"磁盘空间不足: {mount} ({stats['percent']:.1f}%)")
return alerts
def push_metrics(self):
"""推送指标到Pushgateway"""
try:
push_to_gateway(self.pushgateway_url, job=self.job_name, registry=self.registry)
logger.info("指标推送成功")
except Exception as e:
logger.error(f"指标推送失败: {str(e)}")
def run(self, interval=60):
"""持续监控"""
logger.info(f"开始监控,采集间隔: {interval}秒")
while True:
try:
metrics = self.collect_metrics()
alerts = self.check_thresholds(metrics)
if alerts:
logger.warning("触发告警: " + "; ".join(alerts))
self.push_metrics()
time.sleep(interval)
except KeyboardInterrupt:
logger.info("监控停止")
break
except Exception as e:
logger.error(f"监控异常: {str(e)}")
time.sleep(interval)
if __name__ == '__main__':
monitor = SystemMonitor()
# 单次采集
metrics = monitor.collect_metrics()
print(f"CPU: {metrics['cpu']:.1f}%")
print(f"内存: {metrics['memory']['percent']:.1f}%")
print("磁盘:")
for mount, stats in metrics['disk'].items():
print(f" {mount}: {stats['percent']:.1f}%")
# 持续监控(取消注释启用)
# monitor.run(interval=60)
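若需常驻采集,可先在 __main__ 中启用 monitor.run(),再交给 systemd 托管。下面是一个示意的单元文件(服务名与脚本路径均为假设):
# /etc/systemd/system/ops-system-monitor.service
[Unit]
Description=Ops system resource monitor
After=network.target

[Service]
ExecStart=/usr/bin/python3 /opt/scripts/system_monitor.py
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
# 加载并启动
systemctl daemon-reload
systemctl enable --now ops-system-monitor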
◆ 3.1.4 脚本4:MySQL慢查询分析
#!/usr/bin/env python3
# 文件名:mysql_slow_query_analyzer.py
"""
MySQL慢查询分析
解析慢查询日志,生成优化建议
"""
import re
import pymysql
from collections import defaultdict
from logging_config import setup_logger
logger = setup_logger()
class SlowQueryAnalyzer:
def __init__(self, slow_log_file, db_config):
self.slow_log_file = slow_log_file
self.db_config = db_config
self.queries = []
def parse_slow_log(self):
"""解析慢查询日志"""
current_query = {}
with open(self.slow_log_file, 'r') as f:
for line in f:
# Time行
if line.startswith('# Time:'):
if current_query:
self.queries.append(current_query)
current_query = {'time': line.split(':', 1)[1].strip()}
# User@Host行
elif line.startswith('# User@Host:'):
match = re.search(r'(\w+)\[(\w+)\] @ (\S+)', line)
if match:
current_query['user'] = match.group(1)
current_query['host'] = match.group(3)
# Query_time行
elif line.startswith('# Query_time:'):
match = re.search(
r'Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+Rows_sent: (\d+)\s+Rows_examined: (\d+)',
line
)
if match:
current_query['query_time'] = float(match.group(1))
current_query['lock_time'] = float(match.group(2))
current_query['rows_sent'] = int(match.group(3))
current_query['rows_examined'] = int(match.group(4))
# SQL语句
elif not line.startswith('#') and line.strip():
current_query['sql'] = current_query.get('sql', '') + line.strip() + ' '
if current_query:
self.queries.append(current_query)
logger.info(f"解析完成,共 {len(self.queries)} 条慢查询")
def analyze(self):
"""分析慢查询"""
stats = {
'total': len(self.queries),
'avg_query_time': 0,
'max_query_time': 0,
'top_queries': [],
'table_scan': []
}
if not self.queries:
return stats
# 基础统计
total_time = sum(q['query_time'] for q in self.queries)
stats['avg_query_time'] = total_time / len(self.queries)
stats['max_query_time'] = max(q['query_time'] for q in self.queries)
# Top 10耗时查询
sorted_queries = sorted(self.queries, key=lambda x: x['query_time'], reverse=True)
stats['top_queries'] = sorted_queries[:10]
# 全表扫描检测(rows_examined > 10000)
stats['table_scan'] = [
q for q in self.queries if q.get('rows_examined', 0) > 10000
]
return stats
def get_explain_plan(self, sql):
"""获取EXPLAIN执行计划"""
try:
conn = pymysql.connect(**self.db_config)
cursor = conn.cursor()
cursor.execute(f"EXPLAIN {sql}")
result = cursor.fetchall()
cursor.close()
conn.close()
return result
except Exception as e:
logger.error(f"EXPLAIN失败: {str(e)}")
return None
def generate_report(self, stats):
"""生成分析报告"""
report = []
report.append("=" * 80)
report.append("MySQL慢查询分析报告")
report.append("=" * 80)
report.append(f"总慢查询数: {stats['total']}")
report.append(f"平均查询时间: {stats['avg_query_time']:.2f}秒")
report.append(f"最大查询时间: {stats['max_query_time']:.2f}秒")
report.append("")
report.append("Top 10耗时查询:")
for i, query in enumerate(stats['top_queries'], 1):
report.append(f"\n{i}. 查询时间: {query['query_time']:.2f}秒")
report.append(f" 扫描行数: {query.get('rows_examined', 0)}")
report.append(f" SQL: {query.get('sql', '')[:200]}")
if stats['table_scan']:
report.append(f"\n发现 {len(stats['table_scan'])} 个全表扫描查询")
for query in stats['table_scan'][:5]:
report.append(f" - {query.get('sql', '')[:100]}")
return "\n".join(report)
if __name__ == '__main__':
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'your_password',
'database': 'test'
}
analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
analyzer.parse_slow_log()
stats = analyzer.analyze()
print(analyzer.generate_report(stats))
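上面的主流程未调用 get_explain_plan;如需进一步查看Top查询的执行计划,可在 __main__ 末尾追加如下代码(示意,仅对SELECT语句执行EXPLAIN):
for query in stats['top_queries']:
    sql = query.get('sql', '').strip()
    if sql.upper().startswith('SELECT'):
        plan = analyzer.get_explain_plan(sql)
        if plan:
            print(f"\nSQL: {sql[:100]}")
            for row in plan:
                print(f"  {row}")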
◆ 3.1.5 脚本5:文件同步工具
#!/usr/bin/env python3
# 文件名:file_sync.py
"""
文件同步工具
支持增量同步、断点续传、校验
"""
import os
import hashlib
import paramiko
from pathlib import Path
from logging_config import setup_logger
logger = setup_logger()
class FileSync:
def __init__(self, ssh_config):
self.ssh_config = ssh_config
self.client = None
self.sftp = None
def connect(self):
"""建立SSH连接"""
try:
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
key = paramiko.RSAKey.from_private_key_file(os.path.expanduser(self.ssh_config['key_file']))
self.client.connect(
hostname=self.ssh_config['host'],
port=self.ssh_config['port'],
username=self.ssh_config['user'],
pkey=key
)
self.sftp = self.client.open_sftp()
logger.info(f"连接成功: {self.ssh_config['host']}")
except Exception as e:
logger.error(f"连接失败: {str(e)}")
raise
def disconnect(self):
"""关闭连接"""
if self.sftp:
self.sftp.close()
if self.client:
self.client.close()
def calculate_md5(self, file_path):
"""计算文件MD5"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def remote_file_exists(self, remote_path):
"""检查远程文件是否存在"""
try:
self.sftp.stat(remote_path)
return True
except FileNotFoundError:
return False
def sync_file(self, local_path, remote_path, check_md5=True):
"""同步单个文件"""
try:
# 确保远程目录存在
remote_dir = os.path.dirname(remote_path)
try:
self.sftp.stat(remote_dir)
except FileNotFoundError:
self._create_remote_dir(remote_dir)
# MD5校验
need_upload = True
if check_md5 and self.remote_file_exists(remote_path):
local_md5 = self.calculate_md5(local_path)
# 远程MD5计算(需要执行命令)
stdin, stdout, stderr = self.client.exec_command(f"md5sum {remote_path}")
remote_md5 = stdout.read().decode().split()[0]
if local_md5 == remote_md5:
logger.info(f"文件未变化,跳过: {local_path}")
need_upload = False
if need_upload:
self.sftp.put(local_path, remote_path)
logger.info(f"上传成功: {local_path} -> {remote_path}")
return True
return False
except Exception as e:
logger.error(f"同步失败 {local_path}: {str(e)}")
return False
def _create_remote_dir(self, remote_dir):
"""递归创建远程目录"""
dirs = []
while remote_dir != '/':
dirs.append(remote_dir)
remote_dir = os.path.dirname(remote_dir)
for dir_path in reversed(dirs):
try:
self.sftp.stat(dir_path)
except FileNotFoundError:
self.sftp.mkdir(dir_path)
logger.info(f"创建目录: {dir_path}")
def sync_directory(self, local_dir, remote_dir, exclude_patterns=None):
"""同步整个目录"""
exclude_patterns = exclude_patterns or []
synced_count = 0
skipped_count = 0
for root, dirs, files in os.walk(local_dir):
# 计算相对路径
rel_path = os.path.relpath(root, local_dir)
remote_root = os.path.join(remote_dir, rel_path).replace('\\', '/')
for file in files:
# 排除规则
if any(pattern in file for pattern in exclude_patterns):
continue
local_file = os.path.join(root, file)
remote_file = os.path.join(remote_root, file).replace('\\', '/')
if self.sync_file(local_file, remote_file):
synced_count += 1
else:
skipped_count += 1
logger.info(f"同步完成: 上传{synced_count}个文件,跳过{skipped_count}个")
if __name__ == '__main__':
ssh_config = {
'host': '192.168.1.100',
'port': 22,
'user': 'root',
'key_file': '~/.ssh/ops_rsa'
}
sync = FileSync(ssh_config)
sync.connect()
# 同步单个文件
# sync.sync_file('/local/config.yml', '/remote/config.yml')
# 同步目录
sync.sync_directory('/local/app', '/remote/app',
exclude_patterns=['.git', '.pyc', '__pycache__']
)
sync.disconnect()
3.2 实际应用案例
◆ 案例一:自动化证书续期检查
场景描述:管理100+个域名的SSL证书,需要提前30天发现即将过期的证书并告警。
实现代码:
#!/usr/bin/env python3
# 文件名:ssl_cert_checker.py
import ssl
import socket
from datetime import datetime, timedelta
import requests
class SSLCertChecker:
def __init__(self, domains, alert_days=30):
self.domains = domains
self.alert_days = alert_days
def check_cert_expiry(self, domain, port=443):
"""检查证书过期时间"""
try:
context = ssl.create_default_context()
with socket.create_connection((domain, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=domain) as ssock:
cert = ssock.getpeercert()
# 解析过期时间
expire_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
days_left = (expire_date - datetime.now()).days
return {
'domain': domain,
'expire_date': expire_date,
'days_left': days_left,
'issuer': dict(x[0] for x in cert['issuer'])
}
except Exception as e:
return {
'domain': domain,
'error': str(e)
}
def check_all(self):
"""检查所有域名"""
results = []
alerts = []
for domain in self.domains:
result = self.check_cert_expiry(domain)
results.append(result)
if 'days_left' in result and result['days_left'] < self.alert_days:
alerts.append(f"{domain} 证书将在 {result['days_left']} 天后过期")
return results, alerts
# 使用示例
domains = ['example.com', 'api.example.com', 'www.example.com']
checker = SSLCertChecker(domains)
results, alerts = checker.check_all()
for result in results:
if 'days_left' in result:
print(f"{result['domain']}: 剩余 {result['days_left']} 天")
else:
print(f"{result['domain']}: 检查失败 - {result['error']}")
if alerts:
print("\n告警:")
for alert in alerts:
print(f" - {alert}")
运行结果:
example.com: 剩余 85 天
api.example.com: 剩余 12 天
www.example.com: 剩余 45 天
告警:
- api.example.com 证书将在 12 天后过期
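可将检查脚本加入 crontab 每天执行一次,并结合脚本2中的 webhook 告警方式推送结果(示意,脚本路径为假设):
0 9 * * * /usr/bin/python3 /opt/scripts/ssl_cert_checker.py >> /var/log/ops/ssl_check.log 2>&1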
◆ 案例二:Docker容器资源清理
场景描述:定期清理停止超过7天的容器、未使用的镜像和volume,释放磁盘空间。
实现代码:
#!/usr/bin/env python3
# 文件名:docker_cleanup.py
import subprocess
import json
from datetime import datetime, timedelta
class DockerCleaner:
def __init__(self, dry_run=True):
self.dry_run = dry_run
def get_stopped_containers(self, days=7):
"""获取停止超过N天的容器"""
cutoff_time = datetime.now() - timedelta(days=days)
cmd = "docker ps -a --format '{{json .}}'"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
stopped_containers = []
for line in result.stdout.strip().split('\n'):
if not line:
continue
container = json.loads(line)
if container['State'] != 'exited':
continue
# 获取容器详细信息
inspect_cmd = f"docker inspect {container['ID']}"
inspect_result = subprocess.run(inspect_cmd, shell=True, capture_output=True, text=True)
detail = json.loads(inspect_result.stdout)[0]
finished_at = datetime.fromisoformat(detail['State']['FinishedAt'].split('.')[0])
if finished_at < cutoff_time:
stopped_containers.append({
'id': container['ID'],
'name': container['Names'],
'finished_at': finished_at
})
return stopped_containers
def remove_containers(self, containers):
"""删除容器"""
for container in containers:
cmd = f"docker rm {container['id']}"
if self.dry_run:
print(f"[DRY RUN] {cmd}")
else:
subprocess.run(cmd, shell=True)
print(f"已删除容器: {container['name']}")
def prune_images(self):
"""清理未使用的镜像"""
cmd = "docker image prune -a -f"
if self.dry_run:
print(f"[DRY RUN] {cmd}")
else:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)
def prune_volumes(self):
"""清理未使用的volume"""
cmd = "docker volume prune -f"
if self.dry_run:
print(f"[DRY RUN] {cmd}")
else:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)
def cleanup(self, container_days=7):
"""执行清理"""
print(f"开始清理(DRY RUN: {self.dry_run})")
# 清理容器
containers = self.get_stopped_containers(container_days)
print(f"\n发现 {len(containers)} 个停止超过{container_days}天的容器")
self.remove_containers(containers)
# 清理镜像
print("\n清理未使用的镜像...")
self.prune_images()
# 清理volume
print("\n清理未使用的volume...")
self.prune_volumes()
# 使用示例
cleaner = DockerCleaner(dry_run=False)
cleaner.cleanup(container_days=7)
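建议首次执行时保持 dry_run=True,先预览将被删除的资源,确认无误后再改为 False;周期性清理可交给 crontab(示意,每周日凌晨3点执行,脚本路径为假设):
0 3 * * 0 /usr/bin/python3 /opt/scripts/docker_cleanup.py >> /var/log/ops/docker_cleanup.log 2>&1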
◆ 案例三:定时数据库备份
场景描述:每天凌晨2点自动备份MySQL数据库,保留最近30天的备份,自动清理过期文件。
实现步骤:
- 创建备份脚本
#!/usr/bin/env python3
# 文件名:mysql_backup.py
import os
import subprocess
from datetime import datetime, timedelta
import gzip
import shutil
class MySQLBackup:
def __init__(self, config):
self.host = config['host']
self.user = config['user']
self.password = config['password']
self.databases = config['databases']
self.backup_dir = config['backup_dir']
self.retention_days = config.get('retention_days', 30)
def backup_database(self, database):
"""备份单个数据库"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = f"{self.backup_dir}/{database}_{timestamp}.sql"
# mysqldump命令
cmd = [
'mysqldump',
f'--host={self.host}',
f'--user={self.user}',
f'--password={self.password}',
'--single-transaction',
'--routines',
'--triggers',
'--events',
database
]
try:
with open(backup_file, 'w') as f:
subprocess.run(cmd, stdout=f, check=True)
# 压缩
with open(backup_file, 'rb') as f_in:
with gzip.open(f"{backup_file}.gz", 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(backup_file)
print(f"备份成功: {database} -> {backup_file}.gz")
return f"{backup_file}.gz"
except subprocess.CalledProcessError as e:
print(f"备份失败: {database} - {str(e)}")
return None
def cleanup_old_backups(self):
"""清理过期备份"""
cutoff_time = datetime.now() - timedelta(days=self.retention_days)
for filename in os.listdir(self.backup_dir):
if not filename.endswith('.sql.gz'):
continue
file_path = os.path.join(self.backup_dir, filename)
file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_time < cutoff_time:
os.remove(file_path)
print(f"删除过期备份: {filename}")
def run(self):
"""执行备份"""
print(f"开始备份,时间: {datetime.now()}")
for database in self.databases:
self.backup_database(database)
self.cleanup_old_backups()
print("备份完成")
# 配置
config = {
'host': 'localhost',
'user': 'backup',
'password': 'your_password',
'databases': ['app_db', 'user_db'],
'backup_dir': '/data/mysql_backups',
'retention_days': 30
}
backup = MySQLBackup(config)
backup.run()
- 配置crontab定时任务
# 编辑crontab
crontab -e
# 添加定时任务(每天凌晨2点执行)
0 2 * * * /usr/bin/python3 /opt/scripts/mysql_backup.py >> /var/log/mysql_backup.log 2>&1
- 验证备份
# 查看备份文件
ls -lh /data/mysql_backups/
# 测试恢复
gunzip < app_db_20250115_020001.sql.gz | mysql -u root -p app_db_test
四、最佳实践和注意事项
4.1 最佳实践
◆ 4.1.1 性能优化
- 并发控制:使用ThreadPoolExecutor时合理设置max_workers,避免过多并发导致系统负载过高
# 根据CPU核心数动态调整
import os
max_workers = min(32, (os.cpu_count() or 1) * 4)
- 连接池复用:对于MySQL、Redis等数据库/中间件,使用连接池减少建立连接的开销(连接的取用方式见本小节末尾的示例)
from dbutils.pooled_db import PooledDB
import pymysql
pool = PooledDB(
creator=pymysql,
maxconnections=10,
host='localhost',
user='root',
password='password'
)
- 批量操作:批量执行数据库写入或API调用,减少网络往返次数
# 批量插入
cursor.executemany("INSERT INTO logs (message, level) VALUES (%s, %s)",
[(msg, level) for msg, level in log_entries]
)
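连接池的取用方式示意(基于上面定义的 pool 对象;用完调用 close() 即归还连接,而非真正断开):
# 取出连接、执行查询、归还连接
conn = pool.connection()
try:
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    print(cursor.fetchone())
    cursor.close()
finally:
    conn.close()  # 归还给连接池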
◆ 4.1.2 安全加固
- 敏感信息管理:使用环境变量或密钥管理系统(如HashiCorp Vault)存储密码
import os
from dotenv import load_dotenv
load_dotenv()
DB_PASSWORD = os.getenv('DB_PASSWORD')
- SSH密钥权限:确保私钥文件权限为600,防止被其他用户读取
chmod 600 ~/.ssh/ops_rsa
- 输入验证:对用户输入进行严格校验,防止命令注入
import shlex
# 安全的命令参数处理
safe_command = shlex.quote(user_input)
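shlex.quote 的典型用法是把外部输入安全地拼入 shell 命令(完整示意):
import shlex
import subprocess

user_input = "app.log; rm -rf /tmp/data"   # 模拟带注入内容的输入
safe_arg = shlex.quote(user_input)         # 整体转义为单个参数
subprocess.run(f"ls -l {safe_arg}", shell=True)
# 分号被引号包裹,不会被shell当作命令分隔符执行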
◆ 4.1.3 高可用配置
- 异常重试机制:网络操作添加重试逻辑,提高脚本鲁棒性
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def api_call():
response = requests.get('https://api.example.com')
response.raise_for_status()
return response.json()
- 健康检查:长时间运行的脚本应定期检查依赖服务的健康状态(可参考本小节末尾的示意代码)
- 备份策略:关键操作前先备份,如修改配置文件、删除数据等
import shutil
shutil.copy2('/etc/nginx/nginx.conf', '/etc/nginx/nginx.conf.backup')
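健康检查的一个最小示意(检查MySQL与Redis是否可连通,连接参数沿用2.2.2节 config.yml 的 mysql/redis 段,属示例假设):
import pymysql
import redis

def check_dependencies(config):
    """检查依赖服务连通性,返回 {'mysql': bool, 'redis': bool}"""
    status = {}
    try:
        conn = pymysql.connect(**config['mysql'], connect_timeout=3)
        conn.ping()
        conn.close()
        status['mysql'] = True
    except Exception:
        status['mysql'] = False
    try:
        r = redis.Redis(**config['redis'], socket_connect_timeout=3)
        status['redis'] = bool(r.ping())
    except Exception:
        status['redis'] = False
    return status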
4.2 注意事项
◆ 4.2.1 配置注意事项
- 生产环境执行脚本前务必在测试环境充分验证,特别是涉及删除、修改操作的脚本
- SSH操作时避免使用root用户,应创建专用运维账号并限制sudo权限
- 日志文件需配置轮转,防止占满磁盘空间
- 定时任务的脚本应使用绝对路径,避免因环境变量差异导致执行失败
◆ 4.2.2 常见错误
| 错误现象 | 原因分析 | 解决方案 |
|----------|----------|----------|
| paramiko认证失败 | SSH密钥权限错误或路径不对 | 检查私钥文件权限为600,路径使用绝对路径 |
| UnicodeDecodeError | 日志文件包含非UTF-8字符 | 读取时使用errors='ignore'参数 |
| 数据库连接超时 | 防火墙阻止或连接数达到上限 | 检查防火墙规则,增加max_connections |
| crontab定时任务未执行 | 环境变量缺失 | 脚本中显式设置PATH或使用绝对路径 |
| 磁盘空间不足导致脚本异常终止 | 未监控磁盘使用率 | 添加磁盘空间检查,低于阈值发送告警 |
◆ 4.2.3 兼容性问题
- 版本兼容:Python 3.6+引入了f-string,3.8+支持海象运算符,编写脚本时需考虑目标环境的Python版本
- 平台兼容:路径处理使用os.path或pathlib,避免硬编码Windows或Linux路径分隔符
- 组件依赖:paramiko 底层依赖 cryptography 等库,升级时需确认两者的版本兼容性,避免与环境中其他组件产生依赖冲突
五、故障排查和监控
5.1 故障排查
◆ 5.1.1 日志查看
# 查看脚本执行日志
tail -f /var/log/ops/automation.log
# 查看crontab执行记录(CentOS 系统日志为 /var/log/cron)
grep CRON /var/log/syslog | tail -20
# 查看Python异常堆栈
grep -A 20 "Traceback" /var/log/ops/automation.log
◆ 5.1.2 常见问题排查
**问题一:SSH连接超时**
# 测试SSH连接
ssh -vvv -i ~/.ssh/ops_rsa root@192.168.1.100
# 检查防火墙
sudo iptables -L -n | grep 22
**解决方案**:
1. 检查目标服务器SSH服务是否运行:systemctl status sshd
2. 验证防火墙规则是否允许22端口:firewall-cmd --list-all
3. 确认网络连通性:ping 192.168.1.100
4. 检查/etc/hosts.allow和/etc/hosts.deny配置
**问题二:内存占用持续增长**
# 监控Python进程内存
ps aux | grep python | sort -k4 -nr | head -5
# 使用memory_profiler分析
pip3 install memory-profiler
python3 -m memory_profiler script.py
**解决方案**:
1. 检查是否存在循环引用导致对象无法释放
2. 大文件读取改用生成器或分块处理
3. 及时关闭数据库连接和文件句柄
4. 使用del显式删除大对象
**问题三:脚本执行缓慢**
- 症状:批量操作耗时远超预期,CPU使用率低
- 排查:使用cProfile分析性能瓶颈
import cProfile
cProfile.run('main()', sort='cumulative')
- 解决:优化数据库查询(添加索引)、使用并发加速I/O密集型任务、减少不必要的日志输出
◆ 5.1.3 调试模式
# 开启详细日志
import logging
logging.basicConfig(level=logging.DEBUG)
# paramiko开启调试
paramiko.util.log_to_file('/tmp/paramiko.log', level=logging.DEBUG)
# requests显示HTTP请求详情
import http.client
http.client.HTTPConnection.debuglevel = 1
5.2 性能监控
◆ 5.2.1 关键指标监控
# 脚本执行时间监控
import time
from functools import wraps
def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        logger.info(f"{func.__name__} 执行耗时: {duration:.2f}秒")
        return result
    return wrapper

@timing_decorator
def batch_operation():
    # 批量操作逻辑
    pass
# 监控脚本资源占用
top -p $(pgrep -f "python.*automation")
# 监控网络流量
iftop -i eth0
# 监控磁盘IO
iostat -x 1
◆ 5.2.2 监控指标说明
| 指标名称 | 正常范围 | 告警阈值 | 说明 |
|----------|----------|----------|------|
| 脚本执行时间 | <5分钟 | >10分钟 | 超时可能是网络问题或死锁 |
| 内存使用率 | <70% | >85% | 持续增长可能存在内存泄露 |
| 错误率 | <1% | >5% | SSH失败、API调用失败等 |
| 并发数 | 10-50 | >100 | 过高并发可能导致目标服务器过载 |
| 日志大小 | <100MB/天 | >1GB/天 | 异常日志输出需检查代码逻辑 |
◆ 5.2.3 监控告警配置
Prometheus告警规则示意(针对脚本3推送到Pushgateway的指标编写;规则文件名与阈值均为示例假设):
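# alert_rules.yml(示例)
groups:
  - name: ops_automation_alerts
    rules:
      - alert: HighCpuUsage
        expr: system_cpu_percent > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CPU使用率超过80%已持续5分钟"
      - alert: HighMemoryUsage
        expr: system_memory_percent > 85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "内存使用率超过85%已持续5分钟"
      - alert: DiskSpaceLow
        expr: system_disk_percent > 90
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "挂载点 {{ $labels.mountpoint }} 磁盘使用率超过90%"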
5.3 备份与恢复
◆ 5.3.1 备份策略
#!/bin/bash
# 脚本文件备份脚本
BACKUP_DIR="/data/backups/scripts"
SOURCE_DIR="/opt/ops_scripts"
DATE=$(date +%Y%m%d)
# 创建备份目录
mkdir -p $BACKUP_DIR
# 打包备份
tar -czf $BACKUP_DIR/scripts_$DATE.tar.gz \
-C $SOURCE_DIR \
--exclude='*.pyc' \
--exclude='__pycache__' \
--exclude='.git' \
.
# 保留最近30天备份
find $BACKUP_DIR -name "scripts_*.tar.gz" -mtime +30 -delete
echo "备份完成: $BACKUP_DIR/scripts_$DATE.tar.gz"
◆ 5.3.2 恢复流程
1. 停止相关服务:
# 先备份当前crontab,再清空定时任务(crontab -r 会删除当前用户全部任务)
crontab -l > /data/backups/crontab_backup.txt
crontab -r
2. 恢复数据:
# 解压备份文件
tar -xzf /data/backups/scripts/scripts_20250115.tar.gz -C /opt/ops_scripts_restore
3. 验证完整性:
# 对比文件差异
diff -r /opt/ops_scripts /opt/ops_scripts_restore
# 测试脚本语法
python3 -m py_compile /opt/ops_scripts_restore/*.py
4. 重启服务:
# 恢复crontab
crontab /data/backups/crontab_backup.txt
六、总结
6.1 技术要点回顾
- Python运维自动化的核心价值在于减少重复劳动、提高执行效率、降低人为错误率
- 选择合适的库是成功的关键:paramiko处理SSH、psutil监控系统、schedule实现定时任务
- 生产环境脚本必须具备完善的日志、异常处理、重试机制,确保故障可追溯、可恢复
- 安全性不容忽视:密钥管理、权限控制、输入校验缺一不可
6.2 进阶学习方向
1. 异步编程优化:学习asyncio和aiohttp,将I/O密集型脚本改造为异步版本,大幅提升性能
- 学习资源:Python官方asyncio文档
- 实践建议:改造批量HTTP请求脚本,对比同步和异步版本的性能差异
2. 容器化部署:将运维脚本打包为Docker镜像,实现跨环境一致性执行
- 学习资源:Docker最佳实践
- 实践建议:使用Alpine Linux作为基础镜像,减小镜像体积
3. 可观测性增强:集成OpenTelemetry实现分布式追踪,结合Grafana可视化脚本执行情况
- 学习资源:OpenTelemetry Python SDK
- 实践建议:为关键脚本添加Trace和Metrics,建立运维自动化的观测体系
6.3 参考资料
- Python官方文档 - 标准库和语言特性权威参考
- Paramiko官方文档 - SSH自动化必备库
- psutil文档 - 系统监控和进程管理
- Real Python运维教程 - 实战导向的Python运维教程
- Awesome Python - 精选Python运维相关库和工具
附录
A. 命令速查表
# Python环境管理
python3 -m venv /opt/venv # 创建虚拟环境
source /opt/venv/bin/activate # 激活虚拟环境
pip3 freeze > requirements.txt # 导出依赖
pip3 install -r requirements.txt # 安装依赖
# 常用运维命令
python3 batch_ssh_executor.py "df -h" # 批量检查磁盘
python3 log_analyzer.py # 分析日志
python3 system_monitor.py # 系统监控
python3 mysql_backup.py # 数据库备份
# 调试和性能分析
python3 -m pdb script.py # 调试脚本
python3 -m cProfile -o profile.stats script.py # 性能分析
python3 -m trace --count script.py # 代码覆盖率
# 代码质量检查
pylint script.py # 代码规范检查
black script.py # 代码格式化
mypy script.py # 类型检查
B. 配置参数详解
**paramiko.SSHClient参数**:
- timeout:连接超时时间(秒),默认无超时,建议设置10-30秒
- banner_timeout:SSH banner读取超时,默认15秒
- auth_timeout:认证超时,默认15秒
- allow_agent:是否使用SSH agent,默认True
- look_for_keys:是否搜索~/.ssh/目录下的密钥,默认True
**logging.RotatingFileHandler参数**:
- maxBytes:单个日志文件最大字节数,建议100MB
- backupCount:保留的日志文件数量,建议10-30个
- encoding:日志文件编码,建议utf-8
- delay:延迟创建文件直到第一次写入,默认False
**ThreadPoolExecutor参数**:
- max_workers:最大线程数,建议为CPU核心数的2-4倍
- thread_name_prefix:线程名称前缀,便于调试
- initializer:线程启动时执行的初始化函数
C. 术语表
| 术语 | 英文 | 解释 |
|------|------|------|
| 幂等性 | Idempotence | 多次执行产生相同结果的特性,运维脚本必备 |
| 连接池 | Connection Pool | 预先建立并复用连接,减少建立连接的开销 |
| 异步编程 | Asynchronous Programming | 使用协程提高I/O密集型任务的并发性能 |
| 分布式追踪 | Distributed Tracing | 跟踪请求在分布式系统中的完整调用链路 |
| 熔断器 | Circuit Breaker | 当服务异常时自动断开调用,防止雪崩 |
| SSH密钥认证 | SSH Key Authentication | 使用公私钥对进行身份验证,比密码更安全 |
| 慢查询 | Slow Query | 执行时间超过阈值的数据库查询 |
| 全表扫描 | Full Table Scan | 数据库查询未使用索引,扫描整张表的所有行 |
| 日志轮转 | Log Rotation | 定期归档和删除旧日志,防止占满磁盘 |
| 优雅退出 | Graceful Shutdown | 程序收到终止信号后完成当前任务再退出 |