找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1757

积分

0

好友

257

主题
发表于 3 天前 | 查看: 7| 回复: 0

Kubernetes已成为容器编排领域的事实标准,对于构建现代化、可扩展的应用至关重要。本文将深入解析Kubernetes核心概念,并通过完整的实战配置,展示如何在生产环境中部署和管理Python应用。

Kubernetes核心概念全景图

要理解Kubernetes,首先需要了解其核心组件的工作架构:

┌──────────────────────────────────────────────────┐
│                 kubectl CLI                       │
├──────────────────────────────────────────────────┤
│         API Server (kube-apiserver)              │
├──────────────────────────────────────────────────┤
│    Controller Manager   │   Scheduler            │
├──────────────────────────────────────────────────┤
│               etcd (集群状态存储)                 │
├──────────────────────────────────────────────────┤
│  Node (kubelet + kube-proxy + container runtime) │
└──────────────────────────────────────────────────┘

核心概念解析

为了帮助Python开发者快速理解,这里将核心概念与熟悉的Python概念进行类比:

  • Pod:Kubernetes最小的调度单位,包含一个或多个紧密关联的容器。可以理解为一个“进程组”。
  • Deployment:用于管理Pod副本的声明式定义对象,实现应用的滚动更新和回滚,类似于一个“进程管理器”。
  • Service:定义一组Pod的访问策略,提供稳定的网络端点,充当“负载均衡器”的角色。
  • ConfigMap:用于存储非敏感的配置数据,可注入为环境变量或配置文件。
  • Secret:用于存储敏感信息(如密码、密钥),以加密或编码形式存储。
  • Namespace:提供虚拟的集群隔离,用于资源分组,类似于“虚拟环境”。
  • Ingress:管理外部HTTP/HTTPS流量访问集群内Service的规则,相当于“反向代理/Nginx”。
  • PersistentVolume (PV):对持久化存储的抽象,可以理解为“网络硬盘”。
  • StatefulSet:用于管理有状态应用(如数据库)的工作负载对象,管理“有状态的进程组”。

Python应用Kubernetes部署实战

接下来,我们将通过一系列完整的YAML配置文件和Python代码,展示如何将一个Python Web应用部署到Kubernetes集群中。

1. 基础Deployment配置

Deployment是定义应用运行态的核心。下面是一个生产级别的Python应用Deployment配置示例,包含了资源限制、健康检查、亲和性等高级特性。

# deployment/python-app.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: python-web-app
  namespace: production
  labels:
    app: python-web
    environment: production
spec:
  replicas: 3 # 副本数
  revisionHistoryLimit: 5 # 保留的历史版本数
  strategy:
    type: RollingUpdate # 滚动更新策略
    rollingUpdate:
      maxSurge: 1        # 更新时最多允许超出的Pod数
      maxUnavailable: 0  # 更新时最多不可用的Pod数
  selector:
    matchLabels:
      app: python-web
      tier: backend
  template:
    metadata:
      labels:
        app: python-web
        tier: backend
        version: "1.0.0"
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      # 亲和性规则:尽量将Pod分散到不同节点
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - python-web
              topologyKey: kubernetes.io/hostname
      # 容器定义
      containers:
      - name: python-app
        image: python-web-app:1.0.0
        imagePullPolicy: IfNotPresent
        # 资源限制
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        # 环境变量
        env:
        - name: PYTHONUNBUFFERED
          value: "1"
        - name: ENVIRONMENT
          value: "production"
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: database-url
        - name: REDIS_HOST
          value: "redis-service"
        # 端口
        ports:
        - containerPort: 8000
          name: http
          protocol: TCP
        # 健康检查
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
            scheme: HTTP
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
            scheme: HTTP
          initialDelaySeconds: 5
          periodSeconds: 5
          timeoutSeconds: 3
          successThreshold: 1
          failureThreshold: 3
        # 生命周期钩子
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh", "-c", "echo '容器启动完成' > /tmp/startup.log"]
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 10"]
        # 挂载点
        volumeMounts:
        - name: config-volume
          mountPath: /app/config
        - name: logs-volume
          mountPath: /app/logs
        - name: tmp-volume
          mountPath: /tmp
        # 安全上下文
        securityContext:
          runAsUser: 1000
          runAsGroup: 1000
          readOnlyRootFilesystem: true
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
      # 初始化容器:等待依赖服务就绪
      initContainers:
      - name: init-db
        image: busybox:1.28
        command: ['sh', '-c', 'until nslookup postgres-service; do echo waiting for database; sleep 2; done']
      # 卷定义
      volumes:
      - name: config-volume
        configMap:
          name: app-config
      - name: logs-volume
        emptyDir: {}
      - name: tmp-volume
        emptyDir: {}
      # 节点选择器
      nodeSelector:
        node-type: app-server
      # 容忍度
      tolerations:
      - key: "app"
        operator: "Equal"
        value: "python"
        effect: "NoSchedule"
2. Service配置

Service为Pod提供稳定的网络访问入口。下面的配置创建了一个ClusterIP类型的服务。

# service/web-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: python-web-service
  namespace: production
  labels:
    app: python-web
    service: backend
spec:
  selector:
    app: python-web
    tier: backend
  # 服务类型
  type: ClusterIP # 也可用 NodePort, LoadBalancer
  ports:
  - name: http
    port: 80           # 服务端口
    targetPort: 8000   # 容器端口
    protocol: TCP
    # NodePort类型时的端口
    # nodePort: 30080
  # 会话保持
  sessionAffinity: ClientIP
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 10800
3. Ingress配置

Ingress将外部HTTP/S流量路由到集群内部的服务。以下配置使用了Nginx Ingress Controller并启用了TLS。

# ingress/web-ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: python-app-ingress
  namespace: production
  annotations:
    # Nginx Ingress Controller注解
    nginx.ingress.kubernetes.io/rewrite-target: /
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/proxy-body-size: "10m"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "60"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "60"
    # 证书管理器注解
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    # HSTS设置
    nginx.ingress.kubernetes.io/hsts: "true"
    nginx.ingress.kubernetes.io/hsts-max-age: "31536000"
spec:
  ingressClassName: nginx
  tls:
  - hosts:
    - api.example.com
    secretName: python-app-tls
  rules:
  - host: api.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: python-web-service
            port:
              number: 80
      - path: /docs
        pathType: Prefix
        backend:
          service:
            name: python-web-service
            port:
              number: 80
      # API版本路由
      - path: /v1
        pathType: Prefix
        backend:
          service:
            name: python-web-service-v1
            port:
              number: 80
      - path: /v2
        pathType: Prefix
        backend:
          service:
            name: python-web-service-v2
            port:
              number: 80
4. ConfigMap和Secret

ConfigMap和Secret用于管理应用配置和敏感信息。

# config/app-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
  namespace: production
data:
  # 配置文件
  app_config.json: |
    {
      "debug": false,
      "log_level": "INFO",
      "cache_ttl": 300,
      "database": {
        "pool_size": 10,
        "pool_recycle": 3600
      }
    }
  # 环境变量配置
  environment: "production"
  feature_flags: |
    enable_new_api=true
    enable_cache=false
---
apiVersion: v1
kind: Secret
metadata:
  name: app-secrets
  namespace: production
type: Opaque
data:
  # 使用 base64 编码的值
  # echo -n 'value' | base64
  database-url: cG9zdGdyZXNxbDovL3VzZXI6cGFzc0BkYi5leGFtcGxlLmNvbS9hcHA=
  redis-password: cmVkaXNwYXNzMTIz
  api-key: YXBpLWtleS1zZWNyZXQ=
  jwt-secret: anN0LXNlY3JldC1rZXk=
5. Horizontal Pod Autoscaler (HPA)

HPA根据CPU、内存或自定义指标自动调整Pod的副本数。

# hpa/web-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: python-web-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: python-web-app
  minReplicas: 2
  maxReplicas: 10
  # 指标
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: http_requests_per_second
      target:
        type: AverageValue
        averageValue: 100
  - type: Object
    object:
      metric:
        name: requests_per_second
      describedObject:
        apiVersion: networking.k8s.io/v1
        kind: Ingress
        name: python-app-ingress
      target:
        type: Value
        value: 1000
  # 扩缩容行为
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
      - type: Pods
        value: 1
        periodSeconds: 60
      selectPolicy: Min
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60
      - type: Pods
        value: 4
        periodSeconds: 60
      selectPolicy: Max
6. Python应用Kubernetes感知代码

应用本身也需要具备一定的Kubernetes感知能力,例如提供健康检查端点、暴露自定义指标等。以下是一个基于FastAPI的示例。

# app/kubernetes/health.py
import os
import socket
import time
from typing import Dict, Any
from fastapi import FastAPI, Response, status
import psutil
import requests
from kubernetes import client, config

# 尝试加载Kubernetes配置
try:
    config.load_incluster_config()
    k8s_client = client.CoreV1Api()
    IN_K8S = True
except Exception:
    IN_K8S = False
    k8s_client = None

app = FastAPI()

class KubernetesHealth:
    """Kubernetes健康检查"""
    @staticmethod
    def get_pod_info() -> Dict[str, Any]:
        """获取Pod信息"""
        if not IN_K8S:
            return {"in_kubernetes": False}
        try:
            pod_name = os.getenv("HOSTNAME", "unknown")
            namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read().strip()
            pod = k8s_client.read_namespaced_pod(name=pod_name, namespace=namespace)
            return {
                "in_kubernetes": True,
                "pod_name": pod.metadata.name,
                "namespace": pod.metadata.namespace,
                "node_name": pod.spec.node_name,
                "pod_ip": pod.status.pod_ip,
                "host_ip": pod.status.host_ip,
                "phase": pod.status.phase,
                "start_time": pod.status.start_time.isoformat() if pod.status.start_time else None,
                "labels": pod.metadata.labels,
                "annotations": pod.metadata.annotations,
            }
        except Exception as e:
            return {"error": str(e), "in_kubernetes": True}

    @staticmethod
    def check_readiness() -> bool:
        """就绪检查"""
        # 检查数据库连接
        try:
            # 这里添加数据库健康检查
            pass
        except Exception:
            return False
        # 检查Redis连接
        try:
            # 这里添加Redis健康检查
            pass
        except Exception:
            return False
        return True

@app.get("/health")
async def health():
    """健康检查端点"""
    health_data = {
        "status": "healthy",
        "timestamp": time.time(),
        "pod_info": KubernetesHealth.get_pod_info(),
        "system": {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_percent": psutil.disk_usage('/').percent,
        },
        "dependencies": {
            "database": "healthy", # 实际检查数据库
            "redis": "healthy",    # 实际检查Redis
        }
    }
    return health_data

@app.get("/ready")
async def ready():
    """就绪检查端点"""
    if KubernetesHealth.check_readiness():
        return Response(status_code=status.HTTP_200_OK)
    else:
        return Response(
            content="Not ready",
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE
        )

@app.get("/metrics")
async def custom_metrics():
    """自定义指标端点(用于HPA)"""
    # 这里可以暴露自定义指标给Prometheus
    import random
    from prometheus_client import generate_latest, Counter, Gauge
    # 自定义指标示例
    REQUESTS_TOTAL = Counter('custom_requests_total', 'Total requests')
    ACTIVE_USERS = Gauge('custom_active_users', 'Active users')
    REQUESTS_TOTAL.inc()
    ACTIVE_USERS.set(random.randint(10, 100))
    return generate_latest()

# Kubernetes事件报告
class KubernetesEventReporter:
    """Kubernetes事件报告器"""
    def __init__(self):
        if IN_K8S:
            self.api = client.CoreV1Api()
        else:
            self.api = None

    def report_event(self, reason: str, message: str, event_type: str = "Normal"):
        """报告事件到Kubernetes"""
        if not self.api:
            return
        try:
            pod_name = os.getenv("HOSTNAME", "unknown")
            namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read().strip()
            event = client.V1Event(
                metadata=client.V1ObjectMeta(
                    generate_name=f"{pod_name}-",
                    namespace=namespace
                ),
                involved_object=client.V1ObjectReference(
                    kind="Pod",
                    name=pod_name,
                    namespace=namespace,
                    api_version="v1"
                ),
                reason=reason,
                message=message,
                type=event_type,
                source=client.V1EventSource(component="python-app"),
                first_timestamp=datetime.now(),
                last_timestamp=datetime.now(),
                count=1
            )
            self.api.create_namespaced_event(namespace=namespace, body=event)
        except Exception as e:
            print(f"Failed to report event: {e}")
7. 完整的Kustomization配置

Kustomize是Kubernetes原生的配置管理工具,用于组织和定制资源配置。

# kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namespace: production

# 资源文件
resources:
  - deployment/python-app.yaml
  - service/web-service.yaml
  - ingress/web-ingress.yaml
  - config/app-config.yaml
  - hpa/web-hpa.yaml
  - pvc/app-pvc.yaml

# 配置生成器
configMapGenerator:
  - name: app-config
    files:
      - config/app.properties
    literals:
      - ENVIRONMENT=production
      - LOG_LEVEL=INFO

secretGenerator:
  - name: app-secrets
    type: Opaque
    files:
      - secrets/database.password
    literals:
      - API_KEY=super-secret-key

# 镜像替换
images:
  - name: python-web-app
    newName: registry.example.com/python-web-app
    newTag: v1.2.3

# 标签和注解
commonLabels:
  app: python-web
  environment: production
  team: backend

commonAnnotations:
  description: "Python Web Application"
  maintainer: "backend-team@example.com"

# 补丁
patchesStrategicMerge:
  - patches/increase-memory.yaml
  - patches/add-sidecar.yaml

# 变量替换
vars:
  - name: APP_NAME
    objref:
      kind: Deployment
      name: python-web-app
      apiVersion: apps/v1
    fieldref:
      fieldpath: metadata.name
8. Python Kubernetes操作SDK

除了在集群内运行,Python应用也可以通过官方的kubernetes客户端库主动管理集群资源。

# k8s/kubernetes_client.py
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
from typing import List, Dict, Optional
import yaml
import time
import json

class KubernetesManager:
    """Kubernetes管理客户端"""
    def __init__(self, use_in_cluster=True):
        """初始化Kubernetes客户端"""
        if use_in_cluster:
            config.load_incluster_config()
        else:
            config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()
        self.networking_v1 = client.NetworkingV1Api()
        self.autoscaling_v1 = client.AutoscalingV1Api()
        self.batch_v1 = client.BatchV1Api()

    def create_deployment(self, deployment_config: Dict) -> Dict:
        """创建部署"""
        try:
            # 从YAML文件加载或直接使用字典
            if isinstance(deployment_config, str) and deployment_config.endswith('.yaml'):
                with open(deployment_config, 'r') as f:
                    deployment_body = yaml.safe_load(f)
            else:
                deployment_body = deployment_config
            resp = self.apps_v1.create_namespaced_deployment(
                body=deployment_body,
                namespace=deployment_body.get('metadata', {}).get('namespace', 'default')
            )
            print(f"Deployment created. status='{resp.metadata.name}'")
            return resp.to_dict()
        except ApiException as e:
            print(f"Exception when creating Deployment: {e}")
            raise

    def scale_deployment(self, name: str, namespace: str, replicas: int) -> Dict:
        """扩缩容部署"""
        try:
            # 获取当前部署
            deployment = self.apps_v1.read_namespaced_deployment(name, namespace)
            # 更新副本数
            deployment.spec.replicas = replicas
            # 应用更新
            resp = self.apps_v1.patch_namespaced_deployment(
                name=name,
                namespace=namespace,
                body=deployment
            )
            print(f"Deployment scaled to {replicas} replicas")
            return resp.to_dict()
        except ApiException as e:
            print(f"Exception when scaling Deployment: {e}")
            raise

    def rolling_update(self, name: str, namespace: str, image: str) -> Dict:
        """滚动更新部署"""
        try:
            # 获取当前部署
            deployment = self.apps_v1.read_namespaced_deployment(name, namespace)
            # 更新镜像
            deployment.spec.template.spec.containers[0].image = image
            # 添加更新注解(触发滚动更新)
            if deployment.spec.template.metadata.annotations:
                deployment.spec.template.metadata.annotations['kubectl.kubernetes.io/restartedAt'] = time.strftime('%Y-%m-%dT%H:%M:%SZ')
            else:
                deployment.spec.template.metadata.annotations = {
                    'kubectl.kubernetes.io/restartedAt': time.strftime('%Y-%m-%dT%H:%M:%SZ')
                }
            # 应用更新
            resp = self.apps_v1.patch_namespaced_deployment(
                name=name,
                namespace=namespace,
                body=deployment
            )
            print(f"Deployment updated with image: {image}")
            return resp.to_dict()
        except ApiException as e:
            print(f"Exception when updating Deployment: {e}")
            raise

    def watch_pods(self, namespace: str, label_selector: str = None, timeout: int = 3600):
        """监控Pod状态"""
        w = watch.Watch()
        try:
            for event in w.stream(
                self.core_v1.list_namespaced_pod,
                namespace=namespace,
                label_selector=label_selector,
                timeout_seconds=timeout
            ):
                pod = event['object']
                event_type = event['type']
                print(f"{event_type}: {pod.metadata.name} ({pod.status.phase})")
                # 检查Pod状态
                if pod.status.phase == "Running":
                    print(f"Pod {pod.metadata.name} is running")
                elif pod.status.phase == "Failed":
                    print(f"Pod {pod.metadata.name} failed: {pod.status.message}")
                # 检查容器状态
                for container_status in pod.status.container_statuses or []:
                    if container_status.state.waiting:
                        print(f"Container {container_status.name} waiting: {container_status.state.waiting.reason}")
                    elif container_status.state.terminated:
                        print(f"Container {container_status.name} terminated: {container_status.state.terminated.reason}")
        except Exception as e:
            print(f"Error watching pods: {e}")
            w.stop()

    def get_deployment_status(self, name: str, namespace: str) -> Dict:
        """获取部署状态"""
        try:
            deployment = self.apps_v1.read_namespaced_deployment(name, namespace)
            status = {
                "name": deployment.metadata.name,
                "namespace": deployment.metadata.namespace,
                "replicas": deployment.status.replicas,
                "ready_replicas": deployment.status.ready_replicas,
                "available_replicas": deployment.status.available_replicas,
                "unavailable_replicas": deployment.status.unavailable_replicas,
                "updated_replicas": deployment.status.updated_replicas,
                "conditions": [
                    {
                        "type": condition.type,
                        "status": condition.status,
                        "reason": condition.reason,
                        "message": condition.message,
                        "last_update_time": condition.last_update_time.isoformat() if condition.last_update_time else None
                    }
                    for condition in deployment.status.conditions or []
                ],
                "selector": deployment.spec.selector.match_labels,
                "strategy": deployment.spec.strategy.type,
                "containers": [
                    {
                        "name": container.name,
                        "image": container.image,
                        "ports": [{"container_port": port.container_port} for port in container.ports or []]
                    }
                    for container in deployment.spec.template.spec.containers
                ]
            }
            return status
        except ApiException as e:
            print(f"Exception when reading Deployment: {e}")
            raise

    def create_job(self, job_config: Dict) -> Dict:
        """创建一次性任务"""
        try:
            if isinstance(job_config, str) and job_config.endswith('.yaml'):
                with open(job_config, 'r') as f:
                    job_body = yaml.safe_load(f)
            else:
                job_body = job_config
            resp = self.batch_v1.create_namespaced_job(
                body=job_body,
                namespace=job_body.get('metadata', {}).get('namespace', 'default')
            )
            print(f"Job created: {resp.metadata.name}")
            return resp.to_dict()
        except ApiException as e:
            print(f"Exception when creating Job: {e}")
            raise

    def get_cluster_metrics(self) -> Dict:
        """获取集群指标"""
        try:
            # 获取节点信息
            nodes = self.core_v1.list_node()
            # 获取命名空间资源使用情况
            namespace_usage = {}
            namespaces = self.core_v1.list_namespace()
            for ns in namespaces.items:
                pods = self.core_v1.list_namespaced_pod(namespace=ns.metadata.name)
                cpu_usage = 0
                memory_usage = 0
                for pod in pods.items:
                    for container in pod.spec.containers:
                        if container.resources and container.resources.requests:
                            if 'cpu' in container.resources.requests:
                                # 转换CPU值(例如:250m -> 0.25)
                                cpu = container.resources.requests['cpu']
                                if cpu.endswith('m'):
                                    cpu_usage += int(cpu[:-1]) / 1000
                                else:
                                    cpu_usage += float(cpu)
                            if 'memory' in container.resources.requests:
                                # 转换内存值(例如:256Mi -> 268435456 bytes)
                                memory = container.resources.requests['memory']
                                if memory.endswith('Mi'):
                                    memory_usage += int(memory[:-2]) * 1024 * 1024
                                elif memory.endswith('Gi'):
                                    memory_usage += int(memory[:-2]) * 1024 * 1024 * 1024
                namespace_usage[ns.metadata.name] = {
                    'cpu': cpu_usage,
                    'memory': memory_usage,
                    'pod_count': len(pods.items)
                }
            return {
                'node_count': len(nodes.items),
                'namespace_usage': namespace_usage,
                'nodes': [
                    {
                        'name': node.metadata.name,
                        'status': node.status.conditions[-1].type if node.status.conditions else 'Unknown',
                        'cpu_capacity': node.status.capacity.get('cpu', '0'),
                        'memory_capacity': node.status.capacity.get('memory', '0'),
                        'pods': node.status.capacity.get('pods', '0')
                    }
                    for node in nodes.items
                ]
            }
        except ApiException as e:
            print(f"Exception when getting cluster metrics: {e}")
            raise

# 使用示例
if __name__ == "__main__":
    k8s = KubernetesManager()
    # 部署Python应用
    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {
            "name": "python-app",
            "namespace": "default",
            "labels": {
                "app": "python-app"
            }
        },
        "spec": {
            "replicas": 3,
            "selector": {
                "matchLabels": {
                    "app": "python-app"
                }
            },
            "template": {
                "metadata": {
                    "labels": {
                        "app": "python-app"
                    }
                },
                "spec": {
                    "containers": [{
                        "name": "python-app",
                        "image": "python:3.11-slim",
                        "command": ["python", "-m", "http.server", "8000"],
                        "ports": [{
                            "containerPort": 8000
                        }]
                    }]
                }
            }
        }
    }
    result = k8s.create_deployment(deployment)
    print(f"Deployment created: {result}")
9. 部署脚本和持续集成

将部署流程脚本化是DevOps实践的重要一环。以下Python脚本封装了常见的kubectl操作,便于集成到CI/CD流水线中。

# scripts/deploy_k8s.py
#!/usr/bin/env python3
"""Kubernetes部署脚本"""
import argparse
import subprocess
import sys
import yaml
import os
from pathlib import Path
import tempfile
from datetime import datetime

class KubernetesDeployer:
    """Kubernetes部署器"""
    def __init__(self, kubeconfig=None, namespace="default"):
        self.kubeconfig = kubeconfig
        self.namespace = namespace
        # 设置kubectl命令
        self.kubectl_cmd = ["kubectl"]
        if kubeconfig:
            self.kubectl_cmd.extend(["--kubeconfig", kubeconfig])
        self.kubectl_cmd.extend(["--namespace", namespace])

    def apply(self, manifest_file):
        """应用Kubernetes配置"""
        cmd = self.kubectl_cmd + ["apply", "-f", manifest_file]
        return self._run_command(cmd)

    def delete(self, manifest_file):
        """删除Kubernetes资源"""
        cmd = self.kubectl_cmd + ["delete", "-f", manifest_file]
        return self._run_command(cmd)

    def rollout_status(self, deployment_name, timeout="300s"):
        """检查滚动更新状态"""
        cmd = self.kubectl_cmd + [
            "rollout", "status", 
            f"deployment/{deployment_name}",
            f"--timeout={timeout}"
        ]
        return self._run_command(cmd)

    def set_image(self, deployment_name, container_name, image):
        """更新部署的镜像"""
        cmd = self.kubectl_cmd + [
            "set", "image",
            f"deployment/{deployment_name}",
            f"{container_name}={image}"
        ]
        return self._run_command(cmd)

    def get_pods(self, label_selector=None):
        """获取Pod列表"""
        cmd = self.kubectl_cmd + ["get", "pods", "-o", "wide"]
        if label_selector:
            cmd.extend(["-l", label_selector])
        return self._run_command(cmd)

    def get_logs(self, pod_name, container_name=None, tail=100):
        """获取Pod日志"""
        cmd = self.kubectl_cmd + ["logs", pod_name]
        if container_name:
            cmd.extend(["-c", container_name])
        cmd.extend(["--tail", str(tail)])
        return self._run_command(cmd)

    def port_forward(self, deployment_name, local_port, container_port):
        """端口转发"""
        cmd = self.kubectl_cmd + [
            "port-forward",
            f"deployment/{deployment_name}",
            f"{local_port}:{container_port}"
        ]
        return self._run_command(cmd, background=True)

    def _run_command(self, cmd, background=False):
        """运行命令"""
        print(f"Running: {' '.join(cmd)}")
        if background:
            return subprocess.Popen(cmd)
        else:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"Error: {result.stderr}")
            return result

def generate_k8s_manifest(app_name, image_tag, replicas=3):
    """动态生成Kubernetes配置"""
    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {
            "name": app_name,
            "labels": {
                "app": app_name,
                "version": image_tag
            }
        },
        "spec": {
            "replicas": replicas,
            "selector": {
                "matchLabels": {
                    "app": app_name
                }
            },
            "template": {
                "metadata": {
                    "labels": {
                        "app": app_name,
                        "version": image_tag
                    }
                },
                "spec": {
                    "containers": [{
                        "name": app_name,
                        "image": f"registry.example.com/{app_name}:{image_tag}",
                        "ports": [{
                            "containerPort": 8000
                        }],
                        "env": [
                            {
                                "name": "ENVIRONMENT",
                                "value": "production"
                            }
                        ],
                        "resources": {
                            "requests": {
                                "memory": "256Mi",
                                "cpu": "250m"
                            },
                            "limits": {
                                "memory": "512Mi",
                                "cpu": "500m"
                            }
                        },
                        "livenessProbe": {
                            "httpGet": {
                                "path": "/health",
                                "port": 8000
                            },
                            "initialDelaySeconds": 30,
                            "periodSeconds": 10
                        },
                        "readinessProbe": {
                            "httpGet": {
                                "path": "/ready",
                                "port": 8000
                            },
                            "initialDelaySeconds": 5,
                            "periodSeconds": 5
                        }
                    }]
                }
            }
        }
    }
    service = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {
            "name": f"{app_name}-service"
        },
        "spec": {
            "selector": {
                "app": app_name
            },
            "ports": [{
                "port": 80,
                "targetPort": 8000
            }],
            "type": "ClusterIP"
        }
    }
    hpa = {
        "apiVersion": "autoscaling/v2",
        "kind": "HorizontalPodAutoscaler",
        "metadata": {
            "name": f"{app_name}-hpa"
        },
        "spec": {
            "scaleTargetRef": {
                "apiVersion": "apps/v1",
                "kind": "Deployment",
                "name": app_name
            },
            "minReplicas": replicas,
            "maxReplicas": 10,
            "metrics": [{
                "type": "Resource",
                "resource": {
                    "name": "cpu",
                    "target": {
                        "type": "Utilization",
                        "averageUtilization": 70
                    }
                }
            }]
        }
    }
    # 写入临时文件
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
        yaml.dump_all([deployment, service, hpa], f)
        return f.name

def main():
    parser = argparse.ArgumentParser(description="Kubernetes部署工具")
    parser.add_argument("--app", required=True, help="应用名称")
    parser.add_argument("--image", required=True, help="镜像标签")
    parser.add_argument("--namespace", default="default", help="命名空间")
    parser.add_argument("--replicas", type=int, default=3, help="副本数")
    parser.add_argument("--action", choices=["deploy", "update", "delete"], default="deploy")
    args = parser.parse_args()

    # 创建部署器
    deployer = KubernetesDeployer(namespace=args.namespace)

    if args.action == "deploy":
        # 生成配置
        manifest_file = generate_k8s_manifest(
            args.app, 
            args.image, 
            args.replicas
        )
        try:
            # 应用配置
            print(f"部署应用 {args.app}...")
            result = deployer.apply(manifest_file)
            if result.returncode == 0:
                print("部署成功,等待就绪...")
                deployer.rollout_status(args.app)
            else:
                print("部署失败")
                sys.exit(1)
        finally:
            # 清理临时文件
            os.unlink(manifest_file)
    elif args.action == "update":
        print(f"更新应用 {args.app} 镜像为 {args.image}...")
        deployer.set_image(args.app, args.app, args.image)
        deployer.rollout_status(args.app)
    elif args.action == "delete":
        # 生成配置用于删除
        manifest_file = generate_k8s_manifest(
            args.app, 
            args.image, 
            args.replicas
        )
        deployer.delete(manifest_file)
        os.unlink(manifest_file)

if __name__ == "__main__":
    main()

Kubernetes操作速查表

掌握常用的kubectl命令能极大提升运维效率:

# 基本操作
kubectl get pods                  # 获取Pod列表
kubectl get deployments           # 获取部署列表
kubectl get services              # 获取服务列表
kubectl get ingress               # 获取Ingress列表

# 部署应用
kubectl apply -f deployment.yaml  # 创建/更新资源
kubectl delete -f deployment.yaml # 删除资源

# 调试和监控
kubectl logs <pod-name>           # 查看Pod日志
kubectl exec -it <pod-name> bash  # 进入Pod
kubectl describe pod <pod-name>   # 查看Pod详情
kubectl top pods                  # 查看资源使用情况

# 扩缩容
kubectl scale deployment <name> --replicas=5  # 手动扩缩容
kubectl autoscale deployment <name> --min=2 --max=10 --cpu-percent=80  # 自动扩缩容

# 配置管理
kubectl create configmap <name> --from-file=config.properties  # 创建ConfigMap
kubectl create secret generic <name> --from-literal=key=value  # 创建Secret

# 维护操作
kubectl rollout status deployment/<name>      # 查看滚动更新状态
kubectl rollout history deployment/<name>     # 查看更新历史
kubectl rollout undo deployment/<name>        # 回滚到上一版本

安全最佳实践

安全是生产部署不可忽视的一环。

Pod安全策略:在容器规范中定义严格的安全上下文。

# 安全上下文示例
securityContext:
  runAsNonRoot: true
  runAsUser: 1000
  runAsGroup: 1000
  allowPrivilegeEscalation: false
  capabilities:
    drop:
    - ALL
  seccompProfile:
    type: RuntimeDefault

网络策略:通过NetworkPolicy实现网络层面的微隔离。

# network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: python-app-policy
spec:
  podSelector:
    matchLabels:
      app: python-web
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          role: internal
    ports:
    - protocol: TCP
      port: 8000
  egress:
  - to:
    - podSelector:
        matchLabels:
          role: database
    ports:
    - protocol: TCP
      port: 5432

监控与告警

集成Prometheus实现监控指标收集。

# prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
    scrape_configs:
    - job_name: 'kubernetes-pods'
      kubernetes_sd_configs:
      - role: pod
      relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)

小结

Kubernetes为Python应用提供了一套完整、强大的云原生部署与管理方案,其核心价值体现在:

  • 自动扩缩容:根据负载自动调整副本数,应对流量高峰。
  • 自愈能力:自动重启失败的容器,保障服务连续性。
  • 滚动更新:支持零停机部署新版本,提升发布效率。
  • 服务发现:自动的服务注册与发现,简化微服务间调用。
  • 配置管理:统一的配置和密钥管理,提升安全性。
  • 存储编排:自动挂载存储系统,方便有状态应用管理。

对于Python开发者而言,深入掌握Kubernetes意味着能够构建高可用、可扩展的分布式系统,是实现现代微服务与Serverless架构的重要基石。通过本文的实战配置与代码示例,希望能为您的云原生之旅提供一份实用的参考。




上一篇:MinerU开源文档提取工具实战:精准解析PDF多栏排版与公式转换
下一篇:Viper红队平台开源渗透测试实战:内置100+模块与AI助手,开箱即用
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 21:11 , Processed in 0.370821 second(s), 38 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表