找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3542

积分

0

好友

468

主题
发表于 1 小时前 | 查看: 5| 回复: 0

不只讲“Skill 是什么”,而是完整回答四个工程问题:Skill 如何被召回、如何被编排、如何在高并发下稳定执行、如何在企业环境中安全演进。

一、先把问题说透:为什么企业一定会走到 Skill 架构

很多团队第一次接触 AI Agent,做法都很相似:

  1. 给大模型一段很长的 System Prompt
  2. 挂上几十个工具
  3. 让模型自己决定什么时候调用什么工具

Demo 阶段这套方案往往很顺,但一到生产环境就开始失控:

  • 工具越来越多,模型选错工具的概率上升
  • 指令越来越长,Token 成本和延迟快速恶化
  • 同一个问题不同人问,执行链路不稳定
  • 安全边界模糊,敏感命令和高危接口无法精细管控
  • 故障场景下没有审计、没有回放、没有幂等

本质上,问题不在模型“变笨了”,而在于系统把所有知识、工具、流程、约束都塞进了一个巨型上下文里。

这时候就需要引入一个中间层:

Skill = 领域知识 + 工具集合 + 参数约束 + 执行策略 + 输出契约

Skill 不是简单的提示词模板,也不是单纯的函数调用描述。
它更像是一个面向 Agent 运行时的“领域能力包”。

对于企业来说,Skill 架构解决的是四个核心矛盾:

矛盾 传统 Prompt/Tool 方案 Skill 化之后
知识与执行耦合 所有知识堆在一个 Prompt 中 按领域拆分,按需注入
工具增长失控 工具越多越难选 先选 Skill,再在 Skill 内选工具
安全边界不清 所有工具平铺给模型 按 Skill 建立最小权限
工程治理困难 无版本、无审计、无回滚 有注册中心、版本、灰度、观测

一句话总结:

Skill 是企业把“人类专家的工作方式”结构化、产品化、可编排化的过程。

二、Skill 的底层原理:它不是 Prompt 技巧,而是受限推理空间

2.1 从 ReAct 到受限推理

大多数 Agent 的底层都是 ReAct:

Observe -> Think -> Act -> Observe -> Think -> Act

问题在于,原始 ReAct 把所有推理都放在同一个上下文窗口里。
当工具数从 5 个增加到 50 个、调用链从 2 步增加到 10 步后,会出现三个问题:

  • 模型需要在过大的动作空间中决策
  • 每一轮都要重复消费大量上下文
  • 越往后推理,越容易偏离原始约束

Skill 的真正价值,是把“大而全的推理空间”切成多个“小而准的推理空间”。

用户请求
  -> Skill Recall
  -> 命中若干候选 Skill
  -> 在候选 Skill 的受限上下文中规划
  -> 生成结构化执行计划
  -> 执行器落地执行

所以 Skill 的架构意义,不是“给模型更多提示”,而是:

  1. 缩小搜索空间
  2. 提升规划稳定性
  3. 降低上下文噪声
  4. 让运行时具备治理抓手

2.2 Skill 在系统中的角色

一个成熟的 Agent 平台,至少存在五层:

┌─────────────────────────────────────────────┐
│              Interaction Layer              │
│ Chat / API / Workflow / SSE / Webhook       │
├─────────────────────────────────────────────┤
│              Orchestration Layer            │
│ Recall / Planner / Scheduler / Memory       │
├─────────────────────────────────────────────┤
│                Skill Layer                  │
│ Domain Context / Tool Set / Constraints     │
├─────────────────────────────────────────────┤
│              Execution Layer                │
│ HTTP / gRPC / Script / Wasm / Browser       │
├─────────────────────────────────────────────┤
│              Governance Layer               │
│ Auth / Audit / Quota / Trace / Policy       │
└─────────────────────────────────────────────┘

Skill 位于编排层和执行层之间,它承担两个职责:

  • 对上,为 Planner 提供领域能力边界
  • 对下,为 Executor 提供可执行契约

2.3 Skill、Plugin、MCP、Workflow 的边界

很多团队一开始会把这些概念混用,后面就会越做越乱。

能力形态 本质 适合场景
Skill 领域能力包 K8s 排障、订单风控、客服赔付
Plugin 平台钩子扩展 审计、日志注入、Prompt 变换
MCP Server 工具提供者 浏览器、数据库、IDE、文档系统
Workflow 固定流程编排 审批、批处理、日报生成

简单判断规则:

  • 需要“领域知识 + 工具约束 + 输出规范”,选 Skill
  • 需要“平台级横切逻辑”,选 Plugin
  • 需要“连接外部系统”,选 MCP
  • 需要“稳定且固定的流程链路”,选 Workflow

三、企业级 Skill 架构演进:从本地目录到分布式注册中心

3.1 三个阶段的演进路径

阶段一:单项目内嵌 Skill

.opencode/skills/*/SKILL.md

适用于:

  • 个人项目
  • 10 个以内 Skill
  • 单团队维护

优点是简单,缺点是没有统一治理能力。

阶段二:团队级集中管理

Git Repo / Internal Package / HTTP Pull

适用于:

  • 多项目共享 Skill
  • 需要版本管理
  • 开始有审批和回滚需求

这个阶段的核心变化是:Skill 从“文件”升级为“可发布资产”。

阶段三:企业级 Skill Registry

                  ┌────────────────────┐
                  │   Skill Registry   │
                  │ Metadata + Content │
                  └─────────┬──────────┘
                            │
          ┌─────────────────┼─────────────────┐
          ▼                 ▼                 ▼
   ┌────────────┐    ┌────────────┐    ┌────────────┐
   │ Team A     │    │ Team B     │    │ Team C     │
   │ Publisher  │    │ Publisher  │    │ Publisher  │
   └────────────┘    └────────────┘    └────────────┘
                            │
                            ▼
                  ┌────────────────────┐
                  │ Agent Orchestrator │
                  └────────────────────┘

适用于:

  • 50+ Skill
  • 多团队协作
  • 多租户、多环境、多集群
  • 对审计、权限、灰度、可观测有明确要求

3.2 为什么必须有 Registry

当 Skill 数量一上来,没有 Registry 很快会出现四类问题:

  • 不知道谁在维护这个 Skill
  • 不知道哪个版本在生产上生效
  • 不知道召回率为什么下降
  • 不知道某次故障是不是由 Skill 变更引起

Registry 解决的是“资产治理”问题,而不是“文件存储”问题。

一个生产级 Skill Registry 至少要提供:

  • 元数据管理:名称、版本、标签、所属团队、兼容环境
  • 内容管理:Skill 主体、脚本、模板、依赖声明
  • 检索接口:关键词检索、语义检索、标签过滤
  • 发布治理:审批、灰度、回滚、弃用
  • 事件通知:变更推送、缓存失效、审计记录

四、统一案例:构建企业级 Kubernetes 排障 Skill

为了避免文章只停留在抽象层,下面统一用一个真实又典型的场景贯穿全文:

团队要建设一个“专属 AI 架构师”,帮助值班工程师排查 K8s 集群中的 Pod CrashLoopBackOff、OOMKilled、节点压力、网络异常和服务抖动问题。

这个场景非常适合 Skill 化,原因有三点:

  1. 排障流程高度专业,依赖经验
  2. 工具链丰富但风险高,涉及 kubectl、日志、指标、变更操作
  3. 结果需要结构化沉淀,便于复盘和审计

4.1 Skill 目录结构

.opencode/skills/k8s-troubleshoot/
├── SKILL.md
├── scripts/
│   ├── diagnose_pod.sh
│   ├── inspect_node.sh
│   └── query_prometheus.py
├── templates/
│   ├── incident_summary.md
│   └── action_plan.md
└── policy/
    └── permissions.json

4.2 生产级 SKILL.md

---
name: k8s-troubleshoot
description: >
  Use when investigating Kubernetes cluster incidents including
  CrashLoopBackOff, OOMKilled, Pending, Evicted, ImagePullBackOff,
  service latency, node pressure, network policy, DNS resolution,
  ingress errors, and Prometheus metric anomalies.
metadata:
  domain: infrastructure
  owner: sre-platform
  compatibility: "k8s>=1.27"
  risk_level: medium
  environments: ["dev", "test", "prod"]
  approval: "required-on-prod-mutation"
---

# K8s 生产排障 Skill

## Objective

快速完成以下工作:

1. 归类故障范围:pod / node / network / storage / config
2. 采集证据:events / describe / logs / metrics
3. 输出根因假设与置信度
4. 生成可执行修复建议
5. 在生产环境下禁止直接执行高危命令

## Readonly Tools

- `bash scripts/diagnose_pod.sh <namespace> <pod>`
- `bash scripts/inspect_node.sh <node>`
- `python3 scripts/query_prometheus.py --query "<promql>"`

## Mandatory Workflow

1. 先看 recent events,再看 describe,再看 logs,最后看 metrics
2. 如果发现 `OOMKilled`,必须同时检查 limit/request 与节点内存压力
3. 如果发现 `CrashLoopBackOff`,必须回看 previous logs
4. 如果问题发生在 prod,所有变更类操作只给建议,不自动执行

## Output Contract

最终输出必须包含以下字段:

```json
{
  "classification": "pod|node|network|storage|config",
  "severity": "P0|P1|P2|P3",
  "summary": "一句话总结",
  "evidence": [
    {"type": "event", "content": "..."},
    {"type": "log", "content": "..."},
    {"type": "metric", "content": "..."}
  ],
  "root_cause": {
    "hypothesis": "最可能根因",
    "confidence": 0.0
  },
  "action_plan": [
    {
      "step": 1,
      "command": "建议命令",
      "risk": "low|medium|high",
      "verification": "验证方式"
    }
  ]
}

这段 SKILL.md 有几个生产化要点:

- `description` 同时覆盖了语义描述和高频关键词
- `metadata` 显式声明 owner、环境、风险等级
- `Mandatory Workflow` 固化了人类专家的排障顺序
- `Output Contract` 强制结构化,避免模型自由发挥

## 五、运行时架构:一次请求到底是怎么流转的

### 5.1 请求链路全景

User Request


API Gateway


Conversation Service

├── Load Session
├── Recall Candidate Skills
├── Build Planning Context

Planner / LLM Gateway

├── Return Plan
├── Select Skills
└── Select Tool Calls

Execution Orchestrator

├── Policy Check
├── Rate Limit
├── Parallel Execution
├── Retry / Timeout / Circuit Breaker

Skill Executor Pool

├── Bash Runner
├── HTTP Runner
├── gRPC Runner
└── Wasm Runner

Observation + Audit + Event Store


Response / SSE Stream


### 5.2 四个关键子系统

**1. Recall Engine**

负责从大量 Skill 中找出最可能命中的候选集合。
生产环境一般采用“双路召回”:

- 关键词召回:高精度、低成本
- 向量召回:解决自然语言表达不一致问题

实际工程中,不建议直接把所有 Skill 内容都做 embedding。
更常见的做法是:

- `name + description + tags + metadata` 做检索向量
- `body` 只在命中后懒加载

**2. Planner**

负责把用户问题转成执行计划,而不是直接发命令。

一个成熟的 Planner 输出应该至少包含:

- 目标任务
- 使用哪些 Skill
- 调用顺序
- 哪些步骤可并行
- 哪些步骤需要人工确认

**3. Orchestrator**

这是工程成败的关键。
真正的“企业级”不在 Prompt,而在编排器。

Orchestrator 至少要负责:

- 执行顺序控制
- 任务并发调度
- 限流与隔离
- 超时、重试、熔断
- 幂等与去重
- 中间结果聚合
- SSE 进度推送

**4. Executor**

Executor 不应该只有一个“运行命令”的粗暴实现,而应该有多种 Runner:

- HTTP Runner:调用内部微服务
- gRPC Runner:低延迟内部 RPC
- Script Runner:执行现有脚本
- Wasm Runner:轻量沙箱
- Browser Runner:网页自动化

不同 Runner 的安全模型和资源模型完全不同,必须分开治理。

## 六、核心数据模型:生产系统先定义边界,再写代码

下面给出一个可直接落地的 Skill 定义模型。

### 6.1 SkillDefinition

```go
package skill

type Definition struct {
    Name         string            `json:"name"`
    Version      string            `json:"version"`
    Description  string            `json:"description"`
    Tags         []string          `json:"tags"`
    Metadata     map[string]string `json:"metadata"`
    Parameters   JSONSchema        `json:"parameters"`
    Executor     ExecutorSpec      `json:"executor"`
    Policy       PolicySpec        `json:"policy"`
    OutputSchema JSONSchema        `json:"output_schema"`
}

type ExecutorSpec struct {
    Type           string            `json:"type"`
    Endpoint       string            `json:"endpoint"`
    TimeoutMs      int               `json:"timeout_ms"`
    MaxConcurrency int               `json:"max_concurrency"`
    Retry          RetrySpec         `json:"retry"`
    Headers        map[string]string `json:"headers"`
}

type RetrySpec struct {
    MaxAttempts int `json:"max_attempts"`
    BaseDelayMs int `json:"base_delay_ms"`
}

type PolicySpec struct {
    RiskLevel       string   `json:"risk_level"`
    RequiredRoles   []string `json:"required_roles"`
    AllowedEnv      []string `json:"allowed_env"`
    NeedApproval    bool     `json:"need_approval"`
    Idempotent      bool     `json:"idempotent"`
    AllowParallel   bool     `json:"allow_parallel"`
    Mutable         bool     `json:"mutable"`
}

这个模型最重要的不是字段多,而是它明确区分了三件事:

  • 业务语义:descriptiontags
  • 执行契约:parametersexecutor
  • 治理约束:policy

6.2 ExecutionPlan

package orchestration

type Plan struct {
    RequestID string     `json:"request_id"`
    Steps     []PlanStep `json:"steps"`
}

type PlanStep struct {
    StepID        string         `json:"step_id"`
    SkillName     string         `json:"skill_name"`
    Arguments     map[string]any `json:"arguments"`
    DependsOn     []string       `json:"depends_on"`
    ParallelGroup string         `json:"parallel_group"`
    NeedApproval  bool           `json:"need_approval"`
    TimeoutMs     int            `json:"timeout_ms"`
}

这一步很关键。
如果你的系统没有显式 ExecutionPlan,大概率后面很难做:

  • 可回放
  • 可重试
  • 失败补偿
  • 并行调度
  • 审计

七、生产级代码实战:Skill Registry、编排器、执行器

下面的代码不追求 Demo 式“最短”,而是尽量接近生产设计。

7.1 Skill Registry:版本化发布与读取

package registry

import (
    "context"
    "fmt"
    "sync"

    "github.com/hashicorp/go-version"
)

type Store interface {
    Save(ctx context.Context, def Definition) error
    Get(ctx context.Context, name, ver string) (Definition, error)
    List(ctx context.Context, filter Filter) ([]Definition, error)
}

type Service struct {
    store Store
    cache sync.Map
}

type Filter struct {
    Query string
    Tags  []string
    Env   string
}

func (s *Service) Publish(ctx context.Context, def Definition) error {
    if def.Name == "" || def.Version == "" {
        return fmt.Errorf("name/version required")
    }
    if _, err := version.NewVersion(def.Version); err != nil {
        return fmt.Errorf("invalid semver: %w", err)
    }
    if def.Executor.TimeoutMs <= 0 {
        def.Executor.TimeoutMs = 3000
    }
    return s.store.Save(ctx, def)
}

func (s *Service) Search(ctx context.Context, filter Filter) ([]Definition, error) {
    return s.store.List(ctx, filter)
}

生产建议:

  • Publish 不只是保存,还要触发 lint、policy 校验、兼容性检查
  • Registry 中应保留 immutable version,不直接覆盖
  • “当前生效版本”建议通过 alias 或 release channel 管理

7.2 Recall:关键词 + 语义双路召回

package recall

import "sort"

type Candidate struct {
    Skill Definition
    Score float64
}

type Engine struct {
    keyword KeywordRetriever
    vector  VectorRetriever
}

func (e *Engine) Recall(query string, topN int) []Candidate {
    kw := e.keyword.Search(query, topN*2)
    vec := e.vector.Search(query, topN*2)

    merged := make(map[string]Candidate)
    merge := func(items []Candidate, weight float64) {
        for _, item := range items {
            current := merged[item.Skill.Name]
            current.Skill = item.Skill
            current.Score += item.Score * weight
            merged[item.Skill.Name] = current
        }
    }

    merge(kw, 0.45)
    merge(vec, 0.55)

    result := make([]Candidate, 0, len(merged))
    for _, item := range merged {
        result = append(result, item)
    }

    sort.Slice(result, func(i, j int) bool {
        return result[i].Score > result[j].Score
    })

    if len(result) > topN {
        result = result[:topN]
    }
    return result
}

这里的经验点有两个:

  • 关键词召回用于“强约束词”,比如 OOMKilledCrashLoopBackOff
  • 向量召回用于“自然语言变体”,比如“服务老是抖”“节点资源吃满了”

7.3 Orchestrator:并发编排与失败控制

package orchestration

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"
    "golang.org/x/sync/semaphore"
)

type Executor interface {
    Execute(ctx context.Context, step PlanStep) (StepResult, error)
}

type EventSink interface {
    Publish(ctx context.Context, event Event)
}

type Orchestrator struct {
    executor         Executor
    events           EventSink
    globalSemaphore  *semaphore.Weighted
}

type StepResult struct {
    StepID   string         `json:"step_id"`
    Success  bool           `json:"success"`
    Output   map[string]any `json:"output"`
    ErrorMsg string         `json:"error_msg,omitempty"`
}

func NewOrchestrator(executor Executor, sink EventSink, maxConcurrent int64) *Orchestrator {
    return &Orchestrator{
        executor:        executor,
        events:          sink,
        globalSemaphore: semaphore.NewWeighted(maxConcurrent),
    }
}

func (o *Orchestrator) Run(ctx context.Context, plan Plan) ([]StepResult, error) {
    results := make([]StepResult, 0, len(plan.Steps))
    resultsMu := sync.Mutex{}

    groups := groupByParallel(plan.Steps)
    for _, steps := range groups {
        eg, groupCtx := errgroup.WithContext(ctx)
        for _, step := range steps {
            step := step
            eg.Go(func() error {
                if err := o.globalSemaphore.Acquire(groupCtx, 1); err != nil {
                    return err
                }
                defer o.globalSemaphore.Release(1)

                timeout := time.Duration(step.TimeoutMs) * time.Millisecond
                if timeout == 0 {
                    timeout = 5 * time.Second
                }
                stepCtx, cancel := context.WithTimeout(groupCtx, timeout)
                defer cancel()

                o.events.Publish(stepCtx, Event{
                    Type:   "step_started",
                    StepID: step.StepID,
                })

                output, err := o.executor.Execute(stepCtx, step)
                if err != nil {
                    res := StepResult{
                        StepID:   step.StepID,
                        Success:  false,
                        ErrorMsg: err.Error(),
                    }
                    resultsMu.Lock()
                    results = append(results, res)
                    resultsMu.Unlock()

                    o.events.Publish(stepCtx, Event{
                        Type:   "step_failed",
                        StepID: step.StepID,
                        Error:  err.Error(),
                    })
                    return fmt.Errorf("step %s failed: %w", step.StepID, err)
                }

                output.Success = true
                resultsMu.Lock()
                results = append(results, output)
                resultsMu.Unlock()

                o.events.Publish(stepCtx, Event{
                    Type:   "step_finished",
                    StepID: step.StepID,
                })
                return nil
            })
        }
        if err := eg.Wait(); err != nil {
            return results, err
        }
    }
    return results, nil
}

这段代码体现了几个生产特征:

  • semaphore 做全局并发闸门,避免 Executor 被打爆
  • 每一步独立超时,避免单点步骤拖垮整个链路
  • 通过 EventSink 把进度推到 SSE、Kafka、审计系统
  • 并行组执行完后再进入下一组,便于做依赖控制

7.4 Executor:参数校验、幂等、策略控制

package executor

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type PolicyChecker interface {
    Check(ctx context.Context, step PlanStep) error
}

type IdempotencyStore interface {
    Get(ctx context.Context, key string) ([]byte, bool, error)
    Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}

type Service struct {
    policy PolicyChecker
    store  IdempotencyStore
    client *http.Client
}

func (s *Service) Execute(ctx context.Context, step PlanStep) (StepResult, error) {
    if err := s.policy.Check(ctx, step); err != nil {
        return StepResult{}, err
    }

    cacheKey := buildIdempotencyKey(step)
    if raw, ok, _ := s.store.Get(ctx, cacheKey); ok {
        var cached StepResult
        if err := json.Unmarshal(raw, &cached); err == nil {
            return cached, nil
        }
    }

    body, _ := json.Marshal(step.Arguments)
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, resolveEndpoint(step.SkillName), bytes.NewReader(body))
    if err != nil {
        return StepResult{}, err
    }
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Request-ID", step.StepID)

    resp, err := s.client.Do(req)
    if err != nil {
        return StepResult{}, err
    }
    defer resp.Body.Close()

    if resp.StatusCode >= 500 {
        return StepResult{}, fmt.Errorf("executor upstream 5xx: %d", resp.StatusCode)
    }
    if resp.StatusCode >= 400 {
        return StepResult{}, fmt.Errorf("executor upstream 4xx: %d", resp.StatusCode)
    }

    var result StepResult
    if err := json.NewDecoder(resp.Body).Decode(&result.Output); err != nil {
        return StepResult{}, err
    }
    result.StepID = step.StepID

    raw, _ := json.Marshal(result)
    _ = s.store.Set(ctx, cacheKey, raw, 10*time.Minute)
    return result, nil
}

这一步常被低估,但它决定系统是否能扛线上重试风暴:

  • 用户重复提交
  • 网关重试
  • LLM 重新规划重复步骤
  • 下游超时但实际上已成功

没有幂等,就没有生产级 Agent。

八、高并发与可扩展:企业级系统真正拉开差距的地方

很多文章讲 Skill,只讲“怎么定义”,不讲“怎么扛流量”。
但企业系统一旦接入客服、运维、风控、运营平台,流量模型会比想象中复杂得多。

8.1 高并发下的四类瓶颈

1. LLM 瓶颈

  • 外部 API 延迟高
  • token 生成速度不稳定
  • 厂商限流
  • 模型实例负载抖动

2. Skill Executor 瓶颈

  • 脚本执行耗 CPU
  • kubectl、数据库、搜索接口等下游资源有限
  • 高危操作需要审批,阻塞链路

3. Session 瓶颈

  • 长会话上下文快速膨胀
  • Redis 热 key
  • 大对象序列化压力

4. 流式连接瓶颈

  • SSE 长连接占资源
  • 前端断连重试放大流量
  • 中间层代理的连接超时配置不当

8.2 高并发设计原则

原则一:先把规划和执行解耦

不要把“模型规划”与“技能执行”写成同步串行黑盒。
应拆成两个阶段:

Planning Phase -> Execution Phase

好处:

  • 可以缓存 Planning 结果
  • 可以单独优化 LLM 调用
  • 可以把执行部分异步化
  • 可以对计划做审批、风控、重放

原则二:区分同步 Skill 和异步 Skill

类型 示例 处理方式
同步 Skill 查订单、查 Pod 状态 直接返回
准同步 Skill 触发扩容、创建工单 接受后异步跟踪
异步 Skill 批量分析日志、生成日报 投递 MQ,事件回传

如果系统把所有 Skill 都放在同步链路里,迟早被长尾任务拖死。

原则三:按 Skill 维度做隔离

不能只做“全局限流”,还要做“单 Skill 隔离”。

比如:

  • k8s-troubleshoot 最大并发 100
  • billing-refund 最大并发 20
  • browser-automation 最大并发 10

原因很简单:不同 Skill 的资源模型完全不同。

8.3 单 Skill 并发控制

package control

import (
    "context"
    "sync"

    "golang.org/x/sync/semaphore"
)

type SkillLimiter struct {
    mu       sync.RWMutex
    limiters map[string]*semaphore.Weighted
}

func NewSkillLimiter() *SkillLimiter {
    return &SkillLimiter{
        limiters: make(map[string]*semaphore.Weighted),
    }
}

func (s *SkillLimiter) Register(skill string, max int64) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.limiters[skill] = semaphore.NewWeighted(max)
}

func (s *SkillLimiter) Acquire(ctx context.Context, skill string) error {
    s.mu.RLock()
    sem := s.limiters[skill]
    s.mu.RUnlock()
    if sem == nil {
        return nil
    }
    return sem.Acquire(ctx, 1)
}

func (s *SkillLimiter) Release(skill string) {
    s.mu.RLock()
    sem := s.limiters[skill]
    s.mu.RUnlock()
    if sem != nil {
        sem.Release(1)
    }
}

这是企业级常见做法:全局限流 + 租户限流 + Skill 限流三层叠加。

8.4 背压与降级

当系统流量超过承载能力时,不能只会报错,应该有明确降级策略:

  1. 降低召回数量:Top 8 降到 Top 3
  2. 关闭昂贵模型:从高阶模型降到轻量模型
  3. 禁止非必要并行:串行执行可选步骤
  4. 长任务转异步:返回 task_id,前端轮询或订阅
  5. 命中缓存直接返回:尤其是查询类 Skill

8.5 缓存策略

企业级 Skill 平台至少要有四层缓存思维:

层级 缓存内容 价值
Recall Cache query -> candidates 减少检索开销
Plan Cache normalized request -> execution plan 减少 LLM 调用
Result Cache idempotent skill output 降低下游压力
Metadata Cache skill definitions 降低 Registry 压力

但要注意:

  • 查询型 Skill 可以强缓存
  • 变更型 Skill 只能做幂等缓存,不能做结果复用

九、安全治理:没有权限边界的 Agent 一定不能上生产

企业最担心的从来不是“模型答错”,而是“模型做错”。

Skill 平台的安全治理至少要覆盖五层。

9.1 参数层:JSON Schema 校验

所有入参必须经过 schema 校验,不能让模型自由拼装。

例如:

{
  "type": "object",
  "properties": {
    "namespace": {
      "type": "string",
      "pattern": "^[a-z0-9-]+$"
    },
    "pod": {
      "type": "string",
      "maxLength": 128
    }
  },
  "required": ["namespace", "pod"],
  "additionalProperties": false
}

9.2 策略层:Allow / Ask / Deny

{
  "readonly": [
    "kubectl get *",
    "kubectl describe *",
    "kubectl logs *"
  ],
  "ask": [
    "kubectl exec *",
    "kubectl rollout restart *"
  ],
  "deny": [
    "kubectl delete ns *",
    "kubectl delete node *",
    "kubectl * --all"
  ]
}

9.3 身份层:RBAC + 环境隔离

Skill 权限不应该只取决于“模型想不想执行”,而是取决于:

  • 当前用户身份
  • 当前租户
  • 当前环境
  • 当前 Skill 风险等级

最常见的规则是:

  • dev 可自动执行低风险变更
  • test 允许部分中风险变更
  • prod 默认只读,变更必须审批

9.4 执行层:沙箱与网络隔离

对于脚本型 Skill,至少做到:

  • 独立容器或 Wasm 沙箱
  • 只读文件系统
  • CPU/内存配额
  • 出网白名单
  • 临时目录生命周期隔离

9.5 审计层:谁触发、触发了什么、结果是什么

审计记录至少包含:

  • 用户 ID
  • 会话 ID
  • 请求原文
  • 命中的 Skill
  • 每一步参数
  • 每一步执行结果
  • 是否人工确认
  • 最终输出

这不仅是为了安全,也是为了事后复盘和模型优化。

十、可观测性:不观测就无法优化,不可回放就无法复盘

10.1 必做指标

Skill 平台建议至少暴露以下指标:

skill_recall_total{skill, result}
skill_recall_latency_ms{skill}
planner_latency_ms{model}
skill_execution_total{skill, status}
skill_execution_latency_ms{skill}
skill_execution_timeout_total{skill}
skill_policy_denied_total{skill}
skill_approval_wait_ms{skill}
session_context_tokens{tenant}

10.2 Trace 设计

一条完整链路至少有这些 Span:

request
  ├── load_session
  ├── recall_skills
  ├── planner_llm
  ├── execute_step:diagnose-pod
  ├── execute_step:query-prometheus
  ├── summarize_llm
  └── stream_response

10.3 结构化事件

type Event struct {
    RequestID string         `json:"request_id"`
    SessionID string         `json:"session_id"`
    Type      string         `json:"type"`
    StepID    string         `json:"step_id,omitempty"`
    SkillName string         `json:"skill_name,omitempty"`
    Payload   map[string]any `json:"payload,omitempty"`
    Error     string         `json:"error,omitempty"`
    At        time.Time      `json:"at"`
}

这类事件流很有价值,可以同时服务于:

  • 前端实时展示
  • Kafka 异步消费
  • 审计落库
  • 离线分析召回率和失败率

十一、真实场景复盘:一次 CrashLoopBackOff 故障是如何被 Skill 体系处理的

假设用户输入:

帮我排查 prod 命名空间 payment-service-7d8c9d4f6f-2lqpx 为什么一直 CrashLoopBackOff

11.1 Recall 阶段

系统可能召回以下候选 Skill:

Skill 分数 命中原因
k8s-troubleshoot 0.96 包含 CrashLoopBackOff、prod、pod 排障语义
service-latency-analysis 0.42 与 service 关键词相关
release-rollback 0.18 与生产服务有关,但不够精准

最终主命中 k8s-troubleshoot

11.2 Planner 输出计划

{
  "request_id": "req-20260525-1001",
  "steps": [
    {
      "step_id": "s1",
      "skill_name": "k8s-get-events",
      "arguments": {"namespace": "prod"},
      "parallel_group": "g1"
    },
    {
      "step_id": "s2",
      "skill_name": "k8s-describe-pod",
      "arguments": {
        "namespace": "prod",
        "pod": "payment-service-7d8c9d4f6f-2lqpx"
      },
      "parallel_group": "g1"
    },
    {
      "step_id": "s3",
      "skill_name": "k8s-get-previous-logs",
      "arguments": {
        "namespace": "prod",
        "pod": "payment-service-7d8c9d4f6f-2lqpx"
      },
      "depends_on": ["s2"]
    },
    {
      "step_id": "s4",
      "skill_name": "promql-query",
      "arguments": {
        "query": "sum(container_memory_working_set_bytes{pod='payment-service-7d8c9d4f6f-2lqpx'})"
      },
      "parallel_group": "g2"
    }
  ]
}

这份计划天然具备并行性:

  • s1s2 可并发
  • s3 依赖 s2
  • s4 可和 s3 并行

11.3 Executor 采集到的关键信息

describe:
  Last State: Terminated
  Reason: OOMKilled

previous logs:
  java.lang.OutOfMemoryError: Java heap space

metrics:
  container_memory_working_set_bytes = 1.73Gi
  pod memory limit = 1536Mi

11.4 Skill 最终结构化输出

{
  "classification": "pod",
  "severity": "P1",
  "summary": "payment-service Pod 因内存超限被 kubelet 杀死,导致持续 CrashLoopBackOff",
  "evidence": [
    {
      "type": "event",
      "content": "Back-off restarting failed container"
    },
    {
      "type": "log",
      "content": "java.lang.OutOfMemoryError: Java heap space"
    },
    {
      "type": "metric",
      "content": "working_set_bytes(1.73Gi) > limit(1.5Gi)"
    }
  ],
  "root_cause": {
    "hypothesis": "JVM 堆配置与容器 limit 不匹配,触发 OOMKilled",
    "confidence": 0.94
  },
  "action_plan": [
    {
      "step": 1,
      "command": "kubectl -n prod set resources deploy/payment-service --limits=memory=2Gi --requests=memory=1Gi",
      "risk": "medium",
      "verification": "kubectl -n prod rollout status deploy/payment-service"
    },
    {
      "step": 2,
      "command": "调整 JVM -Xmx,建议不超过容器 limit 的 60%-70%",
      "risk": "low",
      "verification": "观察 Pod 重启次数和 GC 指标"
    }
  ]
}

这就是 Skill 架构相较于简单问答的价值:

  • 不只给结论
  • 还给证据
  • 还给行动方案
  • 还能控制风险和审批边界

十二、从单体 Skill 到企业级 Skill 平台:完整部署拓扑

12.1 推荐服务拆分

agent-gateway
conversation-service
planner-service
skill-registry
skill-executor-http
skill-executor-script
skill-executor-browser
policy-service
audit-service
event-bus

为什么建议把 Executor 再拆细?

  • HTTP/gRPC 型 Skill 主要吃网络与连接池
  • Script/Wasm 型 Skill 主要吃 CPU 与隔离资源
  • Browser 型 Skill 主要吃内存与实例数

混在一起,调度和容量都很难做。

12.2 Kubernetes 部署建议

apiVersion: apps/v1
kind: Deployment
metadata:
  name: skill-executor-script
spec:
  replicas: 4
  selector:
    matchLabels:
      app: skill-executor-script
  template:
    metadata:
      labels:
        app: skill-executor-script
    spec:
      containers:
        - name: app
          image: registry.example.com/skill-executor-script:1.0.0
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "2"
              memory: "2Gi"
          env:
            - name: EXECUTOR_MAX_CONCURRENCY
              value: "50"
            - name: POLICY_ENDPOINT
              value: "http://policy-service.default.svc.cluster.local"
          readinessProbe:
            httpGet:
              path: /readyz
              port: 8080
          livenessProbe:
            httpGet:
              path: /livez
              port: 8080

12.3 HPA 关注指标

Skill 平台不建议只看 CPU 自动扩容,还要看:

  • 正在执行的 step 数
  • 等待队列长度
  • P95 执行时延
  • SSE 活跃连接数
  • 下游错误率

十三、工程治理:版本、灰度、回滚、兼容性

13.1 Skill 版本管理建议

使用语义化版本:

  • MAJOR:输入输出契约变化,不兼容
  • MINOR:新增能力,兼容
  • PATCH:修复文案、策略、脚本细节

13.2 灰度发布

生产上不要直接全量替换 Skill。
推荐做法:

  1. 发布新版本到 Registry
  2. 标记为 candidate
  3. 仅对测试租户或 5% 流量生效
  4. 观测召回率、成功率、耗时、人工接管率
  5. 没问题后切到 stable

13.3 回滚

Skill 回滚要保证两个层面都能回:

  • 元数据版本回滚
  • 脚本/模板/依赖回滚

所以 Skill 发布最好是“不可变包”,而不是简单覆盖文件。

13.4 兼容性约束

一个企业级 Skill 很容易依赖环境差异:

  • K8s 版本不同
  • CLI 参数不同
  • API 返回结构不同
  • 监控系统指标名不同

因此建议在 metadata 中显式标明兼容矩阵,例如:

metadata:
  compatibility:
    kubernetes: ">=1.27,<1.31"
    prometheus: ">=2.45"
    cni: ["calico", "cilium"]

十四、质量保障:测试不是补充项,而是 Skill 能否长期演进的前提

14.1 四层测试模型

1. Lint 测试

检查:

  • frontmatter 是否完整
  • description 是否过短
  • output schema 是否存在
  • policy 是否缺失

2. Contract 测试

验证:

  • 参数 schema 是否正确
  • 输出是否符合约束
  • 危险命令是否被拦截

3. Replay 测试

把历史真实请求回放到新版本 Skill,观察:

  • 召回是否下降
  • 计划是否变差
  • 输出是否更稳定

4. Chaos/Failure 测试

模拟:

  • Registry 不可用
  • LLM 429/5xx
  • Executor 超时
  • Redis 抖动
  • 审批服务不可用

14.2 一个简单的 Skill Lint 示例

package lint

import "fmt"

func Validate(def Definition) error {
    if len(def.Description) < 40 {
        return fmt.Errorf("description too short")
    }
    if def.OutputSchema.Type == "" {
        return fmt.Errorf("output schema required")
    }
    if def.Policy.RiskLevel == "" {
        return fmt.Errorf("policy.risk_level required")
    }
    if def.Executor.TimeoutMs <= 0 || def.Executor.TimeoutMs > 30000 {
        return fmt.Errorf("invalid executor timeout")
    }
    return nil
}

十五、常见反模式:很多团队不是做不出来,而是做偏了

反模式一:把 Skill 写成一篇超长说明书

后果:

  • 召回成本高
  • 注入成本高
  • 约束不聚焦

正确做法:

  • 把长知识拆到模板、脚本、引用文档
  • Skill 主体只保留“何时用、怎么做、输出什么”

反模式二:只做工具包装,不做领域约束

如果 Skill 只是把 kubectl 或内部 API 重新描述一遍,它还不是 Skill,只是工具清单。

真正的 Skill 要体现:

  • 决策顺序
  • 风险边界
  • 输出规范
  • 失败兜底

反模式三:把所有步骤都交给模型临场发挥

生产环境中,越关键的步骤越要结构化。
模型应该负责“规划与归纳”,而不是每一步都自由 improvisation。

反模式四:没有显式审批链

高风险 Skill 如果没有审批机制,最终不是平台下线,就是被安全部门一票否决。

十六、落地路线图:企业该怎么分阶段建设

如果你所在团队现在还没有 Skill 平台,不要一开始就上最复杂的分布式架构。
推荐按下面四步推进。

第一步:先做 3 到 5 个高价值 Skill

优先选择:

  • 频繁重复
  • 规则相对清晰
  • 风险可控
  • 结果可验证

比如:

  • K8s 排障
  • 日志检索与摘要
  • SQL 只读分析
  • 工单归类

第二步:补齐治理能力

至少补上:

  • 参数校验
  • 结构化输出
  • 审计
  • 超时与幂等

第三步:引入 Registry 和灰度

当 Skill 数量超过 10 到 20 个,建议尽快进入资产化管理阶段。

第四步:做平台化与团队化分工

最终形成:

  • 平台团队负责编排、治理、观测
  • 领域团队负责 Skill 内容与脚本
  • 安全团队负责策略与审批

这才是企业里可长期持续的协作模式。

十七、结语:专属 AI 架构师的本质,是把专家经验编译成可治理的运行时能力

从架构视角看,Skill 的意义远不止“让模型更会调用工具”。

它真正改变的是三件事:

  1. 把分散在人脑、Wiki、脚本、Runbook 里的知识,沉淀成标准能力包
  2. 把一次性的 Prompt 工程,升级成可版本化、可审计、可扩展的软件资产
  3. 把 AI 从“会聊天的助手”,推进到“可控的企业执行系统”

如果要用一句话概括本文的核心结论,那就是:

企业级 Skill 架构的关键,不在于你接了哪个模型,而在于你是否构建了受限推理、稳定编排、细粒度权限、可观测执行和可回滚治理这一整套运行时体系。

当你把这些能力补齐之后,所谓“专属 AI 架构师”就不再是一个营销概念,而会真正成为企业里的下一代能力编排中枢。

附:企业级 Skill 设计检查清单

上线前,建议逐条检查:

  • description 是否覆盖高频关键词与自然语言描述
  • 是否定义了明确的输入 schema 与输出 schema
  • 是否有 Allow / Ask / Deny 权限策略
  • 是否声明了 owner、风险等级、兼容版本、环境范围
  • 是否支持超时、重试、幂等、限流
  • 是否有结构化事件、指标、Trace
  • 是否支持灰度、回滚、弃用
  • 是否有回放测试与历史案例验证
  • 是否明确哪些步骤可并行、哪些步骤必须审批
  • 是否把变更类 Skill 与查询类 Skill 区分治理

把这份清单做实,Skill 才能从“能跑”走到“能上线、能放量、能长期维护”。




上一篇:AMD Zen7架构前瞻:16核CCD、FOPLP面板级封装与32核桌面CPU路线图
下一篇:Python + Ollama 实践:打造生产级本地 AI 搜索引擎
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-5-26 05:52 , Processed in 0.627034 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表