找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1363

积分

0

好友

185

主题
发表于 3 天前 | 查看: 6| 回复: 0

在软件部署流程中,手动操作不仅效率低下,且极易因疏忽导致线上故障。一种高效的解决思路是:通过自动化监控部署目录,实现服务包的自动发现与部署。本文将介绍如何利用Pythonwatchdog库构建一个轻量级的自动化部署监听器。

核心需求与设计思路

设想一个典型的CI/CD场景:持续集成工具(如Jenkins)将构建好的应用包(如app-1.0.0.tar.gz)推送到服务器的特定目录(如/data/deploy/packages)。我们的目标是编写一个常驻脚本,自动检测该目录下的新包,并完成以下流程:

  1. 等待文件传输完成:确保包文件完整。
  2. 备份当前版本:便于故障时快速回滚。
  3. 执行部署动作:停服务、解压新包、切换版本、启动服务。
  4. 健康检查与回滚:验证服务状态,失败则自动回退。

使用watchdog监控文件夹变化

相较于传统的轮询(os.listdir()),利用系统文件事件监听更为高效。Python的watchdog库对此进行了良好封装。

首先安装依赖:

pip install watchdog

以下是一个最小化的监听示例,用于感知新包的到来:

import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

WATCH_DIR = "/data/deploy/packages"
PACKAGE_SUFFIX = (".tar.gz", ".zip")

class PackageHandler(FileSystemEventHandler):
    def on_created(self, event):
        # event.src_path 可能是文件或目录
        path = Path(event.src_path)
        if path.is_file() and path.suffix in PACKAGE_SUFFIX:
            print(f"[Watcher] 发现新包: {path.name}")
            # 此处可接入后续部署逻辑
            # deploy_service(path)

def start_watch():
    event_handler = PackageHandler()
    observer = Observer()
    observer.schedule(event_handler, WATCH_DIR, recursive=False)
    observer.start()
    print(f"[Watcher] 开始监控目录: {WATCH_DIR}")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

if __name__ == "__main__":
    start_watch()

运行此脚本后,向监控目录放入一个.tar.gz.zip文件,控制台将输出提示信息。

关键细节:确保文件传输完成

文件通过网络传输(如SCP、Rsync)时,脚本可能在其未完成时就触发on_created事件。直接处理“半成品”文件会导致部署失败。一个可靠的策略是等待文件大小稳定一段时间。

import os
import time
from pathlib import Path

def wait_file_stable(path: Path, check_interval=1, stable_seconds=5, timeout=300):
    """等待文件大小在指定秒数内不再变化"""
    start = time.time()
    last_size = -1
    last_change_time = time.time()
    while True:
        if not path.exists():
            raise FileNotFoundError(f"{path} 已被删除")
        current_size = os.path.getsize(path)
        if current_size != last_size:
            last_size = current_size
            last_change_time = time.time()
        if time.time() - last_change_time >= stable_seconds:
            return  # 文件已稳定
        if time.time() - start > timeout:
            raise TimeoutError(f"等待文件稳定超时: {path}")
        time.sleep(check_interval)

在事件处理器中调用此函数:

class PackageHandler(FileSystemEventHandler):
    def on_created(self, event):
        path = Path(event.src_path)
        if path.is_file() and path.suffix in PACKAGE_SUFFIX:
            print(f"[Watcher] 发现新包 {path.name},等待传输完成...")
            try:
                wait_file_stable(path)
            except Exception as e:
                print(f"[Watcher] 等待文件稳定失败: {e}")
                return
            print(f"[Watcher] 包已就绪,开始部署: {path.name}")
            deploy_service(path)  # 触发部署流程

部署流程编排与实现

一个清晰、可回滚的部署目录结构至关重要。推荐如下结构:

/app/my_service/
├── current -> /app/my_service/releases/20240101_120000  # 软链接指向当前版本
├── releases/                                            # 所有历史版本
│   ├── 20231201_100000/
│   ├── 20231215_093000/
│   └── 20240101_120000/
└── logs/

每次部署的核心步骤为:

  1. 将新包解压至releases/下的新时间戳目录。
  2. current软链接指向新目录。
  3. 重启服务。

以下是核心部署函数的实现示例:

import shutil
import subprocess
import tarfile
import time
from pathlib import Path

APP_ROOT = Path("/app/my_service")
RELEASES_DIR = APP_ROOT / "releases"
CURRENT_LINK = APP_ROOT / "current"
SERVICE_NAME = "my_service"  # systemd服务名

def run_cmd(cmd: list[str]):
    """执行Shell命令"""
    print(f"[Deploy] 执行: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"STDOUT: {result.stdout}")
        print(f"STDERR: {result.stderr}")
        raise RuntimeError(f"命令执行失败: {' '.join(cmd)}")
    return result.stdout

def stop_service():
    run_cmd(["systemctl", "stop", SERVICE_NAME])

def start_service():
    run_cmd(["systemctl", "start", SERVICE_NAME])

def health_check(url="http://127.0.0.1:8000/health", timeout=30):
    """服务健康检查"""
    import requests
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            resp = requests.get(url, timeout=2)
            if resp.status_code == 200:
                print("[Deploy] 健康检查通过")
                return
        except Exception:
            pass
        time.sleep(1)
    raise RuntimeError("健康检查超时")

def extract_package(package_path: Path, target_dir: Path):
    """解压部署包"""
    if target_dir.exists():
        raise FileExistsError(f"目标目录已存在: {target_dir}")
    target_dir.mkdir(parents=True)
    if package_path.suffixes[-2:] == [".tar", ".gz"] or package_path.suffix == ".tgz":
        with tarfile.open(package_path, "r:gz") as tar:
            tar.extractall(target_dir)
    elif package_path.suffix == ".zip":
        shutil.unpack_archive(str(package_path), str(target_dir))
    else:
        raise ValueError(f"不支持的包格式: {package_path}")
    print(f"[Deploy] 包已解压至 {target_dir}")

def deploy_service(package_path: Path):
    """部署主流程"""
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    new_release_dir = RELEASES_DIR / timestamp

    # 记录当前版本,用于回滚
    old_release_target = None
    if CURRENT_LINK.is_symlink():
        try:
            old_release_target = CURRENT_LINK.resolve()
        except Exception:
            old_release_target = None

    try:
        print(f"[Deploy] 开始部署 {package_path.name}")
        extract_package(package_path, new_release_dir)

        print("[Deploy] 停止服务...")
        stop_service()

        # 切换软链接指向新版本
        if CURRENT_LINK.exists() or CURRENT_LINK.is_symlink():
            CURRENT_LINK.unlink()
        CURRENT_LINK.symlink_to(new_release_dir)

        print("[Deploy] 启动服务...")
        start_service()

        health_check()
        print(f"[Deploy] 部署成功! 新版本目录: {new_release_dir}")

    except Exception as e:
        print(f"[Deploy] 部署失败: {e}")
        # 尝试回滚
        if old_release_target and old_release_target.exists():
            try:
                print("[Deploy] 尝试回滚至旧版本...")
                if CURRENT_LINK.exists() or CURRENT_LINK.is_symlink():
                    CURRENT_LINK.unlink()
                CURRENT_LINK.symlink_to(old_release_target)
                start_service()
                health_check()
                print("[Deploy] 回滚成功")
            except Exception as e2:
                print(f"[Deploy] 回滚失败: {e2}")
        else:
            print("[Deploy] 无可用旧版本回滚,需人工介入")
        raise  # 重新抛出异常,便于外层记录

生产环境注意事项

  1. 权限管理:部署脚本的运行用户(如deploy)需具备操作目录、管理系统服务的权限。可通过配置sudo规则精细控制。
  2. 配置外部化:路径、服务名、健康检查URL等应抽取到配置文件(如config.yaml)中,避免硬编码。
  3. 日志记录:使用logging模块替代print,将运行日志输出至文件,便于故障排查。
    import logging
    logging.basicConfig(
    filename='/var/log/deploy_watcher.log',
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
    )
    logger = logging.getLogger(__name__)
  4. 环境隔离:为测试、预发布、生产环境配置独立的监控目录、服务名及服务器,避免误操作。

与CI/CD流水线集成

此方案可与CI工具无缝衔接。CI流水线在完成构建、测试后,仅需将最终产物推送到服务器的监控目录即可,后续的备份、部署、验证全由自动化脚本完成。这实现了职责分离,使CI脚本更简洁,部署逻辑更集中、可控。

完整脚本示例

以下是将监控、等待、部署、日志等功能整合后的可运行脚本概览:

#!/usr/bin/env python3
import logging
import os
import time
from pathlib import Path
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import tarfile
import shutil
import subprocess

# 配置区
WATCH_DIR = "/data/deploy/packages"
APP_ROOT = Path("/app/my_service")
SERVICE_NAME = "my_service"
HEALTH_URL = "http://127.0.0.1:8000/health"
PACKAGE_SUFFIX = (".tar.gz", ".tgz", ".zip")

# 日志配置
logging.basicConfig(
    filename='/var/log/deploy_watcher.log',
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)

# 此处整合上述 wait_file_stable, run_cmd, extract_package, stop_service, start_service, health_check, deploy_service 函数
# ...
class PackageHandler(FileSystemEventHandler):
    def on_created(self, event):
        path = Path(event.src_path)
        if not path.is_file() or not any(str(path).endswith(s) for s in PACKAGE_SUFFIX):
            return
        logger.info("发现新包: %s", path)
        try:
            wait_file_stable(path)
            deploy_service(path)
        except Exception as e:
            logger.error("处理包失败 %s: %s", path, e)

def main():
    observer = Observer()
    handler = PackageHandler()
    observer.schedule(handler, WATCH_DIR, recursive=False)
    observer.start()
    logger.info("启动目录监控: %s", WATCH_DIR)
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

if __name__ == "__main__":
    main()

可将此脚本配置为systemd服务,实现开机自启与进程守护。




上一篇:Orleans分布式Actor模型生产实战:.NET Core下的部署、监控与高级定制指南
下一篇:VictoriaMetrics集群部署实战:百万级指标监控的高可用与水平扩容方案
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 22:54 , Processed in 0.155806 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表