需求场景
在云原生架构中,自动扩缩容是保障应用弹性的关键。常见的有根据CPU、内存指标进行扩缩容的 HPA,但在某些场景下,我们需要更复杂的规则。例如,一个订阅了多个 Kafka Topic 的服务,我们希望根据不同 Topic 的消息堆积情况(Lag)来动态调整该服务的副本数,并设置扩缩容的上下限。
本文将通过一个具体案例,手把手教你从零开始开发一个自定义的 Kubernetes Operator,实现基于 Kafka 消费者组延迟的自动扩缩容功能。
一、 基础环境准备
在开始之前,你需要具备一些基础条件:了解 Kubernetes 的基本概念,掌握 Go 语言的基础知识,并准备好开发环境。
a. 安装 operator-sdk
如果你的电脑上没有 operator-sdk 工具,需要先进行安装。以下是 macOS 的安装方式,其他系统请参考 官方安装指南:
brew install operator-sdk
安装完成后,可以查看一下版本:
$ operator-sdk version
>>> operator-sdk version: “v1.36.1”, commit: “37d2f2872bfecd6927469f384be4951805aa4caa”, kubernetes version: “v1.29.0”, go version: “go1.22.6”, GOOS: “darwin”, GOARCH: “arm64”
b. 在空目录中初始化项目
创建一个空目录,并使用 operator-sdk init 命令初始化项目:
operator-sdk init --domain=xxx.net --repo=mygitlab.xxx.net/Operation/hex-server-autoscalers-operator
--domain=xxx.net:定义你的自定义资源所属的域名,可以根据需要更改。
--repo=...:定义项目的 Go 模块路径。
c. 使用 operator-sdk 创建 API 和控制器
接下来,创建我们自定义资源的 API 定义和对应的控制器:
operator-sdk create api --group=autoscaling --version=v1alpha1 --kind=HexServerAutoscaler --resource --controller
--group=autoscaling:定义 CRD 所在的 API 组。
--version=v1alpha1:定义 CRD 的版本。
--kind=HexServerAutoscaler:定义 CRD 的资源类型名称。
--resource:指示命令生成 CRD 定义。
--controller:指示命令生成控制器代码。
至此,基础环境已经准备完毕。
二、 完成代码逻辑与配置调整
环境搭建好后,核心工作就是编写业务逻辑。这主要包括定义资源结构(Spec)和实现控制器的协调逻辑(Reconcile)。
a. 首先,修改 CRD 定义
创建的 API 资源文件位于 api/v1alpha1 目录下。打开 hexserverautoscaler_types.go 文件,根据我们的需求定义 CRD 结构。
我们需要定义扩缩容的目标、Kafka Topic 阈值等字段:
// ScaleTargetRef 用于指定扩缩容的目标对象
type ScaleTargetRef struct {
ApiVersion string `json:“apiVersion”`
Kind string `json:“kind”`
Name string `json:“name”`
}
// TopicThreshold 定义每个 topic 的阈值配置
type TopicThreshold struct {
Topic string `json:“topic”`
Lag int32 `json:“lag”`
}
// Consumergroup 消费者组及组内的topic
type Consumergroup struct {
Name string `json:“name”`
TopicsThreshold []TopicThreshold `json:“topicsThreshold”`
}
// HexServerAutoscalerSpec defines the desired state of HexServerAutoscaler
type HexServerAutoscalerSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run “make” to regenerate code after modifying this file
Consumergroups []Consumergroup `json:“consumergroups”`
MaxReplicas int32 `json:“maxReplicas”`
MinReplicas int32 `json:“minReplicas”`
//ScaleUpReplicas int32 `json:“scaleUpReplicas”`
ScaleTargetRef ScaleTargetRef `json:“scaleTargetRef”`
}
...
b. 接下来,实现控制器的核心逻辑
控制器代码位于 internal/controller/hexserverautoscaler_controller.go 文件中。你需要在 Reconcile 方法中编写逻辑,监控 Prometheus 中的 Kafka 指标,并根据配置的阈值进行扩缩容决策。
以下是一个关键的实现示例:
func (r *HexServerAutoscalerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
// 获取 HexServerAutoscaler 实例
var autoscaler autoscalingv1alpha1.HexServerAutoscaler
if err := r.Get(ctx, req.NamespacedName, &autoscaler); err != nil {
return ctrResult, client.IgnoreNotFound(err)
}
// 遍历所有的 topic,并进行监控和扩容检查
for _, consumergroup := range autoscaler.Spec.Consumergroups {
for _, topicThreshold := range consumergroup.TopicsThreshold {
currentMetric := queryPrometheus(consumergroup.Name, topicThreshold.Topic)
scaled, err := r.scaleResource(autoscaler.Namespace, autoscaler.Spec, currentMetric, topicThreshold.Lag)
if err != nil {
return ctrResult, err
}
if scaled {
return ctrResult, err
}
}
}
// 每隔 60 秒重新调度 Reconcile return ctrResult, nil
}
// queryPrometheus 从Prometheus查询指定消费者组和Topic的延迟
func queryPrometheus(consumergroup string, topic string) int32 {
// 创建 Prometheus API 客户端
promClient, err := api.NewClient(api.Config{
Address: “http://1.1.1.1:9090”,
})
if err != nil {
klog.Error(err, “unable to create prometheus client”)
return 0
}
// 创建一个新的 Prometheus V1 API 实例
v1api := promv1.NewAPI(promClient)
ctx := context.Background()
// 定义查询语句
//query := fmt.Sprintf(‘sum(kafka_consumergroup_lag{group=“%s”,topic=“%s”}) by (consumergroup,topic,job,env,group,app,instance) > 10 and sum by (consumergroup,topic,job,env,group,app,instance) (increase(kafka_consumergroup_current_offset{group=“%s”,topic=“%s”}[5m])) > 10’, consumergroup, topic, consumergroup, topic)
query := fmt.Sprintf(‘sum(kafka_consumergroup_lag{group=“dbroute”,consumergroup=“%s”,topic=“%s”}) by (consumergroup,topic,job,env,group,app,instance)’, consumergroup, topic)
// 执行查询
result, warnings, err := v1api.Query(ctx, query, model.Now().Time())
if err != nil {
klog.Error(err, “unable to query Prometheus”)
return 0
}
if len(warnings) > 0 {
klog.Warningf(“Prometheus query returned warnings: %v\n”, warnings)
}
// 处理查询结果
// 假设查询返回的结果是一个 scalar 类型
value := result.(model.Vector)[0].Value
intValue := int32(value)
return intValue
}
type ScalingParams struct {
CurrentReplicas int32
CurrentMetric int32
DesiredMetric int32
MinReplicas int32
MaxReplicas int32
}
func (r *HexServerAutoscalerReconciler) scaleResource(namespace string, spec autoscalingv1alpha1.HexServerAutoscalerSpec, currentMetric, desiredMetric int32) (scaled bool, err error) {
ctx := context.Background()
switch spec.ScaleTargetRef.Kind {
case “Deployment”:
// 获取目标 Deployment 对象
var deployment appsv1.Deployment
err := r.Client.Get(ctx, client.ObjectKey{
Namespace: namespace,
Name: spec.ScaleTargetRef.Name,
}, &deployment)
if err != nil {
return false, err
}
currentReplicas := *deployment.Spec.Replicas
scaleParams := ScalingParams{
CurrentReplicas: currentReplicas,
CurrentMetric: currentMetric,
DesiredMetric: desiredMetric,
MinReplicas: spec.MinReplicas,
MaxReplicas: spec.MaxReplicas,
}
desiredReplicas := calculateDesiredReplicas(scaleParams)
if desiredReplicas == currentReplicas {
return false, nil
}
// 更新 Deployment 的副本数量
deployment.Spec.Replicas = &desiredReplicas
err = r.Client.Update(ctx, &deployment)
return true, err
// 可以在这里添加对其他资源类型(如 StatefulSet)的支持
default:
return false, fmt.Errorf(“unsupported target kind: %s”, spec.ScaleTargetRef.Kind)
}
}
const (
scaleUpFactor = 0.8
scaleDownFactor = 0.5
)
// calculateDesiredReplicas 计算伸缩后的副本数
func calculateDesiredReplicas(params ScalingParams) int32 {
// 计算未经调整的期望副本数
rawDesiredReplicas := float64(params.CurrentReplicas * (params.CurrentMetric / params.DesiredMetric))
// 应用扩缩容因子
var desiredReplicas float64
if rawDesiredReplicas > float64(params.CurrentReplicas) {
// 扩容
desiredReplicas = float64(params.CurrentReplicas) +
math.Ceil((rawDesiredReplicas-float64(params.CurrentReplicas))*scaleUpFactor)
} else {
// 缩容
desiredReplicas = float64(params.CurrentReplicas) -
math.Floor((float64(params.CurrentReplicas)-rawDesiredReplicas)*scaleDownFactor)
}
// 确保结果在最小和最大副本数之间
desiredReplicas = math.Max(float64(params.MinReplicas), desiredReplicas)
desiredReplicas = math.Min(float64(params.MaxReplicas), desiredReplicas)
return int32(math.Round(desiredReplicas))
}
...
代码的核心逻辑就写完了。接下来需要对部署配置进行一些调整,准备发布上线。
三、 调整部署配置
a. 修改项目部署的命名空间
编辑 config/default/kustomization.yaml 文件,修改默认的命名空间:
namespace: controller-manager-system
b. 修改资源名称前缀
自动生成的资源名称前缀可能过长,加上资源本身的名字可能会超过 Kubernetes 资源名称 63 个字符的限制。编辑 config/default/kustomization.yaml:
namePrefix: server-autoscalers-
c. 修改 deployment 的补丁文件
需要修改镜像地址并添加拉取密钥,同时保留 kube-rbac-proxy 容器。编辑 config/default/manager_auth_proxy_patch.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller-manager
namespace: controller-manager-system
spec:
template:
spec:
containers:
- name: kube-rbac-proxy
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- “ALL”
image: swr.cn-north-4.myhuaweicloud.com/hex-dev/kubebuilder-kube-rbac-proxy:v0.16.0
args:
- “--secure-listen-address=0.0.0.0:8443”
- “--upstream=http://127.0.0.1:8080/”
- “--logtostderr=true”
- “--v=0”
ports:
- containerPort: 8443
protocol: TCP
name: https
resources:
limits:
cpu: 500m
memory: 128Mi
requests:
cpu: 5m
memory: 64Mi
- name: manager
args:
- “--health-probe-bind-address=:8081”
- “--metrics-bind-address=127.0.0.1:8080”
- “--leader-elect”
imagePullSecrets:
- name: hwy-swr
d. 修改主部署文件中的镜像地址,并添加镜像仓库密钥
编辑 config/manager/manager.yaml:
apiVersion: v1
kind: Namespace
metadata:
labels:
control-plane: controller-manager
app.kubernetes.io/name: hex-server-autoscalers-operator
app.kubernetes.io/managed-by: kustomize
name: controller-manager-system
---
kind: Secret
apiVersion: v1
metadata:
name: hwy-swr
namespace: controller-manager-system
data:
.dockerconfigjson: 你的config...==
type: kubernetes.io/dockerconfigjson
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller-manager
namespace: controller-manager-system
labels:
control-plane: controller-manager
app.kubernetes.io/name: hex-server-autoscalers-operator
app.kubernetes.io/managed-by: kustomize
spec:
selector:
matchLabels:
control-plane: controller-manager
replicas: 1
template:
metadata:
annotations:
kubectl.kubernetes.io/default-container: manager
labels:
control-plane: controller-manager
spec:
securityContext:
runAsNonRoot: true
containers:
- command:
- /manager
args:
- --leader-elect
image: swr.cn-north-4.myhuaweicloud.com/hex-dev/hex-server-autoscalers-operator:1.0.1
name: manager
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- “ALL”
livenessProbe:
httpGet:
path: /healthz
port: 8081
initialDelaySeconds: 15
periodSeconds: 20
readinessProbe:
httpGet:
path: /readyz
port: 8081
initialDelaySeconds: 5
periodSeconds: 10
# TODO(user): Configure the resources accordingly based on the project requirements.
# More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
resources:
limits:
cpu: 500m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
imagePullSecrets:
- name: hwy-swr
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
e. 修改 Makefile
首先修改为私有镜像仓库的地址:
IMG ?= swr.cn-north-4.myhuaweicloud.com/hex-dev/hex-server-autoscalers-operator:1.0.1
如果你的开发机是 macOS M 芯片,构建 Linux 镜像时需要指定平台:
docker-build: ## Build docker image with the manager.
$(CONTAINER_TOOL) build --platform linux/amd64 -t ${IMG}
然后修改 make deploy 时用于更新镜像的命令:
edit set image swr.cn-north-4.myhuaweicloud.com/hex-dev/hex-server-autoscalers-operator=${IMG}
f. 修改 Dockerfile 中的镜像 CPU 架构
同样,为了在 Mac M 芯片上构建能在 Linux 集群运行的镜像,需要修改 Dockerfile 指定构建平台:
# Build the manager binary
FROM --platform=linux/amd64 golang:1.21 AS builder
ARG TARGETOS
ARG TARGETARCH
WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don‘t need to re-download as much
# and so that source changes don’t invalidate our downloaded layer
RUN go mod download
# Copy the go source
COPY cmd/main.go cmd/main.go
COPY api/ api/
COPY internal/controller/ internal/controller/
# Build
# the GOARCH has not a default value to allow the binary be built according to the host where the command
# was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO
# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore,
# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform.
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go
# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM --platform=linux/amd64 gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532
ENTRYPOINT [“/manager”]
四、 部署 Operator
配置调整完毕,现在可以开始部署我们编写的 Operator 了。
a. 生成 CRD 和代码
完成 CRD 和控制器的修改后,需要生成 Kubernetes 需要的 CRD 文件和相关代码。执行以下命令:
make generate
make manifests
b. 生成并推送 Docker 镜像
使用 Makefile 构建并推送镜像到你的镜像仓库:
make docker-build docker-push
c. 在集群中安装 CRD
将我们定义的自定义资源(HexServerAutoscaler)安装到 Kubernetes 集群:
make install
d. 预览将要部署的资源文件
在正式部署前,可以先预览一下 Kustomize 生成的最终 YAML 文件:
kubectl kustomize config/default
e. 部署 Operator 控制器
最后,将 Operator 的控制器部署到集群中:
make deploy
至此,Operator 的开发和部署工作全部完成。接下来我们可以创建一个自定义资源实例来测试它的效果。
五、 测试效果
a. 编辑自定义资源示例文件
编辑 config/samples/autoscaling_v1alpha1_hexserverautoscaler.yaml,创建一个具体的扩缩容规则:
apiVersion: autoscaling.hexfuture.net/v1alpha1
kind: HexServerAutoscaler
metadata:
labels:
app.kubernetes.io/name: hex-server-autoscalers-operator
app.kubernetes.io/managed-by: kustomize
name: hexserverautoscaler-sample
namespace: b-subs
spec:
maxReplicas: 10
minReplicas: 2
consumergroups:
- name: group1
topicsThreshold:
- topic: test_topic_lag1
lag: 10
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: accuratescanhte
这个配置意味着:监控名为 group1 的消费者组在 test_topic_lag1 这个 Topic 上的延迟,如果延迟超过 10 条消息,就会对命名空间 b-subs 中名为 accuratescanhte 的 Deployment 进行自动扩缩容,副本数范围在 2 到 10 之间。
b. 部署自定义资源
将上述配置应用到 Kubernetes 集群:
kubectl apply -f config/samples/autoscaling_v1alpha1_hexserverautoscaler.yaml
现在,Operator 就会开始工作。它会周期性地查询 Prometheus 中指定消费者组和 Topic 的延迟指标,并根据我们定义的阈值和算法,自动调整目标 Deployment 的副本数量,从而实现基于业务消息堆积情况的精细化弹性伸缩。
总结
通过以上五个步骤,我们完成了一个功能完整的自定义 Kubernetes Operator 的开发、配置和部署。这个 Operator 扩展了 Kubernetes 的原生能力,使其能够理解我们自定义的 HexServerAutoscaler 资源,并根据 Kafka 的实时业务指标做出扩缩容决策。
这种模式非常灵活,你可以基于这个框架,轻松修改数据源(如换成其他监控系统)、调整扩缩容算法或支持更多类型的资源(如 StatefulSet),以满足各种复杂的自动化运维场景。在 云原生/IaaS 领域,掌握 Operator 开发能力,意味着你能为 Kubernetes 生态量身打造专属的自动化管理工具,极大地提升运维效率和系统弹性。