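Kubernetes has become the de facto standard for container orchestration and is central to building modern, scalable applications. This article walks through the core Kubernetes concepts and then uses complete, hands-on configurations to show how to deploy and manage a Python application in production.
Kubernetes Core Concepts at a Glance
To understand Kubernetes, start with how its core components fit together: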
┌──────────────────────────────────────────────────┐
│ kubectl CLI │
├──────────────────────────────────────────────────┤
│ API Server (kube-apiserver) │
├──────────────────────────────────────────────────┤
│ Controller Manager │ Scheduler │
├──────────────────────────────────────────────────┤
│ etcd (cluster state store) │
├──────────────────────────────────────────────────┤
│ Node (kubelet + kube-proxy + container runtime) │
└──────────────────────────────────────────────────┘
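Core Concepts Explained
To help Python developers get oriented quickly, each core concept below is paired with a familiar analogy:
- Pod: the smallest schedulable unit in Kubernetes, holding one or more tightly coupled containers. Think of it as a "process group".
- Deployment: a declarative object that manages Pod replicas and handles rolling updates and rollbacks, much like a "process manager".
- Service: defines how a set of Pods is reached and gives them a stable network endpoint, acting as a "load balancer".
- ConfigMap: stores non-sensitive configuration data that can be injected as environment variables or mounted as files.
- Secret: stores sensitive data such as passwords and keys. By default the values are only base64-encoded, not encrypted; encryption at rest has to be enabled on the cluster separately.
- Namespace: provides virtual isolation inside a cluster for grouping resources, similar to a "virtual environment".
- Ingress: manages the rules that route external HTTP/HTTPS traffic to Services inside the cluster, comparable to a "reverse proxy / Nginx".
- PersistentVolume (PV): an abstraction over persistent storage, roughly a "network drive".
- StatefulSet: a workload object for stateful applications such as databases, that is, a "stateful process group".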
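Several of these objects find each other through labels: a Service or Deployment selects Pods whose labels contain every key/value pair in its selector. The following is a minimal sketch of that equality-based matching in plain Python. The Pod names and the `selector_matches` helper are hypothetical and only illustrate the idea; this is not how the API server is implemented.

```python
from typing import Dict


def selector_matches(selector: Dict[str, str], pod_labels: Dict[str, str]) -> bool:
    """Return True when every key/value pair in the selector is present on the Pod."""
    return all(pod_labels.get(key) == value for key, value in selector.items())


# Hypothetical Pods and their labels
pods = {
    "web-1": {"app": "python-web", "tier": "backend"},
    "web-2": {"app": "python-web", "tier": "frontend"},
    "db-1": {"app": "postgres", "tier": "backend"},
}
service_selector = {"app": "python-web", "tier": "backend"}

selected = [name for name, labels in pods.items()
            if selector_matches(service_selector, labels)]
print(selected)
```

Only `web-1` carries both labels, so it is the only Pod the selector picks up.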
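Deploying a Python Application on Kubernetes
Next, a series of complete YAML manifests and Python code shows how to deploy a Python web application to a Kubernetes cluster.
1. Basic Deployment Configuration
The Deployment defines the desired running state of the application. The following production-grade Deployment for a Python application includes resource limits, health checks, affinity rules, and other advanced features.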
# deployment/python-app.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: python-web-app
  namespace: production
  labels:
    app: python-web
    environment: production
spec:
  replicas: 3                # number of replicas
  revisionHistoryLimit: 5    # number of old revisions to keep
  strategy:
    type: RollingUpdate      # rolling update strategy
    rollingUpdate:
      maxSurge: 1            # max Pods above the desired count during an update
      maxUnavailable: 0      # max Pods that may be unavailable during an update
  selector:
    matchLabels:
      app: python-web
      tier: backend
  template:
    metadata:
      labels:
        app: python-web
        tier: backend
        version: "1.0.0"
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      # Affinity: prefer spreading Pods across different nodes
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - python-web
                topologyKey: kubernetes.io/hostname
      # Container definition
      containers:
        - name: python-app
          image: python-web-app:1.0.0
          imagePullPolicy: IfNotPresent
          # Resource requests and limits
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Environment variables
          env:
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: ENVIRONMENT
              value: "production"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: database-url
            - name: REDIS_HOST
              value: "redis-service"
          # Ports
          ports:
            - containerPort: 8000
              name: http
              protocol: TCP
          # Health checks
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
              scheme: HTTP
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
              scheme: HTTP
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            successThreshold: 1
            failureThreshold: 3
          # Lifecycle hooks
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh", "-c", "echo 'container started' > /tmp/startup.log"]
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 10"]
          # Volume mounts
          volumeMounts:
            - name: config-volume
              mountPath: /app/config
            - name: logs-volume
              mountPath: /app/logs
            - name: tmp-volume
              mountPath: /tmp
          # Security context
          securityContext:
            runAsUser: 1000
            runAsGroup: 1000
            readOnlyRootFilesystem: true
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL
      # Init container: wait until the dependency is resolvable
      initContainers:
        - name: init-db
          image: busybox:1.28
          command: ['sh', '-c', 'until nslookup postgres-service; do echo waiting for database; sleep 2; done']
      # Volumes
      volumes:
        - name: config-volume
          configMap:
            name: app-config
        - name: logs-volume
          emptyDir: {}
        - name: tmp-volume
          emptyDir: {}
      # Node selector
      nodeSelector:
        node-type: app-server
      # Tolerations
      tolerations:
        - key: "app"
          operator: "Equal"
          value: "python"
          effect: "NoSchedule"
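The `maxSurge` and `maxUnavailable` settings bound how many Pods can exist and how many must stay available while a rolling update is in progress. The sketch below works through that arithmetic, assuming the documented rounding rules (percentages round up for `maxSurge` and down for `maxUnavailable`). The helper names are illustrative and not part of any Kubernetes library.

```python
from typing import Tuple, Union


def _resolve(value: Union[int, str], replicas: int, round_up: bool) -> int:
    """Turn an absolute number or a percentage string such as "25%" into a Pod count."""
    if isinstance(value, str) and value.endswith("%"):
        percent = int(value[:-1])
        if round_up:
            return -(-replicas * percent // 100)  # integer ceiling
        return replicas * percent // 100          # integer floor
    return int(value)


def rollout_bounds(replicas: int,
                   max_surge: Union[int, str],
                   max_unavailable: Union[int, str]) -> Tuple[int, int]:
    """Return (maximum total Pods, minimum available Pods) during a rolling update."""
    surge = _resolve(max_surge, replicas, round_up=True)
    unavailable = _resolve(max_unavailable, replicas, round_up=False)
    return replicas + surge, replicas - unavailable


# The Deployment above: 3 replicas, maxSurge=1, maxUnavailable=0
print(rollout_bounds(3, 1, 0))
# A percentage-based example
print(rollout_bounds(10, "25%", "25%"))
```

With the values used in this Deployment, at most 4 Pods exist at once and all 3 must stay available, which is why the update proceeds one Pod at a time without reducing capacity.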
2. Service Configuration
A Service gives Pods a stable network entry point. The configuration below creates a Service of type ClusterIP.
# service/web-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: python-web-service
  namespace: production
  labels:
    app: python-web
    service: backend
spec:
  selector:
    app: python-web
    tier: backend
  # Service type
  type: ClusterIP          # NodePort and LoadBalancer are also available
  ports:
    - name: http
      port: 80             # Service port
      targetPort: 8000     # container port
      protocol: TCP
      # Port to use when the type is NodePort
      # nodePort: 30080
  # Session affinity
  sessionAffinity: ClientIP
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 10800
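Inside the cluster, other workloads usually reach a ClusterIP Service through its DNS name rather than its IP. The sketch below builds that name, assuming the common default cluster domain `cluster.local` (clusters can be configured differently). The `service_url` helper is hypothetical.

```python
def service_url(name: str, namespace: str, port: int = 80,
                cluster_domain: str = "cluster.local", scheme: str = "http") -> str:
    """Build the in-cluster URL of a Service from its name and namespace."""
    host = f"{name}.{namespace}.svc.{cluster_domain}"
    return f"{scheme}://{host}:{port}"


url = service_url("python-web-service", "production")
print(url)
```

Pods in the same namespace can typically use the short name (`python-web-service`) as well; the fully qualified form is the safer choice for cross-namespace calls.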
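3. Ingress Configuration
An Ingress routes external HTTP/HTTPS traffic to Services inside the cluster. The following configuration uses the Nginx Ingress Controller with TLS enabled.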
# ingress/web-ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: python-app-ingress
  namespace: production
  annotations:
    # Nginx Ingress Controller annotations.
    # rewrite-target is not set: combined with plain prefix paths, "/" would
    # rewrite every request path (including /docs, /v1 and /v2) to "/".
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/proxy-body-size: "10m"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "60"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "60"
    # cert-manager annotation
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    # HSTS settings
    nginx.ingress.kubernetes.io/hsts: "true"
    nginx.ingress.kubernetes.io/hsts-max-age: "31536000"
spec:
  ingressClassName: nginx
  tls:
    - hosts:
        - api.example.com
      secretName: python-app-tls
  rules:
    - host: api.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: python-web-service
                port:
                  number: 80
          - path: /docs
            pathType: Prefix
            backend:
              service:
                name: python-web-service
                port:
                  number: 80
          # API version routing
          - path: /v1
            pathType: Prefix
            backend:
              service:
                name: python-web-service-v1
                port:
                  number: 80
          - path: /v2
            pathType: Prefix
            backend:
              service:
                name: python-web-service-v2
                port:
                  number: 80
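With `pathType: Prefix`, a rule matches when its path elements (split on `/`) are a prefix of the request path elements, and the longest matching path wins. The sketch below models that selection for the four paths in this Ingress. It is a simplified illustration (it ignores hosts, `Exact` paths, and controller-specific behavior), and `pick_backend` is a hypothetical helper.

```python
from typing import List, Optional, Tuple


def _elements(path: str) -> List[str]:
    """Split a URL path into its non-empty elements."""
    return [part for part in path.split("/") if part]


def prefix_matches(rule_path: str, request_path: str) -> bool:
    """Element-wise prefix match: /v1 matches /v1/users but not /v10."""
    rule = _elements(rule_path)
    return _elements(request_path)[:len(rule)] == rule


def pick_backend(rules: List[Tuple[str, str]], request_path: str) -> Optional[str]:
    """Return the Service of the longest matching prefix rule, or None."""
    matches = [(len(_elements(path)), service)
               for path, service in rules
               if prefix_matches(path, request_path)]
    if not matches:
        return None
    return max(matches, key=lambda match: match[0])[1]


rules = [
    ("/", "python-web-service"),
    ("/docs", "python-web-service"),
    ("/v1", "python-web-service-v1"),
    ("/v2", "python-web-service-v2"),
]
for request in ("/v1/users", "/v10/users", "/docs", "/health"):
    print(request, "->", pick_backend(rules, request))
```

Note that `/v10/users` falls through to the catch-all `/` rule, because matching is done per path element rather than per character.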
4. ConfigMap and Secret
ConfigMaps and Secrets manage application configuration and sensitive data.
# config/app-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
  namespace: production
data:
  # Configuration file
  app_config.json: |
    {
      "debug": false,
      "log_level": "INFO",
      "cache_ttl": 300,
      "database": {
        "pool_size": 10,
        "pool_recycle": 3600
      }
    }
  # Plain key/value settings
  environment: "production"
  feature_flags: |
    enable_new_api=true
    enable_cache=false
---
apiVersion: v1
kind: Secret
metadata:
  name: app-secrets
  namespace: production
type: Opaque
data:
  # Values are base64-encoded (encoded, not encrypted)
  # echo -n 'value' | base64
  database-url: cG9zdGdyZXNxbDovL3VzZXI6cGFzc0BkYi5leGFtcGxlLmNvbS9hcHA=
  redis-password: cmVkaXNwYXNzMTIz
  api-key: YXBpLWtleS1zZWNyZXQ=
  jwt-secret: anN0LXNlY3JldC1rZXk=
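The `data` values of a Secret are base64-encoded, which is an encoding and offers no confidentiality by itself. A small sketch with Python's standard `base64` module shows the round trip; the two helper functions are hypothetical names introduced here for illustration.

```python
import base64


def encode_secret_value(plain: str) -> str:
    """Encode a plain-text value the way it appears under a Secret's `data` field."""
    return base64.b64encode(plain.encode("utf-8")).decode("ascii")


def decode_secret_value(encoded: str) -> str:
    """Decode a value copied from a Secret's `data` field."""
    return base64.b64decode(encoded).decode("utf-8")


print(decode_secret_value("cmVkaXNwYXNzMTIz"))
print(encode_secret_value("api-key-secret"))
```

Because anyone with read access to the manifest can reverse the encoding this easily, real credentials should not be committed to version control in this form.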
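5. Horizontal Pod Autoscaler (HPA)
The HPA automatically adjusts the number of Pod replicas based on CPU, memory, or custom metrics. The Pods and Object metrics below only work when a custom metrics adapter (for example one backed by Prometheus) is installed in the cluster.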
# hpa/web-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: python-web-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: python-web-app
  minReplicas: 2
  maxReplicas: 10
  # Metrics
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: 100
    - type: Object
      object:
        metric:
          name: requests_per_second
        describedObject:
          apiVersion: networking.k8s.io/v1
          kind: Ingress
          name: python-app-ingress
        target:
          type: Value
          value: 1000
  # Scaling behavior
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
        - type: Pods
          value: 1
          periodSeconds: 60
      selectPolicy: Min
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 60
        - type: Pods
          value: 4
          periodSeconds: 60
      selectPolicy: Max
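For each metric, the HPA derives a replica count from the ratio between the current and the target value: desired = ceil(current replicas × current metric / target metric). The sketch below reproduces that calculation for a single metric, assuming the default tolerance of 0.1 and ignoring stabilization windows and scaling policies. `desired_replicas` is a hypothetical helper, not a Kubernetes API.

```python
import math


def desired_replicas(current_replicas: int, current_metric: float, target_metric: float,
                     min_replicas: int, max_replicas: int, tolerance: float = 0.1) -> int:
    """Single-metric HPA estimate, clamped to [min_replicas, max_replicas]."""
    ratio = current_metric / target_metric
    if abs(ratio - 1.0) <= tolerance:
        desired = current_replicas  # close enough to the target: no change
    else:
        desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


# 3 replicas at 90% average CPU against a 70% target
print(desired_replicas(3, 90, 70, min_replicas=2, max_replicas=10))
# 8 replicas at 140% against 70%: the raw result is capped at maxReplicas
print(desired_replicas(8, 140, 70, min_replicas=2, max_replicas=10))
```

When several metrics are configured, as in the manifest above, the HPA evaluates each one and uses the largest resulting replica count.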
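6. Kubernetes-Aware Python Application Code
The application itself also needs some awareness of Kubernetes, for example by serving health check endpoints and exposing custom metrics. The following example is based on FastAPI.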
# app/kubernetes/health.py
import os
import random
import time
from datetime import datetime, timezone
from typing import Any, Dict

import psutil
from fastapi import FastAPI, Response, status
from kubernetes import client, config
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest

NAMESPACE_FILE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

# Try to load the in-cluster Kubernetes configuration
try:
    config.load_incluster_config()
    k8s_client = client.CoreV1Api()
    IN_K8S = True
except Exception:
    IN_K8S = False
    k8s_client = None

app = FastAPI()

# Example custom metrics. They are registered once at import time, because
# creating the same metric on every request raises a duplicate-registration error.
REQUESTS_TOTAL = Counter('custom_requests_total', 'Total requests')
ACTIVE_USERS = Gauge('custom_active_users', 'Active users')


def read_namespace() -> str:
    """Read the Pod's namespace from the mounted service account files"""
    with open(NAMESPACE_FILE) as f:
        return f.read().strip()


class KubernetesHealth:
    """Kubernetes health checks"""

    @staticmethod
    def get_pod_info() -> Dict[str, Any]:
        """Return information about the current Pod"""
        if not IN_K8S:
            return {"in_kubernetes": False}
        try:
            pod_name = os.getenv("HOSTNAME", "unknown")
            namespace = read_namespace()
            pod = k8s_client.read_namespaced_pod(name=pod_name, namespace=namespace)
            return {
                "in_kubernetes": True,
                "pod_name": pod.metadata.name,
                "namespace": pod.metadata.namespace,
                "node_name": pod.spec.node_name,
                "pod_ip": pod.status.pod_ip,
                "host_ip": pod.status.host_ip,
                "phase": pod.status.phase,
                "start_time": pod.status.start_time.isoformat() if pod.status.start_time else None,
                "labels": pod.metadata.labels,
                "annotations": pod.metadata.annotations,
            }
        except Exception as e:
            return {"error": str(e), "in_kubernetes": True}

    @staticmethod
    def check_readiness() -> bool:
        """Readiness check"""
        # Check the database connection
        try:
            # Add the database health check here
            pass
        except Exception:
            return False
        # Check the Redis connection
        try:
            # Add the Redis health check here
            pass
        except Exception:
            return False
        return True


@app.get("/health")
async def health():
    """Liveness endpoint"""
    health_data = {
        "status": "healthy",
        "timestamp": time.time(),
        "pod_info": KubernetesHealth.get_pod_info(),
        "system": {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_percent": psutil.disk_usage('/').percent,
        },
        "dependencies": {
            "database": "healthy",  # placeholder: check the database for real
            "redis": "healthy",     # placeholder: check Redis for real
        }
    }
    return health_data


@app.get("/ready")
async def ready():
    """Readiness endpoint"""
    if KubernetesHealth.check_readiness():
        return Response(status_code=status.HTTP_200_OK)
    return Response(
        content="Not ready",
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE
    )


@app.get("/metrics")
async def custom_metrics():
    """Custom metrics endpoint scraped by Prometheus (can feed the HPA through an adapter)"""
    REQUESTS_TOTAL.inc()
    ACTIVE_USERS.set(random.randint(10, 100))  # demo value only
    # Return the Prometheus text format with its content type instead of JSON
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)


# Kubernetes event reporting
class KubernetesEventReporter:
    """Reports events to Kubernetes"""

    def __init__(self):
        self.api = client.CoreV1Api() if IN_K8S else None

    def report_event(self, reason: str, message: str, event_type: str = "Normal"):
        """Report an event attached to the current Pod"""
        if not self.api:
            return
        try:
            pod_name = os.getenv("HOSTNAME", "unknown")
            namespace = read_namespace()
            now = datetime.now(timezone.utc)
            # Newer releases of the Python client name this model CoreV1Event;
            # fall back to V1Event for older releases.
            event_cls = getattr(client, "CoreV1Event", None) or getattr(client, "V1Event")
            event = event_cls(
                metadata=client.V1ObjectMeta(
                    generate_name=f"{pod_name}-",
                    namespace=namespace
                ),
                involved_object=client.V1ObjectReference(
                    kind="Pod",
                    name=pod_name,
                    namespace=namespace,
                    api_version="v1"
                ),
                reason=reason,
                message=message,
                type=event_type,
                source=client.V1EventSource(component="python-app"),
                first_timestamp=now,
                last_timestamp=now,
                count=1
            )
            self.api.create_namespaced_event(namespace=namespace, body=event)
        except Exception as e:
            print(f"Failed to report event: {e}")
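The readiness check above leaves the database and Redis probes as placeholders. One lightweight way to fill them, sketched here under the assumption that a successful TCP connection is a good enough signal, is a short-timeout connect using only the standard library. `tcp_dependency_ready` is a hypothetical helper, and a real check would usually run an actual query or a `PING` instead.

```python
import socket


def tcp_dependency_ready(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures
        return False


if __name__ == "__main__":
    # Hostnames follow the Service names used earlier in this article
    print("postgres:", tcp_dependency_ready("postgres-service", 5432))
    print("redis:", tcp_dependency_ready("redis-service", 6379))
```

Keeping the timeout well below the probe's `timeoutSeconds` (3 seconds in the Deployment) helps the endpoint answer before the kubelet gives up on it.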
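7. Complete Kustomization Configuration
Kustomize is the configuration management tool built into kubectl; it organizes and customizes resource manifests without templates.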
# kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: production
# Resource files
resources:
  - deployment/python-app.yaml
  - service/web-service.yaml
  - ingress/web-ingress.yaml
  # config/app-config.yaml is not listed here: it declares the same app-config
  # and app-secrets objects that the generators below create.
  - hpa/web-hpa.yaml
  - pvc/app-pvc.yaml
# Generators
configMapGenerator:
  - name: app-config
    files:
      - config/app.properties
    literals:
      - ENVIRONMENT=production
      - LOG_LEVEL=INFO
secretGenerator:
  - name: app-secrets
    type: Opaque
    files:
      - secrets/database.password
    literals:
      - API_KEY=super-secret-key
# Image replacement
images:
  - name: python-web-app
    newName: registry.example.com/python-web-app
    newTag: v1.2.3
# Labels and annotations
commonLabels:
  app: python-web
  environment: production
  team: backend
commonAnnotations:
  description: "Python Web Application"
  maintainer: "backend-team@example.com"
# Patches (newer Kustomize releases deprecate this field in favor of `patches`)
patchesStrategicMerge:
  - patches/increase-memory.yaml
  - patches/add-sidecar.yaml
# Variable substitution (newer Kustomize releases deprecate `vars` in favor of `replacements`)
vars:
  - name: APP_NAME
    objref:
      kind: Deployment
      name: python-web-app
      apiVersion: apps/v1
    fieldref:
      fieldpath: metadata.name
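8. Managing Kubernetes with the Python SDK
Besides running inside a cluster, a Python application can also manage cluster resources actively through the official kubernetes client library.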
# k8s/kubernetes_client.py
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
from typing import List, Dict, Optional
import yaml
import time
import json


class KubernetesManager:
    """Kubernetes management client"""

    def __init__(self, use_in_cluster=True):
        """Initialize the Kubernetes API clients"""
        if use_in_cluster:
            config.load_incluster_config()  # running inside a Pod
        else:
            config.load_kube_config()       # use the local kubeconfig
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()
        self.networking_v1 = client.NetworkingV1Api()
        self.autoscaling_v1 = client.AutoscalingV1Api()
        self.batch_v1 = client.BatchV1Api()
    def create_deployment(self, deployment_config: Dict) -> Dict:
        """Create a Deployment from a dict or from the path of a YAML file"""
        try:
            # Load from a YAML file, or use the dict directly
            if isinstance(deployment_config, str) and deployment_config.endswith('.yaml'):
                with open(deployment_config, 'r') as f:
                    deployment_body = yaml.safe_load(f)
            else:
                deployment_body = deployment_config
            resp = self.apps_v1.create_namespaced_deployment(
                body=deployment_body,
                namespace=deployment_body.get('metadata', {}).get('namespace', 'default')
            )
            print(f"Deployment created. name='{resp.metadata.name}'")
            return resp.to_dict()
        except ApiException as e:
            print(f"Exception when creating Deployment: {e}")
            raise
    def scale_deployment(self, name: str, namespace: str, replicas: int) -> Dict:
        """Scale a Deployment up or down"""
        try:
            # Patch only the replica count; sending back the full object that
            # was read earlier can conflict with concurrent changes.
            resp = self.apps_v1.patch_namespaced_deployment(
                name=name,
                namespace=namespace,
                body={"spec": {"replicas": replicas}}
            )
            print(f"Deployment scaled to {replicas} replicas")
            return resp.to_dict()
        except ApiException as e:
            print(f"Exception when scaling Deployment: {e}")
            raise
    def rolling_update(self, name: str, namespace: str, image: str) -> Dict:
        """Roll out a new image for a Deployment"""
        try:
            # Read the current Deployment
            deployment = self.apps_v1.read_namespaced_deployment(name, namespace)
            # Update the image of the first container
            deployment.spec.template.spec.containers[0].image = image
            # Add a restart annotation (forces a rollout even if the image is unchanged).
            # The trailing "Z" denotes UTC, so format the time with gmtime().
            restarted_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
            if deployment.spec.template.metadata.annotations:
                deployment.spec.template.metadata.annotations['kubectl.kubernetes.io/restartedAt'] = restarted_at
            else:
                deployment.spec.template.metadata.annotations = {
                    'kubectl.kubernetes.io/restartedAt': restarted_at
                }
            # Apply the update
            resp = self.apps_v1.patch_namespaced_deployment(
                name=name,
                namespace=namespace,
                body=deployment
            )
            print(f"Deployment updated with image: {image}")
            return resp.to_dict()
        except ApiException as e:
            print(f"Exception when updating Deployment: {e}")
            raise
    def watch_pods(self, namespace: str, label_selector: str = None, timeout: int = 3600):
        """Watch Pod state changes"""
        w = watch.Watch()
        try:
            for event in w.stream(
                self.core_v1.list_namespaced_pod,
                namespace=namespace,
                label_selector=label_selector,
                timeout_seconds=timeout
            ):
                pod = event['object']
                event_type = event['type']
                print(f"{event_type}: {pod.metadata.name} ({pod.status.phase})")
                # Check the Pod phase
                if pod.status.phase == "Running":
                    print(f"Pod {pod.metadata.name} is running")
                elif pod.status.phase == "Failed":
                    print(f"Pod {pod.metadata.name} failed: {pod.status.message}")
                # Check container states
                for container_status in pod.status.container_statuses or []:
                    if container_status.state.waiting:
                        print(f"Container {container_status.name} waiting: {container_status.state.waiting.reason}")
                    elif container_status.state.terminated:
                        print(f"Container {container_status.name} terminated: {container_status.state.terminated.reason}")
        except Exception as e:
            print(f"Error watching pods: {e}")
        finally:
            # Always release the watch connection, on success and on error
            w.stop()
    def get_deployment_status(self, name: str, namespace: str) -> Dict:
        """Return a summary of the Deployment status"""
        try:
            deployment = self.apps_v1.read_namespaced_deployment(name, namespace)
            status = {
                "name": deployment.metadata.name,
                "namespace": deployment.metadata.namespace,
                "replicas": deployment.status.replicas,
                "ready_replicas": deployment.status.ready_replicas,
                "available_replicas": deployment.status.available_replicas,
                "unavailable_replicas": deployment.status.unavailable_replicas,
                "updated_replicas": deployment.status.updated_replicas,
                "conditions": [
                    {
                        "type": condition.type,
                        "status": condition.status,
                        "reason": condition.reason,
                        "message": condition.message,
                        "last_update_time": condition.last_update_time.isoformat() if condition.last_update_time else None
                    }
                    for condition in deployment.status.conditions or []
                ],
                "selector": deployment.spec.selector.match_labels,
                "strategy": deployment.spec.strategy.type,
                "containers": [
                    {
                        "name": container.name,
                        "image": container.image,
                        "ports": [{"container_port": port.container_port} for port in container.ports or []]
                    }
                    for container in deployment.spec.template.spec.containers
                ]
            }
            return status
        except ApiException as e:
            print(f"Exception when reading Deployment: {e}")
            raise
    def create_job(self, job_config: Dict) -> Dict:
        """Create a one-off Job from a dict or from the path of a YAML file"""
        try:
            if isinstance(job_config, str) and job_config.endswith('.yaml'):
                with open(job_config, 'r') as f:
                    job_body = yaml.safe_load(f)
            else:
                job_body = job_config
            resp = self.batch_v1.create_namespaced_job(
                body=job_body,
                namespace=job_body.get('metadata', {}).get('namespace', 'default')
            )
            print(f"Job created: {resp.metadata.name}")
            return resp.to_dict()
        except ApiException as e:
            print(f"Exception when creating Job: {e}")
            raise
    def get_cluster_metrics(self) -> Dict:
        """Summarize nodes and per-namespace resource requests"""
        try:
            # List nodes
            nodes = self.core_v1.list_node()
            # Sum the resource requests (not live usage) per namespace
            namespace_usage = {}
            namespaces = self.core_v1.list_namespace()
            for ns in namespaces.items:
                pods = self.core_v1.list_namespaced_pod(namespace=ns.metadata.name)
                cpu_usage = 0
                memory_usage = 0
                for pod in pods.items:
                    for container in pod.spec.containers:
                        if container.resources and container.resources.requests:
                            if 'cpu' in container.resources.requests:
                                # Convert the CPU value (for example 250m -> 0.25)
                                cpu = container.resources.requests['cpu']
                                if cpu.endswith('m'):
                                    cpu_usage += int(cpu[:-1]) / 1000
                                else:
                                    cpu_usage += float(cpu)
                            if 'memory' in container.resources.requests:
                                # Convert the memory value (for example 256Mi -> 268435456 bytes).
                                # Only the Mi and Gi suffixes are handled here.
                                memory = container.resources.requests['memory']
                                if memory.endswith('Mi'):
                                    memory_usage += int(memory[:-2]) * 1024 * 1024
                                elif memory.endswith('Gi'):
                                    memory_usage += int(memory[:-2]) * 1024 * 1024 * 1024
                namespace_usage[ns.metadata.name] = {
                    'cpu': cpu_usage,
                    'memory': memory_usage,
                    'pod_count': len(pods.items)
                }
            return {
                'node_count': len(nodes.items),
                'namespace_usage': namespace_usage,
                'nodes': [
                    {
                        'name': node.metadata.name,
                        # Look up the Ready condition explicitly instead of
                        # relying on the order of the conditions list
                        'status': 'Ready' if any(
                            c.type == 'Ready' and c.status == 'True'
                            for c in node.status.conditions or []
                        ) else 'NotReady',
                        'cpu_capacity': node.status.capacity.get('cpu', '0'),
                        'memory_capacity': node.status.capacity.get('memory', '0'),
                        'pods': node.status.capacity.get('pods', '0')
                    }
                    for node in nodes.items
                ]
            }
        except ApiException as e:
            print(f"Exception when getting cluster metrics: {e}")
            raise
# Usage example
if __name__ == "__main__":
    # Run from a workstation with a kubeconfig; inside a Pod use the default (in-cluster)
    k8s = KubernetesManager(use_in_cluster=False)
    # Deploy a Python application
    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {
            "name": "python-app",
            "namespace": "default",
            "labels": {"app": "python-app"}
        },
        "spec": {
            "replicas": 3,
            "selector": {"matchLabels": {"app": "python-app"}},
            "template": {
                "metadata": {"labels": {"app": "python-app"}},
                "spec": {
                    "containers": [{
                        "name": "python-app",
                        "image": "python:3.11-slim",
                        "command": ["python", "-m", "http.server", "8000"],
                        "ports": [{"containerPort": 8000}]
                    }]
                }
            }
        }
    }
    result = k8s.create_deployment(deployment)
    print(f"Deployment created: {result}")
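The `get_cluster_metrics` method converts resource quantities by hand and only recognizes the `Mi` and `Gi` suffixes. A slightly more general parser is sketched below; it covers the common binary and decimal suffixes but deliberately remains a subset of the full Kubernetes quantity grammar (no exponent notation, no `P`/`E` suffixes). The function names are hypothetical.

```python
from decimal import Decimal

_BINARY_SUFFIXES = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}
_DECIMAL_SUFFIXES = {"k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}


def parse_cpu(quantity: str) -> float:
    """Convert a CPU quantity such as "250m" or "2" into cores."""
    if quantity.endswith("m"):
        return int(quantity[:-1]) / 1000
    return float(quantity)


def parse_memory(quantity: str) -> int:
    """Convert a memory quantity such as "256Mi" or "500M" into bytes."""
    for suffix, factor in _BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[:-len(suffix)]) * factor)
    for suffix, factor in _DECIMAL_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[:-len(suffix)]) * factor)
    return int(Decimal(quantity))  # plain number of bytes


print(parse_cpu("250m"), parse_memory("256Mi"), parse_memory("1.5Gi"))
```

Using `Decimal` avoids floating-point surprises for fractional values such as `1.5Gi`.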
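9. Deployment Script and Continuous Integration
Scripting the deployment workflow is an important part of DevOps practice. The following Python script wraps common kubectl operations so they can be plugged into a CI/CD pipeline.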
#!/usr/bin/env python3
# scripts/deploy_k8s.py
"""Kubernetes deployment script"""
import argparse
import os
import subprocess
import sys
import tempfile

import yaml
class KubernetesDeployer:
    """Thin wrapper around kubectl"""

    def __init__(self, kubeconfig=None, namespace="default"):
        self.kubeconfig = kubeconfig
        self.namespace = namespace
        # Build the base kubectl command
        self.kubectl_cmd = ["kubectl"]
        if kubeconfig:
            self.kubectl_cmd.extend(["--kubeconfig", kubeconfig])
        self.kubectl_cmd.extend(["--namespace", namespace])

    def apply(self, manifest_file):
        """Apply a Kubernetes manifest"""
        cmd = self.kubectl_cmd + ["apply", "-f", manifest_file]
        return self._run_command(cmd)

    def delete(self, manifest_file):
        """Delete the resources defined in a manifest"""
        cmd = self.kubectl_cmd + ["delete", "-f", manifest_file]
        return self._run_command(cmd)

    def rollout_status(self, deployment_name, timeout="300s"):
        """Wait for a rollout and report its status"""
        cmd = self.kubectl_cmd + [
            "rollout", "status",
            f"deployment/{deployment_name}",
            f"--timeout={timeout}"
        ]
        return self._run_command(cmd)

    def set_image(self, deployment_name, container_name, image):
        """Update the image of a Deployment's container"""
        cmd = self.kubectl_cmd + [
            "set", "image",
            f"deployment/{deployment_name}",
            f"{container_name}={image}"
        ]
        return self._run_command(cmd)

    def get_pods(self, label_selector=None):
        """List Pods"""
        cmd = self.kubectl_cmd + ["get", "pods", "-o", "wide"]
        if label_selector:
            cmd.extend(["-l", label_selector])
        return self._run_command(cmd)

    def get_logs(self, pod_name, container_name=None, tail=100):
        """Fetch Pod logs"""
        cmd = self.kubectl_cmd + ["logs", pod_name]
        if container_name:
            cmd.extend(["-c", container_name])
        cmd.extend(["--tail", str(tail)])
        return self._run_command(cmd)

    def port_forward(self, deployment_name, local_port, container_port):
        """Forward a local port to the Deployment (runs in the background)"""
        cmd = self.kubectl_cmd + [
            "port-forward",
            f"deployment/{deployment_name}",
            f"{local_port}:{container_port}"
        ]
        return self._run_command(cmd, background=True)

    def _run_command(self, cmd, background=False):
        """Run a command; returns a Popen in background mode, else a CompletedProcess"""
        print(f"Running: {' '.join(cmd)}")
        if background:
            return subprocess.Popen(cmd)
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            print(f"Error: {result.stderr}")
        return result
def generate_k8s_manifest(app_name, image_tag, replicas=3):
    """Generate the manifests dynamically and return the path of a temporary YAML file"""
    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {
            "name": app_name,
            "labels": {"app": app_name, "version": image_tag}
        },
        "spec": {
            "replicas": replicas,
            "selector": {"matchLabels": {"app": app_name}},
            "template": {
                "metadata": {"labels": {"app": app_name, "version": image_tag}},
                "spec": {
                    "containers": [{
                        "name": app_name,
                        "image": f"registry.example.com/{app_name}:{image_tag}",
                        "ports": [{"containerPort": 8000}],
                        "env": [{"name": "ENVIRONMENT", "value": "production"}],
                        "resources": {
                            "requests": {"memory": "256Mi", "cpu": "250m"},
                            "limits": {"memory": "512Mi", "cpu": "500m"}
                        },
                        "livenessProbe": {
                            "httpGet": {"path": "/health", "port": 8000},
                            "initialDelaySeconds": 30,
                            "periodSeconds": 10
                        },
                        "readinessProbe": {
                            "httpGet": {"path": "/ready", "port": 8000},
                            "initialDelaySeconds": 5,
                            "periodSeconds": 5
                        }
                    }]
                }
            }
        }
    }
    service = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": f"{app_name}-service"},
        "spec": {
            "selector": {"app": app_name},
            "ports": [{"port": 80, "targetPort": 8000}],
            "type": "ClusterIP"
        }
    }
    hpa = {
        "apiVersion": "autoscaling/v2",
        "kind": "HorizontalPodAutoscaler",
        "metadata": {"name": f"{app_name}-hpa"},
        "spec": {
            "scaleTargetRef": {
                "apiVersion": "apps/v1",
                "kind": "Deployment",
                "name": app_name
            },
            "minReplicas": replicas,
            "maxReplicas": 10,
            "metrics": [{
                "type": "Resource",
                "resource": {
                    "name": "cpu",
                    "target": {"type": "Utilization", "averageUtilization": 70}
                }
            }]
        }
    }
    # Write all three documents to a temporary file; the caller deletes it
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
        yaml.dump_all([deployment, service, hpa], f)
    return f.name
def main():
    parser = argparse.ArgumentParser(description="Kubernetes deployment tool")
    parser.add_argument("--app", required=True, help="application name")
    parser.add_argument("--image", required=True, help="image tag")
    parser.add_argument("--namespace", default="default", help="namespace")
    parser.add_argument("--replicas", type=int, default=3, help="number of replicas")
    parser.add_argument("--action", choices=["deploy", "update", "delete"], default="deploy")
    args = parser.parse_args()
    # Create the deployer
    deployer = KubernetesDeployer(namespace=args.namespace)
    if args.action == "deploy":
        # Generate the manifests
        manifest_file = generate_k8s_manifest(
            args.app,
            args.image,
            args.replicas
        )
        try:
            # Apply the manifests
            print(f"Deploying application {args.app}...")
            result = deployer.apply(manifest_file)
            if result.returncode != 0:
                print("Deployment failed")
                sys.exit(1)
            print("Manifests applied, waiting for the rollout to become ready...")
            if deployer.rollout_status(args.app).returncode != 0:
                print("Rollout did not complete")
                sys.exit(1)
        finally:
            # Clean up the temporary file
            os.unlink(manifest_file)
    elif args.action == "update":
        # --image carries only the tag, so build the full image reference
        # the same way generate_k8s_manifest does
        image = f"registry.example.com/{args.app}:{args.image}"
        print(f"Updating application {args.app} to image {image}...")
        if deployer.set_image(args.app, args.app, image).returncode != 0:
            sys.exit(1)
        if deployer.rollout_status(args.app).returncode != 0:
            sys.exit(1)
    elif args.action == "delete":
        # Regenerate the manifests so kubectl knows what to delete
        manifest_file = generate_k8s_manifest(
            args.app,
            args.image,
            args.replicas
        )
        try:
            deployer.delete(manifest_file)
        finally:
            os.unlink(manifest_file)


if __name__ == "__main__":
    main()
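The script copies the image tag into the `version` label. Label values have stricter rules than image references: at most 63 characters, made of alphanumerics, `-`, `_`, and `.`, and beginning and ending with an alphanumeric character. A pre-flight check along the following lines can catch a bad value before kubectl rejects the manifest; both helpers are hypothetical additions and not part of the script above.

```python
import re

# Pattern for a valid label value (an empty string is also allowed)
_LABEL_VALUE = re.compile(r"(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?")


def is_valid_label_value(value: str) -> bool:
    """Check a string against the Kubernetes label value rules."""
    return len(value) <= 63 and _LABEL_VALUE.fullmatch(value) is not None


def to_label_value(tag: str) -> str:
    """Best-effort conversion of an image tag or digest into a valid label value."""
    cleaned = re.sub(r"[^A-Za-z0-9_.-]", "-", tag)[:63]
    return cleaned.strip("-_.")


for tag in ("v1.2.3", "release-2024.06_", "sha256:0123abcd"):
    print(tag, is_valid_label_value(tag), to_label_value(tag))
```

In a pipeline, calling `to_label_value(args.image)` before building the manifest would keep long tags or digests from breaking the deployment step.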
Kubernetes Command Cheat Sheet
Knowing the common kubectl commands makes day-to-day operations much faster:
# Basic operations
kubectl get pods                      # list Pods
kubectl get deployments               # list Deployments
kubectl get services                  # list Services
kubectl get ingress                   # list Ingresses
# Deploying applications
kubectl apply -f deployment.yaml      # create or update resources
kubectl delete -f deployment.yaml     # delete resources
# Debugging and monitoring
kubectl logs <pod-name>               # view Pod logs
kubectl exec -it <pod-name> -- bash   # open a shell in the Pod (use sh if the image has no bash)
kubectl describe pod <pod-name>       # show Pod details
kubectl top pods                      # show resource usage (requires metrics-server)
# Scaling
kubectl scale deployment <name> --replicas=5                            # scale manually
kubectl autoscale deployment <name> --min=2 --max=10 --cpu-percent=80   # autoscale
# Configuration management
kubectl create configmap <name> --from-file=config.properties   # create a ConfigMap
kubectl create secret generic <name> --from-literal=key=value   # create a Secret
# Maintenance
kubectl rollout status deployment/<name>    # check rollout status
kubectl rollout history deployment/<name>   # view rollout history
kubectl rollout undo deployment/<name>      # roll back to the previous revision
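In scripts, the same commands are easier to consume with `-o json` than by parsing the table output. The sketch below counts Pods per phase from the JSON that `kubectl get pods -o json` prints. The sample document is a made-up, trimmed payload for illustration, and `summarize_pod_phases` is a hypothetical helper.

```python
import json
from collections import Counter
from typing import Dict


def summarize_pod_phases(kubectl_json: str) -> Dict[str, int]:
    """Count Pods per phase from `kubectl get pods -o json` output."""
    items = json.loads(kubectl_json).get("items", [])
    return dict(Counter(item.get("status", {}).get("phase", "Unknown") for item in items))


# Trimmed, made-up sample of what `kubectl get pods -o json` returns
sample_output = json.dumps({
    "kind": "List",
    "items": [
        {"metadata": {"name": "python-web-app-1"}, "status": {"phase": "Running"}},
        {"metadata": {"name": "python-web-app-2"}, "status": {"phase": "Running"}},
        {"metadata": {"name": "python-web-app-3"}, "status": {"phase": "Pending"}},
    ],
})
summary = summarize_pod_phases(sample_output)
print(summary)
```

In practice the input string would come from `subprocess.run([...], capture_output=True, text=True).stdout`.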
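Security Best Practices
Security cannot be an afterthought in a production deployment.
Pod security context: define a strict security context in the container spec.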
# Security context example (container level)
securityContext:
  runAsNonRoot: true
  runAsUser: 1000
  runAsGroup: 1000
  allowPrivilegeEscalation: false
  capabilities:
    drop:
      - ALL
  seccompProfile:
    type: RuntimeDefault
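Network policy: use a NetworkPolicy for micro-segmentation at the network layer. Enforcement requires a network plugin that supports NetworkPolicy.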
# network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: python-app-policy
spec:
  podSelector:
    matchLabels:
      app: python-web
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              role: internal
      ports:
        - protocol: TCP
          port: 8000
  egress:
    - to:
        - podSelector:
            matchLabels:
              role: database
      ports:
        - protocol: TCP
          port: 5432
    # Allow DNS lookups; once Egress is restricted, name resolution is
    # blocked too unless it is allowed explicitly
    - ports:
        - protocol: UDP
          port: 53
        - protocol: TCP
          port: 53
Monitoring and Alerting
Integrate Prometheus to collect monitoring metrics.
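# prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
    scrape_configs:
      - job_name: 'kubernetes-pods'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          # Keep only Pods annotated with prometheus.io/scrape: "true"
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          # Use the path from the prometheus.io/path annotation
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
          # Use the port from the prometheus.io/port annotation
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
            target_label: __address__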
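The Deployment earlier in this article also sets a `prometheus.io/port` annotation. A relabel rule commonly paired with it joins `__address__` and the annotation value with `;` and rewrites the scrape address using the regex `([^:]+)(?::\d+)?;(\d+)`. The sketch below replays that rewrite in Python to show what the rule does, assuming Prometheus' default `;` separator and fully anchored regex matching. `rewrite_address` is a hypothetical helper.

```python
import re

# Same pattern as the commonly used relabel rule; Prometheus anchors it at both ends
_ADDRESS_RULE = re.compile(r"([^:]+)(?::\d+)?;(\d+)")


def rewrite_address(address: str, port_annotation: str) -> str:
    """Replace the port of a scrape address with the annotated port, if one is set."""
    joined = f"{address};{port_annotation}"  # source labels joined with ";"
    match = _ADDRESS_RULE.fullmatch(joined)
    if match is None:
        return address  # no annotation: the address is left unchanged
    return f"{match.group(1)}:{match.group(2)}"


print(rewrite_address("10.0.0.5:9090", "8000"))
print(rewrite_address("10.0.0.5", "8000"))
print(rewrite_address("10.0.0.5:9090", ""))
```

The first two calls end up scraping port 8000, while a Pod without the annotation keeps its discovered address.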
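Summary
Kubernetes gives Python applications a complete, powerful cloud-native platform for deployment and management. Its core value lies in:
- Autoscaling: adjusts the replica count to the load so the application can absorb traffic peaks.
- Self-healing: restarts failed containers automatically to keep the service running.
- Rolling updates: rolls out new versions without downtime, which speeds up releases.
- Service discovery: registers and discovers services automatically, simplifying calls between microservices.
- Configuration management: manages configuration and secrets in one place.
- Storage orchestration: mounts storage systems automatically, which eases running stateful applications.
For Python developers, a solid command of Kubernetes makes it possible to build highly available, scalable distributed systems and is an important foundation for modern microservice and serverless architectures. The hands-on configurations and code examples in this article are intended as a practical reference for your cloud-native journey.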