找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

4173

积分

0

好友

548

主题
发表于 1 小时前 | 查看: 3| 回复: 0

需求场景

在云原生架构中,自动扩缩容是保障应用弹性的关键。常见的有根据CPU、内存指标进行扩缩容的 HPA,但在某些场景下,我们需要更复杂的规则。例如,一个订阅了多个 Kafka Topic 的服务,我们希望根据不同 Topic 的消息堆积情况(Lag)来动态调整该服务的副本数,并设置扩缩容的上下限。

本文将通过一个具体案例,手把手教你从零开始开发一个自定义的 Kubernetes Operator,实现基于 Kafka 消费者组延迟的自动扩缩容功能。

一、 基础环境准备

在开始之前,你需要具备一些基础条件:了解 Kubernetes 的基本概念,掌握 Go 语言的基础知识,并准备好开发环境。

a. 安装 operator-sdk

如果你的电脑上没有 operator-sdk 工具,需要先进行安装。以下是 macOS 的安装方式,其他系统请参考 官方安装指南

brew install operator-sdk

安装完成后,可以查看一下版本:

$ operator-sdk version
>>> operator-sdk version: “v1.36.1”, commit: “37d2f2872bfecd6927469f384be4951805aa4caa”, kubernetes version: “v1.29.0”, go version: “go1.22.6”, GOOS: “darwin”, GOARCH: “arm64”

b. 在空目录中初始化项目

创建一个空目录,并使用 operator-sdk init 命令初始化项目:

operator-sdk init --domain=xxx.net --repo=mygitlab.xxx.net/Operation/hex-server-autoscalers-operator
  • --domain=xxx.net:定义你的自定义资源所属的域名,可以根据需要更改。
  • --repo=...:定义项目的 Go 模块路径。

c. 使用 operator-sdk 创建 API 和控制器

接下来,创建我们自定义资源的 API 定义和对应的控制器:

operator-sdk create api --group=autoscaling --version=v1alpha1 --kind=HexServerAutoscaler --resource --controller
  • --group=autoscaling:定义 CRD 所在的 API 组。
  • --version=v1alpha1:定义 CRD 的版本。
  • --kind=HexServerAutoscaler:定义 CRD 的资源类型名称。
  • --resource:指示命令生成 CRD 定义。
  • --controller:指示命令生成控制器代码。

至此,基础环境已经准备完毕。

二、 完成代码逻辑与配置调整

环境搭建好后,核心工作就是编写业务逻辑。这主要包括定义资源结构(Spec)和实现控制器的协调逻辑(Reconcile)。

a. 首先,修改 CRD 定义

创建的 API 资源文件位于 api/v1alpha1 目录下。打开 hexserverautoscaler_types.go 文件,根据我们的需求定义 CRD 结构。

我们需要定义扩缩容的目标、Kafka Topic 阈值等字段:

// ScaleTargetRef 用于指定扩缩容的目标对象
type ScaleTargetRef struct {
    ApiVersion string `json:“apiVersion”`
    Kind       string `json:“kind”`
    Name       string `json:“name”`
}

// TopicThreshold 定义每个 topic 的阈值配置
type TopicThreshold struct {
    Topic string `json:“topic”`
    Lag   int32  `json:“lag”`
}

// Consumergroup 消费者组及组内的topic
type Consumergroup struct {
    Name            string           `json:“name”`
    TopicsThreshold []TopicThreshold `json:“topicsThreshold”`
}

// HexServerAutoscalerSpec defines the desired state of HexServerAutoscaler
type HexServerAutoscalerSpec struct {
    // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
    // Important: Run “make” to regenerate code after modifying this file
    Consumergroups []Consumergroup `json:“consumergroups”`
    MaxReplicas    int32           `json:“maxReplicas”`
    MinReplicas    int32           `json:“minReplicas”`
    //ScaleUpReplicas int32           `json:“scaleUpReplicas”`
    ScaleTargetRef ScaleTargetRef `json:“scaleTargetRef”`
}
...

b. 接下来,实现控制器的核心逻辑

控制器代码位于 internal/controller/hexserverautoscaler_controller.go 文件中。你需要在 Reconcile 方法中编写逻辑,监控 Prometheus 中的 Kafka 指标,并根据配置的阈值进行扩缩容决策。

以下是一个关键的实现示例:

func (r *HexServerAutoscalerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    _ = log.FromContext(ctx)
    // 获取 HexServerAutoscaler 实例
    var autoscaler autoscalingv1alpha1.HexServerAutoscaler
    if err := r.Get(ctx, req.NamespacedName, &autoscaler); err != nil {
       return ctrResult, client.IgnoreNotFound(err)
    }

    // 遍历所有的 topic,并进行监控和扩容检查
    for _, consumergroup := range autoscaler.Spec.Consumergroups {
       for _, topicThreshold := range consumergroup.TopicsThreshold {
          currentMetric := queryPrometheus(consumergroup.Name, topicThreshold.Topic)
          scaled, err := r.scaleResource(autoscaler.Namespace, autoscaler.Spec, currentMetric, topicThreshold.Lag)
          if err != nil {
             return ctrResult, err
          }
          if scaled {
             return ctrResult, err
          }
       }
    }

    // 每隔 60 秒重新调度 Reconcile    return ctrResult, nil
}

// queryPrometheus 从Prometheus查询指定消费者组和Topic的延迟
func queryPrometheus(consumergroup string, topic string) int32 {
    // 创建 Prometheus API 客户端
    promClient, err := api.NewClient(api.Config{
       Address: “http://1.1.1.1:9090”,
    })
    if err != nil {
       klog.Error(err, “unable to create prometheus client”)
       return 0
    }

    // 创建一个新的 Prometheus V1 API 实例
    v1api := promv1.NewAPI(promClient)
    ctx := context.Background()

    // 定义查询语句
    //query := fmt.Sprintf(‘sum(kafka_consumergroup_lag{group=“%s”,topic=“%s”}) by (consumergroup,topic,job,env,group,app,instance) > 10 and sum by (consumergroup,topic,job,env,group,app,instance) (increase(kafka_consumergroup_current_offset{group=“%s”,topic=“%s”}[5m])) > 10’, consumergroup, topic, consumergroup, topic)
    query := fmt.Sprintf(‘sum(kafka_consumergroup_lag{group=“dbroute”,consumergroup=“%s”,topic=“%s”}) by (consumergroup,topic,job,env,group,app,instance)’, consumergroup, topic)

    // 执行查询
    result, warnings, err := v1api.Query(ctx, query, model.Now().Time())
    if err != nil {
       klog.Error(err, “unable to query Prometheus”)
       return 0
    }
    if len(warnings) > 0 {
       klog.Warningf(“Prometheus query returned warnings: %v\n”, warnings)
    }

    // 处理查询结果
    // 假设查询返回的结果是一个 scalar 类型
    value := result.(model.Vector)[0].Value
    intValue := int32(value)

    return intValue
}

type ScalingParams struct {
    CurrentReplicas int32
    CurrentMetric   int32
    DesiredMetric   int32
    MinReplicas     int32
    MaxReplicas     int32
}

func (r *HexServerAutoscalerReconciler) scaleResource(namespace string, spec autoscalingv1alpha1.HexServerAutoscalerSpec, currentMetric, desiredMetric int32) (scaled bool, err error) {
    ctx := context.Background()

    switch spec.ScaleTargetRef.Kind {
    case “Deployment”:
       // 获取目标 Deployment 对象
       var deployment appsv1.Deployment
       err := r.Client.Get(ctx, client.ObjectKey{
          Namespace: namespace,
          Name:      spec.ScaleTargetRef.Name,
       }, &deployment)
       if err != nil {
          return false, err
       }

       currentReplicas := *deployment.Spec.Replicas
       scaleParams := ScalingParams{
          CurrentReplicas: currentReplicas,
          CurrentMetric:   currentMetric,
          DesiredMetric:   desiredMetric,
          MinReplicas:     spec.MinReplicas,
          MaxReplicas:     spec.MaxReplicas,
       }
       desiredReplicas := calculateDesiredReplicas(scaleParams)
       if desiredReplicas == currentReplicas {
          return false, nil
       }

       // 更新 Deployment 的副本数量
       deployment.Spec.Replicas = &desiredReplicas
       err = r.Client.Update(ctx, &deployment)
       return true, err

    // 可以在这里添加对其他资源类型(如 StatefulSet)的支持
    default:
       return false, fmt.Errorf(“unsupported target kind: %s”, spec.ScaleTargetRef.Kind)
    }
}

const (
    scaleUpFactor   = 0.8
    scaleDownFactor = 0.5
)

// calculateDesiredReplicas 计算伸缩后的副本数
func calculateDesiredReplicas(params ScalingParams) int32 {
    // 计算未经调整的期望副本数
    rawDesiredReplicas := float64(params.CurrentReplicas * (params.CurrentMetric / params.DesiredMetric))

    // 应用扩缩容因子
    var desiredReplicas float64
    if rawDesiredReplicas > float64(params.CurrentReplicas) {
       // 扩容
       desiredReplicas = float64(params.CurrentReplicas) +
          math.Ceil((rawDesiredReplicas-float64(params.CurrentReplicas))*scaleUpFactor)
    } else {
       // 缩容
       desiredReplicas = float64(params.CurrentReplicas) -
          math.Floor((float64(params.CurrentReplicas)-rawDesiredReplicas)*scaleDownFactor)
    }

    // 确保结果在最小和最大副本数之间
    desiredReplicas = math.Max(float64(params.MinReplicas), desiredReplicas)
    desiredReplicas = math.Min(float64(params.MaxReplicas), desiredReplicas)

    return int32(math.Round(desiredReplicas))
}
...

代码的核心逻辑就写完了。接下来需要对部署配置进行一些调整,准备发布上线。

三、 调整部署配置

a. 修改项目部署的命名空间

编辑 config/default/kustomization.yaml 文件,修改默认的命名空间:

namespace: controller-manager-system

b. 修改资源名称前缀

自动生成的资源名称前缀可能过长,加上资源本身的名字可能会超过 Kubernetes 资源名称 63 个字符的限制。编辑 config/default/kustomization.yaml

namePrefix: server-autoscalers-

c. 修改 deployment 的补丁文件

需要修改镜像地址并添加拉取密钥,同时保留 kube-rbac-proxy 容器。编辑 config/default/manager_auth_proxy_patch.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: controller-manager
  namespace: controller-manager-system
spec:
  template:
    spec:
      containers:
      - name: kube-rbac-proxy
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - “ALL”
        image: swr.cn-north-4.myhuaweicloud.com/hex-dev/kubebuilder-kube-rbac-proxy:v0.16.0
        args:
        - “--secure-listen-address=0.0.0.0:8443”
        - “--upstream=http://127.0.0.1:8080/”
        - “--logtostderr=true”
        - “--v=0”
        ports:
        - containerPort: 8443
          protocol: TCP
          name: https
        resources:
          limits:
            cpu: 500m
            memory: 128Mi
          requests:
            cpu: 5m
            memory: 64Mi
      - name: manager
        args:
          - “--health-probe-bind-address=:8081”
          - “--metrics-bind-address=127.0.0.1:8080”
          - “--leader-elect”
      imagePullSecrets:
        - name: hwy-swr

d. 修改主部署文件中的镜像地址,并添加镜像仓库密钥

编辑 config/manager/manager.yaml

apiVersion: v1
kind: Namespace
metadata:
  labels:
    control-plane: controller-manager
    app.kubernetes.io/name: hex-server-autoscalers-operator
    app.kubernetes.io/managed-by: kustomize
  name: controller-manager-system
---
kind: Secret
apiVersion: v1
metadata:
  name: hwy-swr
  namespace: controller-manager-system
data:
  .dockerconfigjson: 你的config...==
type: kubernetes.io/dockerconfigjson
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: controller-manager
  namespace: controller-manager-system
  labels:
    control-plane: controller-manager
    app.kubernetes.io/name: hex-server-autoscalers-operator
    app.kubernetes.io/managed-by: kustomize
spec:
  selector:
    matchLabels:
      control-plane: controller-manager
  replicas: 1
  template:
    metadata:
      annotations:
        kubectl.kubernetes.io/default-container: manager
      labels:
        control-plane: controller-manager
    spec:
      securityContext:
        runAsNonRoot: true
      containers:
      - command:
        - /manager
        args:
        - --leader-elect
        image: swr.cn-north-4.myhuaweicloud.com/hex-dev/hex-server-autoscalers-operator:1.0.1
        name: manager
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - “ALL”
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8081
          initialDelaySeconds: 15
          periodSeconds: 20
        readinessProbe:
          httpGet:
            path: /readyz
            port: 8081
          initialDelaySeconds: 5
          periodSeconds: 10
        # TODO(user): Configure the resources accordingly based on the project requirements.
        # More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
        resources:
          limits:
            cpu: 500m
            memory: 128Mi
          requests:
            cpu: 10m
            memory: 64Mi
      imagePullSecrets:
        - name: hwy-swr
      serviceAccountName: controller-manager
      terminationGracePeriodSeconds: 10

e. 修改 Makefile

首先修改为私有镜像仓库的地址:

IMG ?= swr.cn-north-4.myhuaweicloud.com/hex-dev/hex-server-autoscalers-operator:1.0.1

如果你的开发机是 macOS M 芯片,构建 Linux 镜像时需要指定平台:

docker-build: ## Build docker image with the manager.
    $(CONTAINER_TOOL) build --platform linux/amd64 -t ${IMG}

然后修改 make deploy 时用于更新镜像的命令:

edit set image swr.cn-north-4.myhuaweicloud.com/hex-dev/hex-server-autoscalers-operator=${IMG}

f. 修改 Dockerfile 中的镜像 CPU 架构

同样,为了在 Mac M 芯片上构建能在 Linux 集群运行的镜像,需要修改 Dockerfile 指定构建平台:

# Build the manager binary
FROM --platform=linux/amd64 golang:1.21 AS builder
ARG TARGETOS
ARG TARGETARCH

WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don‘t need to re-download as much
# and so that source changes don’t invalidate our downloaded layer
RUN go mod download

# Copy the go source
COPY cmd/main.go cmd/main.go
COPY api/ api/
COPY internal/controller/ internal/controller/

# Build
# the GOARCH has not a default value to allow the binary be built according to the host where the command
# was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO
# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore,
# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform.
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go

# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM --platform=linux/amd64 gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532

ENTRYPOINT [“/manager”]

四、 部署 Operator

配置调整完毕,现在可以开始部署我们编写的 Operator 了。

a. 生成 CRD 和代码

完成 CRD 和控制器的修改后,需要生成 Kubernetes 需要的 CRD 文件和相关代码。执行以下命令:

make generate
make manifests

b. 生成并推送 Docker 镜像

使用 Makefile 构建并推送镜像到你的镜像仓库:

make docker-build docker-push

c. 在集群中安装 CRD

将我们定义的自定义资源(HexServerAutoscaler)安装到 Kubernetes 集群:

make install

d. 预览将要部署的资源文件

在正式部署前,可以先预览一下 Kustomize 生成的最终 YAML 文件:

kubectl kustomize config/default

e. 部署 Operator 控制器

最后,将 Operator 的控制器部署到集群中:

make deploy

至此,Operator 的开发和部署工作全部完成。接下来我们可以创建一个自定义资源实例来测试它的效果。

五、 测试效果

a. 编辑自定义资源示例文件

编辑 config/samples/autoscaling_v1alpha1_hexserverautoscaler.yaml,创建一个具体的扩缩容规则:

apiVersion: autoscaling.hexfuture.net/v1alpha1
kind: HexServerAutoscaler
metadata:
  labels:
    app.kubernetes.io/name: hex-server-autoscalers-operator
    app.kubernetes.io/managed-by: kustomize
  name: hexserverautoscaler-sample
  namespace: b-subs
spec:
  maxReplicas: 10
  minReplicas: 2
  consumergroups:
    - name: group1
      topicsThreshold:
        - topic: test_topic_lag1
          lag: 10
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: accuratescanhte

这个配置意味着:监控名为 group1 的消费者组在 test_topic_lag1 这个 Topic 上的延迟,如果延迟超过 10 条消息,就会对命名空间 b-subs 中名为 accuratescanhte 的 Deployment 进行自动扩缩容,副本数范围在 2 到 10 之间。

b. 部署自定义资源

将上述配置应用到 Kubernetes 集群:

kubectl apply -f config/samples/autoscaling_v1alpha1_hexserverautoscaler.yaml

现在,Operator 就会开始工作。它会周期性地查询 Prometheus 中指定消费者组和 Topic 的延迟指标,并根据我们定义的阈值和算法,自动调整目标 Deployment 的副本数量,从而实现基于业务消息堆积情况的精细化弹性伸缩。

总结

通过以上五个步骤,我们完成了一个功能完整的自定义 Kubernetes Operator 的开发、配置和部署。这个 Operator 扩展了 Kubernetes 的原生能力,使其能够理解我们自定义的 HexServerAutoscaler 资源,并根据 Kafka 的实时业务指标做出扩缩容决策。

这种模式非常灵活,你可以基于这个框架,轻松修改数据源(如换成其他监控系统)、调整扩缩容算法或支持更多类型的资源(如 StatefulSet),以满足各种复杂的自动化运维场景。在 云原生/IaaS 领域,掌握 Operator 开发能力,意味着你能为 Kubernetes 生态量身打造专属的自动化管理工具,极大地提升运维效率和系统弹性。





上一篇:Redis Cluster核心机制与实践:从数据分片、故障转移到Smart Client
下一篇:Nginx命名Location实战:一个URL跨多级目录查找文件实现存储无缝扩容
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-11 20:50 , Processed in 0.584928 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表