不只讲“Skill 是什么”,而是完整回答四个工程问题:Skill 如何被召回、如何被编排、如何在高并发下稳定执行、如何在企业环境中安全演进。
一、先把问题说透:为什么企业一定会走到 Skill 架构
很多团队第一次接触 AI Agent,做法都很相似:
- 给大模型一段很长的 System Prompt
- 挂上几十个工具
- 让模型自己决定什么时候调用什么工具
Demo 阶段这套方案往往很顺,但一到生产环境就开始失控:
- 工具越来越多,模型选错工具的概率上升
- 指令越来越长,Token 成本和延迟快速恶化
- 同一个问题不同人问,执行链路不稳定
- 安全边界模糊,敏感命令和高危接口无法精细管控
- 故障场景下没有审计、没有回放、没有幂等
本质上,问题不在模型“变笨了”,而在于系统把所有知识、工具、流程、约束都塞进了一个巨型上下文里。
这时候就需要引入一个中间层:
Skill = 领域知识 + 工具集合 + 参数约束 + 执行策略 + 输出契约
Skill 不是简单的提示词模板,也不是单纯的函数调用描述。
它更像是一个面向 Agent 运行时的“领域能力包”。
对于企业来说,Skill 架构解决的是四个核心矛盾:
| 矛盾 |
传统 Prompt/Tool 方案 |
Skill 化之后 |
| 知识与执行耦合 |
所有知识堆在一个 Prompt 中 |
按领域拆分,按需注入 |
| 工具增长失控 |
工具越多越难选 |
先选 Skill,再在 Skill 内选工具 |
| 安全边界不清 |
所有工具平铺给模型 |
按 Skill 建立最小权限 |
| 工程治理困难 |
无版本、无审计、无回滚 |
有注册中心、版本、灰度、观测 |
一句话总结:
Skill 是企业把“人类专家的工作方式”结构化、产品化、可编排化的过程。
二、Skill 的底层原理:它不是 Prompt 技巧,而是受限推理空间
2.1 从 ReAct 到受限推理
大多数 Agent 的底层都是 ReAct:
Observe -> Think -> Act -> Observe -> Think -> Act
问题在于,原始 ReAct 把所有推理都放在同一个上下文窗口里。
当工具数从 5 个增加到 50 个、调用链从 2 步增加到 10 步后,会出现三个问题:
- 模型需要在过大的动作空间中决策
- 每一轮都要重复消费大量上下文
- 越往后推理,越容易偏离原始约束
Skill 的真正价值,是把“大而全的推理空间”切成多个“小而准的推理空间”。
用户请求
-> Skill Recall
-> 命中若干候选 Skill
-> 在候选 Skill 的受限上下文中规划
-> 生成结构化执行计划
-> 执行器落地执行
所以 Skill 的架构意义,不是“给模型更多提示”,而是:
- 缩小搜索空间
- 提升规划稳定性
- 降低上下文噪声
- 让运行时具备治理抓手
2.2 Skill 在系统中的角色
一个成熟的 Agent 平台,至少存在五层:
┌─────────────────────────────────────────────┐
│ Interaction Layer │
│ Chat / API / Workflow / SSE / Webhook │
├─────────────────────────────────────────────┤
│ Orchestration Layer │
│ Recall / Planner / Scheduler / Memory │
├─────────────────────────────────────────────┤
│ Skill Layer │
│ Domain Context / Tool Set / Constraints │
├─────────────────────────────────────────────┤
│ Execution Layer │
│ HTTP / gRPC / Script / Wasm / Browser │
├─────────────────────────────────────────────┤
│ Governance Layer │
│ Auth / Audit / Quota / Trace / Policy │
└─────────────────────────────────────────────┘
Skill 位于编排层和执行层之间,它承担两个职责:
- 对上,为 Planner 提供领域能力边界
- 对下,为 Executor 提供可执行契约
2.3 Skill、Plugin、MCP、Workflow 的边界
很多团队一开始会把这些概念混用,后面就会越做越乱。
| 能力形态 |
本质 |
适合场景 |
| Skill |
领域能力包 |
K8s 排障、订单风控、客服赔付 |
| Plugin |
平台钩子扩展 |
审计、日志注入、Prompt 变换 |
| MCP Server |
工具提供者 |
浏览器、数据库、IDE、文档系统 |
| Workflow |
固定流程编排 |
审批、批处理、日报生成 |
简单判断规则:
- 需要“领域知识 + 工具约束 + 输出规范”,选 Skill
- 需要“平台级横切逻辑”,选 Plugin
- 需要“连接外部系统”,选 MCP
- 需要“稳定且固定的流程链路”,选 Workflow
三、企业级 Skill 架构演进:从本地目录到分布式注册中心
3.1 三个阶段的演进路径
阶段一:单项目内嵌 Skill
.opencode/skills/*/SKILL.md
适用于:
优点是简单,缺点是没有统一治理能力。
阶段二:团队级集中管理
Git Repo / Internal Package / HTTP Pull
适用于:
- 多项目共享 Skill
- 需要版本管理
- 开始有审批和回滚需求
这个阶段的核心变化是:Skill 从“文件”升级为“可发布资产”。
阶段三:企业级 Skill Registry
┌────────────────────┐
│ Skill Registry │
│ Metadata + Content │
└─────────┬──────────┘
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Team A │ │ Team B │ │ Team C │
│ Publisher │ │ Publisher │ │ Publisher │
└────────────┘ └────────────┘ └────────────┘
│
▼
┌────────────────────┐
│ Agent Orchestrator │
└────────────────────┘
适用于:
- 50+ Skill
- 多团队协作
- 多租户、多环境、多集群
- 对审计、权限、灰度、可观测有明确要求
3.2 为什么必须有 Registry
当 Skill 数量一上来,没有 Registry 很快会出现四类问题:
- 不知道谁在维护这个 Skill
- 不知道哪个版本在生产上生效
- 不知道召回率为什么下降
- 不知道某次故障是不是由 Skill 变更引起
Registry 解决的是“资产治理”问题,而不是“文件存储”问题。
一个生产级 Skill Registry 至少要提供:
- 元数据管理:名称、版本、标签、所属团队、兼容环境
- 内容管理:Skill 主体、脚本、模板、依赖声明
- 检索接口:关键词检索、语义检索、标签过滤
- 发布治理:审批、灰度、回滚、弃用
- 事件通知:变更推送、缓存失效、审计记录
四、统一案例:构建企业级 Kubernetes 排障 Skill
为了避免文章只停留在抽象层,下面统一用一个真实又典型的场景贯穿全文:
团队要建设一个“专属 AI 架构师”,帮助值班工程师排查 K8s 集群中的 Pod CrashLoopBackOff、OOMKilled、节点压力、网络异常和服务抖动问题。
这个场景非常适合 Skill 化,原因有三点:
- 排障流程高度专业,依赖经验
- 工具链丰富但风险高,涉及
kubectl、日志、指标、变更操作
- 结果需要结构化沉淀,便于复盘和审计
4.1 Skill 目录结构
.opencode/skills/k8s-troubleshoot/
├── SKILL.md
├── scripts/
│ ├── diagnose_pod.sh
│ ├── inspect_node.sh
│ └── query_prometheus.py
├── templates/
│ ├── incident_summary.md
│ └── action_plan.md
└── policy/
└── permissions.json
4.2 生产级 SKILL.md
---
name: k8s-troubleshoot
description: >
Use when investigating Kubernetes cluster incidents including
CrashLoopBackOff, OOMKilled, Pending, Evicted, ImagePullBackOff,
service latency, node pressure, network policy, DNS resolution,
ingress errors, and Prometheus metric anomalies.
metadata:
domain: infrastructure
owner: sre-platform
compatibility: "k8s>=1.27"
risk_level: medium
environments: ["dev", "test", "prod"]
approval: "required-on-prod-mutation"
---
# K8s 生产排障 Skill
## Objective
快速完成以下工作:
1. 归类故障范围:pod / node / network / storage / config
2. 采集证据:events / describe / logs / metrics
3. 输出根因假设与置信度
4. 生成可执行修复建议
5. 在生产环境下禁止直接执行高危命令
## Readonly Tools
- `bash scripts/diagnose_pod.sh <namespace> <pod>`
- `bash scripts/inspect_node.sh <node>`
- `python3 scripts/query_prometheus.py --query "<promql>"`
## Mandatory Workflow
1. 先看 recent events,再看 describe,再看 logs,最后看 metrics
2. 如果发现 `OOMKilled`,必须同时检查 limit/request 与节点内存压力
3. 如果发现 `CrashLoopBackOff`,必须回看 previous logs
4. 如果问题发生在 prod,所有变更类操作只给建议,不自动执行
## Output Contract
最终输出必须包含以下字段:
```json
{
"classification": "pod|node|network|storage|config",
"severity": "P0|P1|P2|P3",
"summary": "一句话总结",
"evidence": [
{"type": "event", "content": "..."},
{"type": "log", "content": "..."},
{"type": "metric", "content": "..."}
],
"root_cause": {
"hypothesis": "最可能根因",
"confidence": 0.0
},
"action_plan": [
{
"step": 1,
"command": "建议命令",
"risk": "low|medium|high",
"verification": "验证方式"
}
]
}
这段 SKILL.md 有几个生产化要点:
- `description` 同时覆盖了语义描述和高频关键词
- `metadata` 显式声明 owner、环境、风险等级
- `Mandatory Workflow` 固化了人类专家的排障顺序
- `Output Contract` 强制结构化,避免模型自由发挥
## 五、运行时架构:一次请求到底是怎么流转的
### 5.1 请求链路全景
User Request
│
▼
API Gateway
│
▼
Conversation Service
│
├── Load Session
├── Recall Candidate Skills
├── Build Planning Context
▼
Planner / LLM Gateway
│
├── Return Plan
├── Select Skills
└── Select Tool Calls
▼
Execution Orchestrator
│
├── Policy Check
├── Rate Limit
├── Parallel Execution
├── Retry / Timeout / Circuit Breaker
▼
Skill Executor Pool
│
├── Bash Runner
├── HTTP Runner
├── gRPC Runner
└── Wasm Runner
▼
Observation + Audit + Event Store
│
▼
Response / SSE Stream
### 5.2 四个关键子系统
**1. Recall Engine**
负责从大量 Skill 中找出最可能命中的候选集合。
生产环境一般采用“双路召回”:
- 关键词召回:高精度、低成本
- 向量召回:解决自然语言表达不一致问题
实际工程中,不建议直接把所有 Skill 内容都做 embedding。
更常见的做法是:
- `name + description + tags + metadata` 做检索向量
- `body` 只在命中后懒加载
**2. Planner**
负责把用户问题转成执行计划,而不是直接发命令。
一个成熟的 Planner 输出应该至少包含:
- 目标任务
- 使用哪些 Skill
- 调用顺序
- 哪些步骤可并行
- 哪些步骤需要人工确认
**3. Orchestrator**
这是工程成败的关键。
真正的“企业级”不在 Prompt,而在编排器。
Orchestrator 至少要负责:
- 执行顺序控制
- 任务并发调度
- 限流与隔离
- 超时、重试、熔断
- 幂等与去重
- 中间结果聚合
- SSE 进度推送
**4. Executor**
Executor 不应该只有一个“运行命令”的粗暴实现,而应该有多种 Runner:
- HTTP Runner:调用内部微服务
- gRPC Runner:低延迟内部 RPC
- Script Runner:执行现有脚本
- Wasm Runner:轻量沙箱
- Browser Runner:网页自动化
不同 Runner 的安全模型和资源模型完全不同,必须分开治理。
## 六、核心数据模型:生产系统先定义边界,再写代码
下面给出一个可直接落地的 Skill 定义模型。
### 6.1 SkillDefinition
```go
package skill
type Definition struct {
Name string `json:"name"`
Version string `json:"version"`
Description string `json:"description"`
Tags []string `json:"tags"`
Metadata map[string]string `json:"metadata"`
Parameters JSONSchema `json:"parameters"`
Executor ExecutorSpec `json:"executor"`
Policy PolicySpec `json:"policy"`
OutputSchema JSONSchema `json:"output_schema"`
}
type ExecutorSpec struct {
Type string `json:"type"`
Endpoint string `json:"endpoint"`
TimeoutMs int `json:"timeout_ms"`
MaxConcurrency int `json:"max_concurrency"`
Retry RetrySpec `json:"retry"`
Headers map[string]string `json:"headers"`
}
type RetrySpec struct {
MaxAttempts int `json:"max_attempts"`
BaseDelayMs int `json:"base_delay_ms"`
}
type PolicySpec struct {
RiskLevel string `json:"risk_level"`
RequiredRoles []string `json:"required_roles"`
AllowedEnv []string `json:"allowed_env"`
NeedApproval bool `json:"need_approval"`
Idempotent bool `json:"idempotent"`
AllowParallel bool `json:"allow_parallel"`
Mutable bool `json:"mutable"`
}
这个模型最重要的不是字段多,而是它明确区分了三件事:
- 业务语义:
description、tags
- 执行契约:
parameters、executor
- 治理约束:
policy
6.2 ExecutionPlan
package orchestration
type Plan struct {
RequestID string `json:"request_id"`
Steps []PlanStep `json:"steps"`
}
type PlanStep struct {
StepID string `json:"step_id"`
SkillName string `json:"skill_name"`
Arguments map[string]any `json:"arguments"`
DependsOn []string `json:"depends_on"`
ParallelGroup string `json:"parallel_group"`
NeedApproval bool `json:"need_approval"`
TimeoutMs int `json:"timeout_ms"`
}
这一步很关键。
如果你的系统没有显式 ExecutionPlan,大概率后面很难做:
七、生产级代码实战:Skill Registry、编排器、执行器
下面的代码不追求 Demo 式“最短”,而是尽量接近生产设计。
7.1 Skill Registry:版本化发布与读取
package registry
import (
"context"
"fmt"
"sync"
"github.com/hashicorp/go-version"
)
type Store interface {
Save(ctx context.Context, def Definition) error
Get(ctx context.Context, name, ver string) (Definition, error)
List(ctx context.Context, filter Filter) ([]Definition, error)
}
type Service struct {
store Store
cache sync.Map
}
type Filter struct {
Query string
Tags []string
Env string
}
func (s *Service) Publish(ctx context.Context, def Definition) error {
if def.Name == "" || def.Version == "" {
return fmt.Errorf("name/version required")
}
if _, err := version.NewVersion(def.Version); err != nil {
return fmt.Errorf("invalid semver: %w", err)
}
if def.Executor.TimeoutMs <= 0 {
def.Executor.TimeoutMs = 3000
}
return s.store.Save(ctx, def)
}
func (s *Service) Search(ctx context.Context, filter Filter) ([]Definition, error) {
return s.store.List(ctx, filter)
}
生产建议:
Publish 不只是保存,还要触发 lint、policy 校验、兼容性检查
- Registry 中应保留 immutable version,不直接覆盖
- “当前生效版本”建议通过 alias 或 release channel 管理
7.2 Recall:关键词 + 语义双路召回
package recall
import "sort"
type Candidate struct {
Skill Definition
Score float64
}
type Engine struct {
keyword KeywordRetriever
vector VectorRetriever
}
func (e *Engine) Recall(query string, topN int) []Candidate {
kw := e.keyword.Search(query, topN*2)
vec := e.vector.Search(query, topN*2)
merged := make(map[string]Candidate)
merge := func(items []Candidate, weight float64) {
for _, item := range items {
current := merged[item.Skill.Name]
current.Skill = item.Skill
current.Score += item.Score * weight
merged[item.Skill.Name] = current
}
}
merge(kw, 0.45)
merge(vec, 0.55)
result := make([]Candidate, 0, len(merged))
for _, item := range merged {
result = append(result, item)
}
sort.Slice(result, func(i, j int) bool {
return result[i].Score > result[j].Score
})
if len(result) > topN {
result = result[:topN]
}
return result
}
这里的经验点有两个:
- 关键词召回用于“强约束词”,比如
OOMKilled、CrashLoopBackOff
- 向量召回用于“自然语言变体”,比如“服务老是抖”“节点资源吃满了”
7.3 Orchestrator:并发编排与失败控制
package orchestration
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
type Executor interface {
Execute(ctx context.Context, step PlanStep) (StepResult, error)
}
type EventSink interface {
Publish(ctx context.Context, event Event)
}
type Orchestrator struct {
executor Executor
events EventSink
globalSemaphore *semaphore.Weighted
}
type StepResult struct {
StepID string `json:"step_id"`
Success bool `json:"success"`
Output map[string]any `json:"output"`
ErrorMsg string `json:"error_msg,omitempty"`
}
func NewOrchestrator(executor Executor, sink EventSink, maxConcurrent int64) *Orchestrator {
return &Orchestrator{
executor: executor,
events: sink,
globalSemaphore: semaphore.NewWeighted(maxConcurrent),
}
}
func (o *Orchestrator) Run(ctx context.Context, plan Plan) ([]StepResult, error) {
results := make([]StepResult, 0, len(plan.Steps))
resultsMu := sync.Mutex{}
groups := groupByParallel(plan.Steps)
for _, steps := range groups {
eg, groupCtx := errgroup.WithContext(ctx)
for _, step := range steps {
step := step
eg.Go(func() error {
if err := o.globalSemaphore.Acquire(groupCtx, 1); err != nil {
return err
}
defer o.globalSemaphore.Release(1)
timeout := time.Duration(step.TimeoutMs) * time.Millisecond
if timeout == 0 {
timeout = 5 * time.Second
}
stepCtx, cancel := context.WithTimeout(groupCtx, timeout)
defer cancel()
o.events.Publish(stepCtx, Event{
Type: "step_started",
StepID: step.StepID,
})
output, err := o.executor.Execute(stepCtx, step)
if err != nil {
res := StepResult{
StepID: step.StepID,
Success: false,
ErrorMsg: err.Error(),
}
resultsMu.Lock()
results = append(results, res)
resultsMu.Unlock()
o.events.Publish(stepCtx, Event{
Type: "step_failed",
StepID: step.StepID,
Error: err.Error(),
})
return fmt.Errorf("step %s failed: %w", step.StepID, err)
}
output.Success = true
resultsMu.Lock()
results = append(results, output)
resultsMu.Unlock()
o.events.Publish(stepCtx, Event{
Type: "step_finished",
StepID: step.StepID,
})
return nil
})
}
if err := eg.Wait(); err != nil {
return results, err
}
}
return results, nil
}
这段代码体现了几个生产特征:
- 用
semaphore 做全局并发闸门,避免 Executor 被打爆
- 每一步独立超时,避免单点步骤拖垮整个链路
- 通过
EventSink 把进度推到 SSE、Kafka、审计系统
- 并行组执行完后再进入下一组,便于做依赖控制
7.4 Executor:参数校验、幂等、策略控制
package executor
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
type PolicyChecker interface {
Check(ctx context.Context, step PlanStep) error
}
type IdempotencyStore interface {
Get(ctx context.Context, key string) ([]byte, bool, error)
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}
type Service struct {
policy PolicyChecker
store IdempotencyStore
client *http.Client
}
func (s *Service) Execute(ctx context.Context, step PlanStep) (StepResult, error) {
if err := s.policy.Check(ctx, step); err != nil {
return StepResult{}, err
}
cacheKey := buildIdempotencyKey(step)
if raw, ok, _ := s.store.Get(ctx, cacheKey); ok {
var cached StepResult
if err := json.Unmarshal(raw, &cached); err == nil {
return cached, nil
}
}
body, _ := json.Marshal(step.Arguments)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, resolveEndpoint(step.SkillName), bytes.NewReader(body))
if err != nil {
return StepResult{}, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Request-ID", step.StepID)
resp, err := s.client.Do(req)
if err != nil {
return StepResult{}, err
}
defer resp.Body.Close()
if resp.StatusCode >= 500 {
return StepResult{}, fmt.Errorf("executor upstream 5xx: %d", resp.StatusCode)
}
if resp.StatusCode >= 400 {
return StepResult{}, fmt.Errorf("executor upstream 4xx: %d", resp.StatusCode)
}
var result StepResult
if err := json.NewDecoder(resp.Body).Decode(&result.Output); err != nil {
return StepResult{}, err
}
result.StepID = step.StepID
raw, _ := json.Marshal(result)
_ = s.store.Set(ctx, cacheKey, raw, 10*time.Minute)
return result, nil
}
这一步常被低估,但它决定系统是否能扛线上重试风暴:
- 用户重复提交
- 网关重试
- LLM 重新规划重复步骤
- 下游超时但实际上已成功
没有幂等,就没有生产级 Agent。
八、高并发与可扩展:企业级系统真正拉开差距的地方
很多文章讲 Skill,只讲“怎么定义”,不讲“怎么扛流量”。
但企业系统一旦接入客服、运维、风控、运营平台,流量模型会比想象中复杂得多。
8.1 高并发下的四类瓶颈
1. LLM 瓶颈
- 外部 API 延迟高
- token 生成速度不稳定
- 厂商限流
- 模型实例负载抖动
2. Skill Executor 瓶颈
- 脚本执行耗 CPU
kubectl、数据库、搜索接口等下游资源有限
- 高危操作需要审批,阻塞链路
3. Session 瓶颈
- 长会话上下文快速膨胀
- Redis 热 key
- 大对象序列化压力
4. 流式连接瓶颈
- SSE 长连接占资源
- 前端断连重试放大流量
- 中间层代理的连接超时配置不当
8.2 高并发设计原则
原则一:先把规划和执行解耦
不要把“模型规划”与“技能执行”写成同步串行黑盒。
应拆成两个阶段:
Planning Phase -> Execution Phase
好处:
- 可以缓存 Planning 结果
- 可以单独优化 LLM 调用
- 可以把执行部分异步化
- 可以对计划做审批、风控、重放
原则二:区分同步 Skill 和异步 Skill
| 类型 |
示例 |
处理方式 |
| 同步 Skill |
查订单、查 Pod 状态 |
直接返回 |
| 准同步 Skill |
触发扩容、创建工单 |
接受后异步跟踪 |
| 异步 Skill |
批量分析日志、生成日报 |
投递 MQ,事件回传 |
如果系统把所有 Skill 都放在同步链路里,迟早被长尾任务拖死。
原则三:按 Skill 维度做隔离
不能只做“全局限流”,还要做“单 Skill 隔离”。
比如:
k8s-troubleshoot 最大并发 100
billing-refund 最大并发 20
browser-automation 最大并发 10
原因很简单:不同 Skill 的资源模型完全不同。
8.3 单 Skill 并发控制
package control
import (
"context"
"sync"
"golang.org/x/sync/semaphore"
)
type SkillLimiter struct {
mu sync.RWMutex
limiters map[string]*semaphore.Weighted
}
func NewSkillLimiter() *SkillLimiter {
return &SkillLimiter{
limiters: make(map[string]*semaphore.Weighted),
}
}
func (s *SkillLimiter) Register(skill string, max int64) {
s.mu.Lock()
defer s.mu.Unlock()
s.limiters[skill] = semaphore.NewWeighted(max)
}
func (s *SkillLimiter) Acquire(ctx context.Context, skill string) error {
s.mu.RLock()
sem := s.limiters[skill]
s.mu.RUnlock()
if sem == nil {
return nil
}
return sem.Acquire(ctx, 1)
}
func (s *SkillLimiter) Release(skill string) {
s.mu.RLock()
sem := s.limiters[skill]
s.mu.RUnlock()
if sem != nil {
sem.Release(1)
}
}
这是企业级常见做法:全局限流 + 租户限流 + Skill 限流三层叠加。
8.4 背压与降级
当系统流量超过承载能力时,不能只会报错,应该有明确降级策略:
- 降低召回数量:Top 8 降到 Top 3
- 关闭昂贵模型:从高阶模型降到轻量模型
- 禁止非必要并行:串行执行可选步骤
- 长任务转异步:返回 task_id,前端轮询或订阅
- 命中缓存直接返回:尤其是查询类 Skill
8.5 缓存策略
企业级 Skill 平台至少要有四层缓存思维:
| 层级 |
缓存内容 |
价值 |
| Recall Cache |
query -> candidates |
减少检索开销 |
| Plan Cache |
normalized request -> execution plan |
减少 LLM 调用 |
| Result Cache |
idempotent skill output |
降低下游压力 |
| Metadata Cache |
skill definitions |
降低 Registry 压力 |
但要注意:
- 查询型 Skill 可以强缓存
- 变更型 Skill 只能做幂等缓存,不能做结果复用
九、安全治理:没有权限边界的 Agent 一定不能上生产
企业最担心的从来不是“模型答错”,而是“模型做错”。
Skill 平台的安全治理至少要覆盖五层。
9.1 参数层:JSON Schema 校验
所有入参必须经过 schema 校验,不能让模型自由拼装。
例如:
{
"type": "object",
"properties": {
"namespace": {
"type": "string",
"pattern": "^[a-z0-9-]+$"
},
"pod": {
"type": "string",
"maxLength": 128
}
},
"required": ["namespace", "pod"],
"additionalProperties": false
}
9.2 策略层:Allow / Ask / Deny
{
"readonly": [
"kubectl get *",
"kubectl describe *",
"kubectl logs *"
],
"ask": [
"kubectl exec *",
"kubectl rollout restart *"
],
"deny": [
"kubectl delete ns *",
"kubectl delete node *",
"kubectl * --all"
]
}
9.3 身份层:RBAC + 环境隔离
Skill 权限不应该只取决于“模型想不想执行”,而是取决于:
- 当前用户身份
- 当前租户
- 当前环境
- 当前 Skill 风险等级
最常见的规则是:
dev 可自动执行低风险变更
test 允许部分中风险变更
prod 默认只读,变更必须审批
9.4 执行层:沙箱与网络隔离
对于脚本型 Skill,至少做到:
- 独立容器或 Wasm 沙箱
- 只读文件系统
- CPU/内存配额
- 出网白名单
- 临时目录生命周期隔离
9.5 审计层:谁触发、触发了什么、结果是什么
审计记录至少包含:
- 用户 ID
- 会话 ID
- 请求原文
- 命中的 Skill
- 每一步参数
- 每一步执行结果
- 是否人工确认
- 最终输出
这不仅是为了安全,也是为了事后复盘和模型优化。
十、可观测性:不观测就无法优化,不可回放就无法复盘
10.1 必做指标
Skill 平台建议至少暴露以下指标:
skill_recall_total{skill, result}
skill_recall_latency_ms{skill}
planner_latency_ms{model}
skill_execution_total{skill, status}
skill_execution_latency_ms{skill}
skill_execution_timeout_total{skill}
skill_policy_denied_total{skill}
skill_approval_wait_ms{skill}
session_context_tokens{tenant}
10.2 Trace 设计
一条完整链路至少有这些 Span:
request
├── load_session
├── recall_skills
├── planner_llm
├── execute_step:diagnose-pod
├── execute_step:query-prometheus
├── summarize_llm
└── stream_response
10.3 结构化事件
type Event struct {
RequestID string `json:"request_id"`
SessionID string `json:"session_id"`
Type string `json:"type"`
StepID string `json:"step_id,omitempty"`
SkillName string `json:"skill_name,omitempty"`
Payload map[string]any `json:"payload,omitempty"`
Error string `json:"error,omitempty"`
At time.Time `json:"at"`
}
这类事件流很有价值,可以同时服务于:
- 前端实时展示
- Kafka 异步消费
- 审计落库
- 离线分析召回率和失败率
十一、真实场景复盘:一次 CrashLoopBackOff 故障是如何被 Skill 体系处理的
假设用户输入:
帮我排查 prod 命名空间 payment-service-7d8c9d4f6f-2lqpx 为什么一直 CrashLoopBackOff
11.1 Recall 阶段
系统可能召回以下候选 Skill:
| Skill |
分数 |
命中原因 |
k8s-troubleshoot |
0.96 |
包含 CrashLoopBackOff、prod、pod 排障语义 |
service-latency-analysis |
0.42 |
与 service 关键词相关 |
release-rollback |
0.18 |
与生产服务有关,但不够精准 |
最终主命中 k8s-troubleshoot。
11.2 Planner 输出计划
{
"request_id": "req-20260525-1001",
"steps": [
{
"step_id": "s1",
"skill_name": "k8s-get-events",
"arguments": {"namespace": "prod"},
"parallel_group": "g1"
},
{
"step_id": "s2",
"skill_name": "k8s-describe-pod",
"arguments": {
"namespace": "prod",
"pod": "payment-service-7d8c9d4f6f-2lqpx"
},
"parallel_group": "g1"
},
{
"step_id": "s3",
"skill_name": "k8s-get-previous-logs",
"arguments": {
"namespace": "prod",
"pod": "payment-service-7d8c9d4f6f-2lqpx"
},
"depends_on": ["s2"]
},
{
"step_id": "s4",
"skill_name": "promql-query",
"arguments": {
"query": "sum(container_memory_working_set_bytes{pod='payment-service-7d8c9d4f6f-2lqpx'})"
},
"parallel_group": "g2"
}
]
}
这份计划天然具备并行性:
s1 和 s2 可并发
s3 依赖 s2
s4 可和 s3 并行
11.3 Executor 采集到的关键信息
describe:
Last State: Terminated
Reason: OOMKilled
previous logs:
java.lang.OutOfMemoryError: Java heap space
metrics:
container_memory_working_set_bytes = 1.73Gi
pod memory limit = 1536Mi
11.4 Skill 最终结构化输出
{
"classification": "pod",
"severity": "P1",
"summary": "payment-service Pod 因内存超限被 kubelet 杀死,导致持续 CrashLoopBackOff",
"evidence": [
{
"type": "event",
"content": "Back-off restarting failed container"
},
{
"type": "log",
"content": "java.lang.OutOfMemoryError: Java heap space"
},
{
"type": "metric",
"content": "working_set_bytes(1.73Gi) > limit(1.5Gi)"
}
],
"root_cause": {
"hypothesis": "JVM 堆配置与容器 limit 不匹配,触发 OOMKilled",
"confidence": 0.94
},
"action_plan": [
{
"step": 1,
"command": "kubectl -n prod set resources deploy/payment-service --limits=memory=2Gi --requests=memory=1Gi",
"risk": "medium",
"verification": "kubectl -n prod rollout status deploy/payment-service"
},
{
"step": 2,
"command": "调整 JVM -Xmx,建议不超过容器 limit 的 60%-70%",
"risk": "low",
"verification": "观察 Pod 重启次数和 GC 指标"
}
]
}
这就是 Skill 架构相较于简单问答的价值:
- 不只给结论
- 还给证据
- 还给行动方案
- 还能控制风险和审批边界
十二、从单体 Skill 到企业级 Skill 平台:完整部署拓扑
12.1 推荐服务拆分
agent-gateway
conversation-service
planner-service
skill-registry
skill-executor-http
skill-executor-script
skill-executor-browser
policy-service
audit-service
event-bus
为什么建议把 Executor 再拆细?
- HTTP/gRPC 型 Skill 主要吃网络与连接池
- Script/Wasm 型 Skill 主要吃 CPU 与隔离资源
- Browser 型 Skill 主要吃内存与实例数
混在一起,调度和容量都很难做。
12.2 Kubernetes 部署建议
apiVersion: apps/v1
kind: Deployment
metadata:
name: skill-executor-script
spec:
replicas: 4
selector:
matchLabels:
app: skill-executor-script
template:
metadata:
labels:
app: skill-executor-script
spec:
containers:
- name: app
image: registry.example.com/skill-executor-script:1.0.0
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "2"
memory: "2Gi"
env:
- name: EXECUTOR_MAX_CONCURRENCY
value: "50"
- name: POLICY_ENDPOINT
value: "http://policy-service.default.svc.cluster.local"
readinessProbe:
httpGet:
path: /readyz
port: 8080
livenessProbe:
httpGet:
path: /livez
port: 8080
12.3 HPA 关注指标
Skill 平台不建议只看 CPU 自动扩容,还要看:
- 正在执行的 step 数
- 等待队列长度
- P95 执行时延
- SSE 活跃连接数
- 下游错误率
十三、工程治理:版本、灰度、回滚、兼容性
13.1 Skill 版本管理建议
使用语义化版本:
MAJOR:输入输出契约变化,不兼容
MINOR:新增能力,兼容
PATCH:修复文案、策略、脚本细节
13.2 灰度发布
生产上不要直接全量替换 Skill。
推荐做法:
- 发布新版本到 Registry
- 标记为
candidate
- 仅对测试租户或 5% 流量生效
- 观测召回率、成功率、耗时、人工接管率
- 没问题后切到
stable
13.3 回滚
Skill 回滚要保证两个层面都能回:
所以 Skill 发布最好是“不可变包”,而不是简单覆盖文件。
13.4 兼容性约束
一个企业级 Skill 很容易依赖环境差异:
- K8s 版本不同
- CLI 参数不同
- API 返回结构不同
- 监控系统指标名不同
因此建议在 metadata 中显式标明兼容矩阵,例如:
metadata:
compatibility:
kubernetes: ">=1.27,<1.31"
prometheus: ">=2.45"
cni: ["calico", "cilium"]
十四、质量保障:测试不是补充项,而是 Skill 能否长期演进的前提
14.1 四层测试模型
1. Lint 测试
检查:
- frontmatter 是否完整
- description 是否过短
- output schema 是否存在
- policy 是否缺失
2. Contract 测试
验证:
- 参数 schema 是否正确
- 输出是否符合约束
- 危险命令是否被拦截
3. Replay 测试
把历史真实请求回放到新版本 Skill,观察:
4. Chaos/Failure 测试
模拟:
- Registry 不可用
- LLM 429/5xx
- Executor 超时
- Redis 抖动
- 审批服务不可用
14.2 一个简单的 Skill Lint 示例
package lint
import "fmt"
func Validate(def Definition) error {
if len(def.Description) < 40 {
return fmt.Errorf("description too short")
}
if def.OutputSchema.Type == "" {
return fmt.Errorf("output schema required")
}
if def.Policy.RiskLevel == "" {
return fmt.Errorf("policy.risk_level required")
}
if def.Executor.TimeoutMs <= 0 || def.Executor.TimeoutMs > 30000 {
return fmt.Errorf("invalid executor timeout")
}
return nil
}
十五、常见反模式:很多团队不是做不出来,而是做偏了
反模式一:把 Skill 写成一篇超长说明书
后果:
正确做法:
- 把长知识拆到模板、脚本、引用文档
- Skill 主体只保留“何时用、怎么做、输出什么”
反模式二:只做工具包装,不做领域约束
如果 Skill 只是把 kubectl 或内部 API 重新描述一遍,它还不是 Skill,只是工具清单。
真正的 Skill 要体现:
反模式三:把所有步骤都交给模型临场发挥
生产环境中,越关键的步骤越要结构化。
模型应该负责“规划与归纳”,而不是每一步都自由 improvisation。
反模式四:没有显式审批链
高风险 Skill 如果没有审批机制,最终不是平台下线,就是被安全部门一票否决。
十六、落地路线图:企业该怎么分阶段建设
如果你所在团队现在还没有 Skill 平台,不要一开始就上最复杂的分布式架构。
推荐按下面四步推进。
第一步:先做 3 到 5 个高价值 Skill
优先选择:
比如:
- K8s 排障
- 日志检索与摘要
- SQL 只读分析
- 工单归类
第二步:补齐治理能力
至少补上:
第三步:引入 Registry 和灰度
当 Skill 数量超过 10 到 20 个,建议尽快进入资产化管理阶段。
第四步:做平台化与团队化分工
最终形成:
- 平台团队负责编排、治理、观测
- 领域团队负责 Skill 内容与脚本
- 安全团队负责策略与审批
这才是企业里可长期持续的协作模式。
十七、结语:专属 AI 架构师的本质,是把专家经验编译成可治理的运行时能力
从架构视角看,Skill 的意义远不止“让模型更会调用工具”。
它真正改变的是三件事:
- 把分散在人脑、Wiki、脚本、Runbook 里的知识,沉淀成标准能力包
- 把一次性的 Prompt 工程,升级成可版本化、可审计、可扩展的软件资产
- 把 AI 从“会聊天的助手”,推进到“可控的企业执行系统”
如果要用一句话概括本文的核心结论,那就是:
企业级 Skill 架构的关键,不在于你接了哪个模型,而在于你是否构建了受限推理、稳定编排、细粒度权限、可观测执行和可回滚治理这一整套运行时体系。
当你把这些能力补齐之后,所谓“专属 AI 架构师”就不再是一个营销概念,而会真正成为企业里的下一代能力编排中枢。
附:企业级 Skill 设计检查清单
上线前,建议逐条检查:
description 是否覆盖高频关键词与自然语言描述
- 是否定义了明确的输入 schema 与输出 schema
- 是否有
Allow / Ask / Deny 权限策略
- 是否声明了 owner、风险等级、兼容版本、环境范围
- 是否支持超时、重试、幂等、限流
- 是否有结构化事件、指标、Trace
- 是否支持灰度、回滚、弃用
- 是否有回放测试与历史案例验证
- 是否明确哪些步骤可并行、哪些步骤必须审批
- 是否把变更类 Skill 与查询类 Skill 区分治理
把这份清单做实,Skill 才能从“能跑”走到“能上线、能放量、能长期维护”。