在软件部署流程中,手动操作不仅效率低下,且极易因疏忽导致线上故障。一种高效的解决思路是:通过自动化监控部署目录,实现服务包的自动发现与部署。本文将介绍如何利用Python的watchdog库构建一个轻量级的自动化部署监听器。
核心需求与设计思路
设想一个典型的CI/CD场景:持续集成工具(如Jenkins)将构建好的应用包(如app-1.0.0.tar.gz)推送到服务器的特定目录(如/data/deploy/packages)。我们的目标是编写一个常驻脚本,自动检测该目录下的新包,并完成以下流程:
- 等待文件传输完成:确保包文件完整。
- 备份当前版本:便于故障时快速回滚。
- 执行部署动作:停服务、解压新包、切换版本、启动服务。
- 健康检查与回滚:验证服务状态,失败则自动回退。
使用watchdog监控文件夹变化
相较于传统的轮询(os.listdir()),利用系统文件事件监听更为高效。Python的watchdog库对此进行了良好封装。
首先安装依赖:
pip install watchdog
以下是一个最小化的监听示例,用于感知新包的到来:
import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Directory the watcher observes for newly delivered packages.
WATCH_DIR = "/data/deploy/packages"
# File-name endings that identify a deployable package.
PACKAGE_SUFFIX = (".tar.gz", ".zip")
class PackageHandler(FileSystemEventHandler):
    """Event handler that reports new package files in the watched directory."""

    def on_created(self, event):
        """Print a notice when a newly created file looks like a package.

        Args:
            event: The creation event; ``event.src_path`` is the path of the
                created entry and may refer to a file or a directory.
        """
        path = Path(event.src_path)
        # Path.suffix only yields the last extension (".gz" for "x.tar.gz"),
        # so a membership test against PACKAGE_SUFFIX can never match
        # ".tar.gz". Compare the end of the full file name instead;
        # str.endswith accepts a tuple of candidates.
        if path.is_file() and path.name.endswith(PACKAGE_SUFFIX):
            print(f"[Watcher] 发现新包: {path.name}")
            # Hook for the deployment logic, e.g. deploy_service(path).
def start_watch():
    """Watch WATCH_DIR (non-recursively) until interrupted with Ctrl+C."""
    watcher = Observer()
    watcher.schedule(PackageHandler(), WATCH_DIR, recursive=False)
    watcher.start()
    print(f"[Watcher] 开始监控目录: {WATCH_DIR}")
    # start() has returned, so idle here until the user interrupts.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()


if __name__ == "__main__":
    start_watch()
运行此脚本后,向监控目录放入一个.tar.gz或.zip文件,控制台将输出提示信息。
关键细节:确保文件传输完成
文件通过网络传输(如SCP、Rsync)时,脚本可能在其未完成时就触发on_created事件。直接处理“半成品”文件会导致部署失败。一个可靠的策略是等待文件大小稳定一段时间。
import os
import time
from pathlib import Path
def wait_file_stable(path: Path, check_interval=1, stable_seconds=5, timeout=300):
    """Block until the size of ``path`` has stopped changing.

    Args:
        path: File to observe.
        check_interval: Seconds to sleep between two size checks.
        stable_seconds: How long the size must stay unchanged before the
            file is considered complete.
        timeout: Maximum number of seconds to wait overall.

    Raises:
        FileNotFoundError: The file does not exist (any more).
        TimeoutError: The size did not stabilise within ``timeout`` seconds.
    """
    # Use the monotonic clock: wall-clock adjustments (NTP, manual changes)
    # must not shorten or lengthen the measured intervals.
    started_at = time.monotonic()
    last_size = -1
    last_change_at = started_at
    while True:
        # A single stat() call replaces exists() + getsize(), so the file
        # cannot disappear between the existence check and the size read.
        try:
            current_size = path.stat().st_size
        except FileNotFoundError:
            raise FileNotFoundError(f"{path} 已被删除") from None
        now = time.monotonic()
        if current_size != last_size:
            last_size = current_size
            last_change_at = now
        if now - last_change_at >= stable_seconds:
            return  # size unchanged for long enough
        if now - started_at > timeout:
            raise TimeoutError(f"等待文件稳定超时: {path}")
        time.sleep(check_interval)
在事件处理器中调用此函数:
class PackageHandler(FileSystemEventHandler):
    """Event handler that deploys packages once their upload has finished."""

    def on_created(self, event):
        """Wait for a newly created package to be complete, then deploy it.

        Args:
            event: The creation event; ``event.src_path`` is the path of the
                created entry.
        """
        path = Path(event.src_path)
        # Path.suffix is only the last extension (".gz" for "x.tar.gz"), so
        # match on the end of the full file name; endswith accepts a tuple.
        if not (path.is_file() and path.name.endswith(PACKAGE_SUFFIX)):
            return
        print(f"[Watcher] 发现新包 {path.name},等待传输完成...")
        try:
            wait_file_stable(path)
        except Exception as e:
            print(f"[Watcher] 等待文件稳定失败: {e}")
            return
        print(f"[Watcher] 包已就绪,开始部署: {path.name}")
        # deploy_service re-raises after its own rollback attempt; report the
        # failure here so it does not escape from the event callback.
        try:
            deploy_service(path)
        except Exception as e:
            print(f"[Watcher] 部署失败: {e}")
部署流程编排与实现
一个清晰、可回滚的部署目录结构至关重要。推荐如下结构:
/app/my_service/
├── current -> /app/my_service/releases/20240101_120000 # 软链接指向当前版本
├── releases/ # 所有历史版本
│ ├── 20231201_100000/
│ ├── 20231215_093000/
│ └── 20240101_120000/
└── logs/
每次部署的核心步骤为:
- 将新包解压至 `releases/` 下的新时间戳目录。
- 将 `current` 软链接指向新目录。
- 重启服务。
以下是核心部署函数的实现示例:
import shutil
import subprocess
import tarfile
import time
from pathlib import Path
# Root directory of the deployed application.
APP_ROOT = Path("/app/my_service")
# Each release is unpacked into its own sub-directory below this one.
RELEASES_DIR = APP_ROOT / "releases"
# Symlink that points at the release currently in use.
CURRENT_LINK = APP_ROOT / "current"
SERVICE_NAME = "my_service" # systemd service name
def run_cmd(cmd: list[str]):
    """Run a command (no shell involved) and return its standard output.

    Args:
        cmd: Program and arguments as a list.

    Returns:
        The captured stdout of the command as text.

    Raises:
        RuntimeError: The command exited with a non-zero status; its stdout
            and stderr are printed first.
    """
    command_line = " ".join(cmd)
    print(f"[Deploy] 执行: {command_line}")
    completed = subprocess.run(cmd, capture_output=True, text=True)
    if completed.returncode == 0:
        return completed.stdout
    print(f"STDOUT: {completed.stdout}")
    print(f"STDERR: {completed.stderr}")
    raise RuntimeError(f"命令执行失败: {command_line}")
def stop_service():
    """Stop the systemd unit named by SERVICE_NAME via ``systemctl stop``."""
    stop_command = ["systemctl", "stop", SERVICE_NAME]
    run_cmd(stop_command)
def start_service():
    """Start the systemd unit named by SERVICE_NAME via ``systemctl start``."""
    start_command = ["systemctl", "start", SERVICE_NAME]
    run_cmd(start_command)
def health_check(url="http://127.0.0.1:8000/health", timeout=30):
    """Poll ``url`` once per second until it answers with HTTP 200.

    Args:
        url: Health endpoint to query.
        timeout: Total number of seconds to keep polling.

    Raises:
        RuntimeError: No 200 response was received before the deadline. The
            last observed failure, if any, is attached as ``__cause__``.
    """
    import requests

    # Monotonic clock: the deadline must not move when the wall clock does.
    deadline = time.monotonic() + timeout
    last_error = None
    while time.monotonic() < deadline:
        try:
            resp = requests.get(url, timeout=2)
        except requests.RequestException as exc:
            # Connection errors are expected while the service is starting;
            # remember the reason and retry. Other exception types are
            # programming errors and are allowed to propagate.
            last_error = exc
        else:
            if resp.status_code == 200:
                print("[Deploy] 健康检查通过")
                return
            last_error = RuntimeError(f"HTTP {resp.status_code}")
        time.sleep(1)
    raise RuntimeError("健康检查超时") from last_error
def extract_package(package_path: Path, target_dir: Path):
    """Unpack a ``.tar.gz`` / ``.tgz`` / ``.zip`` package into a new directory.

    Args:
        package_path: Archive to unpack.
        target_dir: Directory to create and unpack into; must not exist yet.

    Raises:
        FileExistsError: ``target_dir`` already exists.
        ValueError: The archive format is not supported.
    """
    if target_dir.exists():
        raise FileExistsError(f"目标目录已存在: {target_dir}")

    is_tarball = (
        package_path.suffixes[-2:] == [".tar", ".gz"]
        or package_path.suffix == ".tgz"
    )
    is_zip = package_path.suffix == ".zip"
    # Reject unknown formats before touching the file system, so a bad
    # package does not leave an empty release directory behind.
    if not (is_tarball or is_zip):
        raise ValueError(f"不支持的包格式: {package_path}")

    target_dir.mkdir(parents=True)
    try:
        if is_tarball:
            with tarfile.open(package_path, "r:gz") as tar:
                if hasattr(tarfile, "data_filter"):
                    # Extraction filters block members that would escape
                    # target_dir (absolute paths, "..", outside links).
                    tar.extractall(target_dir, filter="data")
                else:
                    # NOTE(review): this Python has no extraction filters;
                    # only feed it archives from a trusted source.
                    tar.extractall(target_dir)
        else:
            shutil.unpack_archive(str(package_path), str(target_dir))
    except Exception:
        # The directory was created by this call, so it is safe to remove a
        # half-extracted release instead of leaving it among the releases.
        shutil.rmtree(target_dir, ignore_errors=True)
        raise
    print(f"[Deploy] 包已解压至 {target_dir}")
def _switch_current_link(target: Path):
    """Repoint CURRENT_LINK at ``target`` without a window where it is missing."""
    tmp_link = CURRENT_LINK.with_name(CURRENT_LINK.name + ".tmp")
    if tmp_link.exists() or tmp_link.is_symlink():
        tmp_link.unlink()  # leftover from an interrupted earlier switch
    tmp_link.symlink_to(target)
    # Renaming over the old link replaces it in one step (atomic on POSIX),
    # unlike unlink() followed by symlink_to().
    tmp_link.replace(CURRENT_LINK)


def deploy_service(package_path: Path):
    """Deploy a package: unpack, stop, switch link, start, health-check.

    The package is unpacked into a new timestamped directory under
    RELEASES_DIR. If any step after unpacking fails and a previous release
    is known, the service is stopped, CURRENT_LINK is pointed back at that
    release, and the service is started and health-checked again.

    Args:
        package_path: Archive to deploy.

    Raises:
        Exception: Whatever made the deployment fail is re-raised after the
            rollback attempt, so the caller can record it.
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    new_release_dir = RELEASES_DIR / timestamp

    # Remember the release currently in use so it can be restored.
    old_release_target = None
    if CURRENT_LINK.is_symlink():
        try:
            old_release_target = CURRENT_LINK.resolve()
        except (OSError, RuntimeError):
            old_release_target = None

    service_touched = False
    try:
        print(f"[Deploy] 开始部署 {package_path.name}")
        extract_package(package_path, new_release_dir)
        print("[Deploy] 停止服务...")
        service_touched = True
        stop_service()
        _switch_current_link(new_release_dir)
        print("[Deploy] 启动服务...")
        start_service()
        health_check()
        print(f"[Deploy] 部署成功! 新版本目录: {new_release_dir}")
    except Exception as e:
        print(f"[Deploy] 部署失败: {e}")
        if not service_touched:
            # Unpacking failed: neither the service nor the link was
            # changed, so there is nothing to roll back.
            raise
        if old_release_target and old_release_target.exists():
            try:
                print("[Deploy] 尝试回滚至旧版本...")
                # The new release may already be running (e.g. the health
                # check failed); "systemctl start" alone would not restart
                # it, so stop it before switching back.
                stop_service()
                _switch_current_link(old_release_target)
                start_service()
                health_check()
                print("[Deploy] 回滚成功")
            except Exception as e2:
                print(f"[Deploy] 回滚失败: {e2}")
        else:
            print("[Deploy] 无可用旧版本回滚,需人工介入")
        raise  # re-raise so the caller can record the failure
生产环境注意事项
- 权限管理:部署脚本的运行用户(如 `deploy`)需具备操作目录、管理系统服务的权限。可通过配置sudo规则精细控制。
- 配置外部化:路径、服务名、健康检查URL等应抽取到配置文件(如 `config.yaml`)中,避免硬编码。
- 日志记录:使用 `logging` 模块替代 `print`,将运行日志输出至文件,便于故障排查。
import logging
# Write INFO-and-above records to a log file; each line carries the
# timestamp, the level name and the message.
logging.basicConfig(
filename='/var/log/deploy_watcher.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
# Module-level logger to use in place of print().
logger = logging.getLogger(__name__)
- 环境隔离:为测试、预发布、生产环境配置独立的监控目录、服务名及服务器,避免误操作。
与CI/CD流水线集成
此方案可与CI工具无缝衔接。CI流水线在完成构建、测试后,仅需将最终产物推送到服务器的监控目录即可,后续的备份、部署、验证全由自动化脚本完成。这实现了职责分离,使CI脚本更简洁,部署逻辑更集中、可控。
完整脚本示例
以下是将监控、等待、部署、日志等功能整合后的可运行脚本概览:
#!/usr/bin/env python3
import logging
import os
import time
from pathlib import Path
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import tarfile
import shutil
import subprocess
# Configuration
# Directory the watcher observes for newly delivered packages.
WATCH_DIR = "/data/deploy/packages"
# Root directory of the deployed application.
APP_ROOT = Path("/app/my_service")
# Name of the service unit.
SERVICE_NAME = "my_service"
# Health endpoint URL.
HEALTH_URL = "http://127.0.0.1:8000/health"
# File-name endings that identify a deployable package.
PACKAGE_SUFFIX = (".tar.gz", ".tgz", ".zip")
# Logging configuration: INFO-and-above records go to a log file; each line
# carries the timestamp, the level name and the message.
logging.basicConfig(
filename='/var/log/deploy_watcher.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)
# Insert the wait_file_stable, run_cmd, extract_package, stop_service,
# start_service, health_check and deploy_service functions shown earlier here.
# ...
class PackageHandler(FileSystemEventHandler):
    """Event handler that deploys packages arriving in the watched directory."""

    def on_created(self, event):
        """Handle a file that was created directly under its final name."""
        self._process(Path(event.src_path))

    def on_moved(self, event):
        """Handle a file that was renamed into place.

        Tools that upload to a temporary name and rename on completion
        produce a move event for the final name rather than a creation
        event, so the destination path is checked here.
        NOTE(review): renaming an already-deployed package inside the
        watched directory will trigger another deployment - confirm that
        this is acceptable.
        """
        self._process(Path(event.dest_path))

    def _process(self, path):
        """Wait for ``path`` to be complete and deploy it if it is a package."""
        # str.endswith accepts the PACKAGE_SUFFIX tuple directly.
        if not path.is_file() or not path.name.endswith(PACKAGE_SUFFIX):
            return
        logger.info("发现新包: %s", path)
        try:
            wait_file_stable(path)
            deploy_service(path)
        except Exception as e:
            # Boundary of the event callback: log with traceback and keep
            # the watcher running.
            logger.error("处理包失败 %s: %s", path, e, exc_info=True)
def main():
    """Watch WATCH_DIR (non-recursively) until interrupted with Ctrl+C."""
    watcher = Observer()
    watcher.schedule(PackageHandler(), WATCH_DIR, recursive=False)
    watcher.start()
    logger.info("启动目录监控: %s", WATCH_DIR)
    # start() has returned, so idle here until the user interrupts.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()


if __name__ == "__main__":
    main()
可将此脚本配置为systemd服务,实现开机自启与进程守护。