找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

431

积分

0

好友

61

主题
发表于 23 小时前 | 查看: 2| 回复: 0

Coze Studio 的插件系统是连接 AI 模型与外部服务的桥梁,允许 AI 通过 Function Calling 的方式调用外部工具和服务。插件系统本质上是对外部 API 的标准化封装,使 LLM 能够:

  • 🔍 获取实时数据:搜索、天气、新闻等
  • 🛠️ 执行操作:发送邮件、创建文件、远程命令执行等
  • 🔗 集成第三方服务:数据库、云服务、MCP 服务器等

理解插件的运转和执行流程,是为后续开发自定义或 MCP 插件奠定基础的关键。

本文将围绕 Coze Studio 插件系统的整体设计和其与 LLM 的协作关系,通过源码逐步深入分析。

注意:本文重点剖析 Coze 如何结合 HTTP 插件进行工作,作为理解其插件体系的切入点。MCP 和自定义插件的详细分析需自行探索。官方版本尚未完整实现 MCP 支持,后续文章将介绍如何进行插件二次开发,以支持在前端配置 MCP Server。

image-20251205224615981

核心概念

当前插件系统由以下几个核心部分构成:

image-20251202225023049

Plugin(插件)
  • 定义:一个插件代表一个外部服务或功能集合。
  • 组成:包含多个 Tool(工具/API)。
  • 元数据:包含名称、描述、图标、认证信息等。
Tool(工具/API)
  • 定义:插件中的一个具体功能接口。
  • 描述:基于 OpenAPI 3.0 规范定义。
  • 参数:支持 Header、Path、Query、Body 等多种参数位置。
Manifest(清单)
  • 定义:插件的配置清单,描述插件的元信息和 API 配置。
  • 内容:包含插件类型、认证方式、公共参数等。

架构分层设计

Coze Studio 插件系统延续了整个系统的设计理念,采用分层架构,并遵循 DDD(领域驱动设计) 原则:

图片

关键目录结构

backend/
├── api/                    # API 层
│   └── handler/           # HTTP 处理器
├── application/           # 应用层
│   └── plugin/           # 插件应用服务
├── domain/                # 领域层
│   └── plugin/
│       ├── entity/        # 实体定义
│       ├── service/       # 领域服务
│       │   └── tool/      # 工具执行实现
│       ├── repository/    # 仓储接口
│       └── conf/          # 配置加载
└── crossdomain/           # 跨域层
    └── plugin/            # 插件跨域接口

插件类型体系

目前官方版本尚未支持 MCP,后续文章将介绍如何基于 coze-studio 扩展支持 MCP Server 配置。本节主要以 HTTP 插件为例进行剖析,其原理具有代表性。

插件类型定义

Coze Studio 定义了三种插件类型,位于 backend/crossdomain/plugin/consts/consts.go

image-20251202225252362

插件类型对比

类型 常量值 描述 使用场景 执行方式
OpenAPI Plugin openapi 标准 HTTP RESTful API 传统 Web API 集成 HTTP 请求
MCP Plugin coze-studio-mcp Model Context Protocol 连接 MCP 服务器、实时数据源 MCP 协议调用
Custom Plugin coze-studio-custom 自定义内部逻辑 内置功能、特殊处理 注册的自定义处理器
OpenAPI Plugin(HTTP 插件)
  • 特点:基于 OpenAPI 3.0 规范。
  • 执行:通过 HTTP 客户端发送 RESTful 请求。
  • 认证:支持 OAuth、API Key、Service Token 等。
  • 适用:大多数第三方 Web API。
MCP Plugin(MCP 插件)
  • 特点:基于 Model Context Protocol。
  • 执行:通过 MCP 客户端调用工具。
  • 配置:在 Manifest 的 api.extensions.mcp_config 中配置。
  • 适用:需要实时连接、流式数据的场景。
Custom Plugin(自定义插件)
  • 特点:内部自定义逻辑。
  • 执行:通过注册的自定义处理器。
  • 注册:使用 tool.RegisterCustomTool() 注册。
  • 适用:系统内置功能、特殊业务逻辑。

后端插件加载流程

初始化流程

插件系统的初始化在应用启动时进行,流程如下:

图片

关键代码路径

image-20251205224920685

  1. 应用初始化backend/application/application.go::Init()
  2. 插件服务初始化backend/application/plugin/init.go::InitService()
  3. 配置初始化backend/domain/plugin/conf/config.go::InitConfig()
  4. 元数据加载backend/domain/plugin/conf/load_plugin.go::loadPluginProductMeta()

核心代码详解

1. 应用层初始化
// backend/application/application.go
func Init(ctx context.Context) (err error) {
    // ... 其他初始化 ...

    // 初始化主服务(包含插件服务)
    primaryServices, err := initPrimaryServices(ctx, basicServices)
    if err != nil {
        return fmt.Errorf("Init - initPrimaryServices failed, err: %v", err)
    }

    // ...
}

func initPrimaryServices(ctx context.Context, basicServices *basicServices) (*primaryServices, error) {
    // 初始化插件服务
    pluginSVC, err := plugin.InitService(ctx, basicServices.toPluginServiceComponents())
    if err != nil {
        return nil, err
    }

    // ...
    return &primaryServices{
        pluginSVC: pluginSVC,
        // ...
    }, nil
}
2. 插件服务初始化
// backend/application/plugin/init.go
func InitService(ctx context.Context, components *ServiceComponents) (*PluginApplicationService, error) {
    // 1. 初始化插件配置(加载产品元数据)
    err := conf.InitConfig(ctx)
    if err != nil {
        return nil, err
    }

    // 2. 创建仓储实现
    toolRepo := repository.NewToolRepo(&repository.ToolRepoComponents{
        IDGen: components.IDGen,
        DB:    components.DB,
    })

    pluginRepo := repository.NewPluginRepo(&repository.PluginRepoComponents{
        IDGen: components.IDGen,
        DB:    components.DB,
    })

    oauthRepo := repository.NewOAuthRepo(&repository.OAuthRepoComponents{
        IDGen: components.IDGen,
        DB:    components.DB,
    })

    // 3. 创建领域服务
    pluginSVC := service.NewService(&service.Components{
        IDGen:      components.IDGen,
        DB:         components.DB,
        OSS:        components.OSS,
        PluginRepo: pluginRepo,
        ToolRepo:   toolRepo,
        OAuthRepo:  oauthRepo,
    })

    // 4. 检查产品插件 ID 是否与数据库中的草稿插件冲突
    err = checkIDExist(ctx, pluginSVC)
    if err != nil {
        return nil, err
    }

    // 5. 组装应用服务
    PluginApplicationSVC.DomainSVC = pluginSVC
    PluginApplicationSVC.eventbus = components.EventBus
    PluginApplicationSVC.oss = components.OSS
    PluginApplicationSVC.userSVC = components.UserSVC
    PluginApplicationSVC.pluginRepo = pluginRepo
    PluginApplicationSVC.toolRepo = toolRepo

    return PluginApplicationSVC, nil
}

// 检查产品插件 ID 是否已存在于数据库中
func checkIDExist(ctx context.Context, pluginService service.PluginService) error {
    // 获取所有产品插件
    pluginProducts := conf.GetAllPluginProducts()

    pluginIDs := make([]int64, 0, len(pluginProducts))
    var toolIDs []int64
    for _, p := range pluginProducts {
        pluginIDs = append(pluginIDs, p.Info.ID)
        toolIDs = append(toolIDs, p.ToolIDs...)
    }

    // 检查插件 ID 是否冲突
    pluginInfos, err := pluginService.MGetDraftPlugins(ctx, pluginIDs)
    if err != nil {
        return err
    }
    if len(pluginInfos) > 0 {
        // 发现冲突,返回错误
        conflictsIDs := make([]int64, 0, len(pluginInfos))
        for _, p := range pluginInfos {
            conflictsIDs = append(conflictsIDs, p.ID)
        }
        return errorx.New(errno.ErrPluginIDExist, ...)
    }

    // 检查工具 ID 是否冲突
    tools, err := pluginService.MGetDraftTools(ctx, toolIDs)
    if err != nil {
        return err
    }
    if len(tools) > 0 {
        // 发现冲突,返回错误
        conflictsIDs := make([]int64, 0, len(tools))
        for _, t := range tools {
            conflictsIDs = append(conflictsIDs, t.ID)
        }
        return errorx.New(errno.ErrToolIDExist, ...)
    }

    return nil
}
3. 配置初始化
// backend/domain/plugin/conf/config.go
func InitConfig(ctx context.Context) (err error) {
    // 1. 获取当前工作目录
    cwd, err := os.Getwd()
    if err != nil {
        logs.Warnf("[InitConfig] Failed to get current working directory: %v", err)
        cwd = os.Getenv("PWD")
    }

    // 2. 构建插件配置路径
    basePath := path.Join(cwd, "resources", "conf", "plugin")
    logs.CtxInfof(ctx, "basePath=%s", basePath)

    // 3. 加载插件产品元数据
    err = loadPluginProductMeta(ctx, basePath)
    if err != nil {
        return err
    }

    // 4. 加载 OAuth Schema
    err = loadOAuthSchema(ctx, basePath)
    if err != nil {
        return err
    }

    return nil
}

插件产品元数据加载

插件产品元数据存储在 backend/conf/plugin/pluginproduct/ 目录下,采用 YAML 格式。

元数据结构

plugin_id: 1001                   # 插件 ID
deprecated: false                  # 是否已废弃
version: "1.0.0"                   # 版本号(需符合 semver)
plugin_type: PLUGIN                # 插件类型
openapi_doc_file: "weather.yaml"   # OpenAPI 文档文件名
manifest:                          # 插件清单
  api:
    type: "openapi"                # API 类型
    url: "https://api.example.com" # 服务器 URL
name_for_human: "天气插件"        # 显示名称
tools:                             # 工具列表
  - tool_id: 2001                  # 工具 ID
    deprecated: false              # 是否已废弃
    method: "GET"                  # HTTP 方法
    sub_url: "/weather"            # 子路径

加载流程

图片

关键代码:backend/domain/plugin/conf/load_plugin.go

func loadPluginProductMeta(ctx context.Context, basePath string) (err error) {
    // 1. 读取元数据文件
    metaFile := path.Join(root, "plugin_meta.yaml")
    file, err := os.ReadFile(metaFile)

    // 2. 解析 YAML
    var pluginsMeta []*pluginProductMeta
    err = yaml.Unmarshal(file, &pluginsMeta)

    // 3. 遍历每个插件元数据
    for _, m := range pluginsMeta {
        // 4. 检查元数据有效性
        if !checkPluginMetaInfo(ctx, m) {
            continue
        }

        // 5. 验证 Manifest
        err = m.Manifest.Validate(true)

        // 6. 加载 OpenAPI 文档
        docPath := path.Join(root, m.OpenapiDocFile)
        loader := openapi3.NewLoader()
        _doc, err := loader.LoadFromFile(docPath)

        // 7. 验证 OpenAPI 文档
        err = doc.Validate(ctx)

        // 8. 创建 PluginInfo
        pi := &PluginInfo{
            Info: &model.PluginInfo{
                ID:         m.PluginID,
                PluginType: m.PluginType,
                Version:    ptr.Of(m.Version),
                ServerURL:  ptr.Of(doc.Servers[0].URL),
                Manifest:   m.Manifest,
                OpenapiDoc: doc,
            },
            ToolIDs: make([]int64, 0, len(m.Tools)),
        }

        // 9. 解析 API 路径和方法
        apis := make(map[dto.UniqueToolAPI]*model.Openapi3Operation)
        for subURL, pathItem := range doc.Paths {
            for method, op := range pathItem.Operations() {
                api := dto.UniqueToolAPI{
                    SubURL: subURL,
                    Method: strings.ToUpper(method),
                }
                apis[api] = model.NewOpenapi3Operation(op)
            }
        }

        // 10. 匹配工具与 API
        for _, t := range m.Tools {
            api := dto.UniqueToolAPI{
                SubURL: t.SubURL,
                Method: strings.ToUpper(t.Method),
            }
            op, ok := apis[api]
            if !ok {
                continue
            }

            // 11. 创建 ToolInfo
            toolProducts[t.ToolID] = &ToolInfo{
                Info: &entity.ToolInfo{
                    ID:        t.ToolID,
                    PluginID:  m.PluginID,
                    Method:    ptr.Of(t.Method),
                    SubURL:    ptr.Of(t.SubURL),
                    Operation: op,
                },
            }

            pi.ToolIDs = append(pi.ToolIDs, t.ToolID)
        }

        // 12. 注册到内存缓存
        pluginProducts[m.PluginID] = pi
    }

    return nil
}

OpenAPI 文档解析

OpenAPI 文档解析使用 github.com/getkin/kin-openapi 库。

解析步骤

  1. 加载文档:从文件系统读取 YAML/JSON 格式的 OpenAPI 文档。
  2. 解析结构:解析为 openapi3.T 结构体。
  3. 验证规范:验证文档是否符合 OpenAPI 3.0 规范。
  4. 提取操作:提取每个路径的操作(GET、POST 等)。
  5. 构建 Schema:构建参数和响应的 Schema 定义。

关键数据结构

type PluginInfo struct {
    Info    *model.PluginInfo      // 插件基本信息
    ToolIDs []int64                // 工具 ID 列表
}
type ToolInfo struct {
    Info *entity.ToolInfo          // 工具信息
}
type ToolInfo struct {
    ID              int64
    PluginID        int64
    Version         *string
    Method          *string         // HTTP 方法
    SubURL          *string         // 子路径
    Operation       *model.Openapi3Operation  // OpenAPI 操作定义
    ActivatedStatus *int32
    DebugStatus     *int32
}

插件注册与缓存

插件加载完成后,会注册到内存缓存中。

缓存结构

var (
    pluginProducts map[int64]*PluginInfo  // 插件 ID -> PluginInfo
    toolProducts   map[int64]*ToolInfo    // 工具 ID -> ToolInfo
)

访问接口

// 获取插件产品信息
func GetPluginProduct(pluginID int64) (*PluginInfo, bool)
// 批量获取插件产品信息
func MGetPluginProducts(pluginIDs []int64) []*PluginInfo
// 获取工具产品信息
func GetToolProduct(toolID int64) (*ToolInfo, bool)
// 批量获取工具产品信息
func MGetToolProducts(toolIDs []int64) []*ToolInfo

特点

  • ✅ 线程安全:使用深拷贝返回,避免并发修改。
  • ✅ 快速访问:O(1) 时间复杂度。
  • ✅ 启动时加载:应用启动时一次性加载,运行时只读。

可以看到,当前 coze-studio 在服务启动时,会加载 coze-studio/backend/resources/conf/plugin/pluginproduct/plugin_meta.yaml 这个文件,内部解析后缓存到内存中。

插件执行流程

执行场景

插件可以在多种场景下执行,定义在 backend/crossdomain/plugin/consts/consts.go

type ExecuteScene string
const (
    ExecSceneOfOnlineAgent ExecuteScene = "online_agent"   // 在线 Agent
    ExecSceneOfDraftAgent  ExecuteScene = "draft_agent"    // 草稿 Agent
    ExecSceneOfWorkflow    ExecuteScene = "workflow"       // Workflow
    ExecSceneOfToolDebug   ExecuteScene = "tool_debug"     // 工具调试
)

StreamWriter/StreamReader 工作原理

理解执行流程前,先看 coze-studio 的流式输出机制:StreamWriter/StreamReader

Pipe 机制

StreamWriterStreamReader 通过 schema.Pipe() 创建,形成生产者-消费者通道:

sr, sw := schema.Pipe[*entity.Message](10)  // 缓冲区大小为 10

工作方式

图片

特性

  1. 线程安全:内部使用 channel,支持并发读写。
  2. 缓冲机制:缓冲区满时 Send 阻塞,空时 Recv 阻塞。
  3. 流式传输:逐条消息传输,无需等待完整结果。
  4. 错误传播Send(msg, err) 可传递错误,Recv() 返回该错误。

实现github.com/cloudwego/eino/schema/stream.go):

func Pipe[T any](cap int) (*StreamReader[T], *StreamWriter[T]) {
    stm := newStream[T](cap)
    return stm.asReader(), &StreamWriter[T]{stm: stm}
}
func (sw *StreamWriter[T]) Send(chunk T, err error) (closed bool) {
    return sw.stm.send(chunk, err)
}
func (sr *StreamReader[T]) Recv() (T, error) {
    return sr.stm.recv()
}
消息流转路径

Workflow 流式执行中的消息流转:

图片

说明

  • llmRef.realtimeWriter 是 Workflow 层创建的 StreamWriter,绑定在执行上下文。
  • 消息通过 Pipe 的 channel buffer 传递,实现异步非阻塞传输。
  • 应用层通过 StreamReader.Recv() 循环接收消息,转换为 SSE 事件格式。

完整执行流程概览

下面用一个 Workflow 示例,说明从 HTTP API 请求到 SSE 响应的端到端流程。

执行场景

图片

image-20251129195047360

完整流程

图片

步骤

  1. HTTP API 入口:前端发起 POST /v1/workflows/chat 请求。
  2. 创建 SSE 连接:Handler 层创建 SSE Writer,设置响应头为 text/event-stream
  3. 启动 Workflow:应用层解析参数,创建会话和轮次,调用领域服务执行 Workflow。
  4. 创建消息通道:Workflow 引擎创建 StreamWriter/StreamReader Pipe,绑定到执行上下文。
  5. LLM 节点构建:解析插件配置,构建工具列表和回调处理器。
  6. LLM 节点执行:从执行上下文获取 StreamWriter,设置到 llmRef.realtimeWriter
  7. 流式输出:LLM 逐字输出时,回调处理器通过 llmRef.realtimeWriter.Send() 发送消息。
  8. Function Calling:LLM 生成工具调用时,执行插件工具,回调处理器发送工具响应。
  9. 消息流转:消息通过 Pipe 传递到应用层,转换为 SSE 事件格式。
  10. 推送给前端:Handler 层循环接收消息,通过 SSE Writer 推送给前端。
HTTP Handler 接收请求

代码位置:backend/api/handler/coze/workflow_service.go:1091

// @router /v1/workflows/chat [POST]
func OpenAPIChatFlowRun(ctx context.Context, c *app.RequestContext) {
    // 1. 验证请求参数
    var req workflow.ChatFlowRunRequest
    err = c.BindAndValidate(&req)

    // 2. 创建 SSE Writer,设置响应头
    w := sse.NewWriter(c)
    c.SetContentType("text/event-stream; charset=utf-8")
    c.Response.Header.Set("Cache-Control", "no-cache")
    c.Response.Header.Set("Connection", "keep-alive")

    // 3. 调用应用服务启动 Workflow,返回 StreamReader
    sr, err := appworkflow.SVC.OpenAPIChatFlowRun(ctx, &req)

    // 4. 循环接收消息并发送 SSE 事件
    sendChatFlowStreamRunSSE(ctx, w, sr)
}
SSE 事件发送循环

代码位置:backend/api/handler/coze/workflow_service.go

func sendChatFlowStreamRunSSE(ctx context.Context, w *sse.Writer,
    sr *schema.StreamReader[[]*workflow.ChatFlowRunResponse]) {
    defer func() {
        _ = w.Close()
        sr.Close()
    }()

    seq := int64(1)
    for {
        // 从 StreamReader 接收消息(阻塞等待)
        respList, err := sr.Recv()

        if err != nil {
            if errors.Is(err, io.EOF) {
                break // 流结束
            }
            w.Write(&sse.Event{Type: "error", Data: []byte(err.Error())})
            return
        }

        // 将每个响应转换为 SSE 事件并发送
        for _, resp := range respList {
            event := &sse.Event{
                ID:   strconv.FormatInt(seq, 10),
                Type: resp.Event,
                Data: []byte(resp.Data),
            }
            w.Write(event)
            seq++
        }
    }
}

说明

  • sr.Recv() 阻塞等待,直到有消息写入 StreamWriter。
  • 消息通过 Pipe 的 channel buffer 异步传递,实现实时流式输出。
  • 每个消息转换为 SSE 事件格式(event: type\ndata: json\n\n)。
应用层启动 Workflow

代码位置:backend/application/workflow/chatflow.go:473

应用层负责:

  1. 解析请求参数,创建会话和轮次。
  2. 调用领域服务执行 Workflow。
  3. StreamReader[*entity.Message] 转换为 StreamReader[[]*ChatFlowRunResponse]
func (w *ApplicationService) OpenAPIChatFlowRun(ctx context.Context, req *workflow.ChatFlowRunRequest) (
    _ *schema.StreamReader[[]*workflow.ChatFlowRunResponse], err error) {

    // 1. 解析业务参数
    workflowID := mustParseInt64(req.GetWorkflowID())
    // ... 解析其他参数 ...

    // 2. 创建会话和轮次
    conversationID, sectionID, err := w.getOrCreateConversation(ctx, ...)
    roundID, err := crossagentrun.DefaultSVC().Create(ctx, ...)

    // 3. 构建执行配置
    exeCfg := workflowModel.ExecuteConfig{
        ID:            workflowID,
        SyncPattern:   workflowModel.SyncPatternStream,
        ConversationID: ptr.Of(conversationID),
        RoundID:        ptr.Of(roundID),
        // ...
    }

    // 4. 调用领域服务执行 Workflow,返回 StreamReader
    sr, err := GetWorkflowDomainSVC().StreamExecute(ctx, exeCfg, parameters)

    // 5. 将 entity.Message 转换为 ChatFlowRunResponse
    return schema.StreamReaderWithConvert(sr, w.convertToChatFlowRunResponseList(ctx, ...)), nil
}

说明

  • StreamExecute 返回 StreamReader[*entity.Message],需要转换为 SSE 格式。
  • StreamReaderWithConvert 在每次 Recv() 时调用转换函数。
  • 转换函数处理不同类型的消息(状态消息、数据消息等),生成对应的 SSE 事件。
消息转换函数

代码位置:backend/application/workflow/chatflow.go:937

转换函数将 entity.Message 转换为 SSE 事件格式:

func (w *ApplicationService) convertToChatFlowRunResponseList(ctx context.Context, info convertToChatFlowInfo)
    func(msg *entity.Message) (responses []*workflow.ChatFlowRunResponse, err error) {

    return func(msg *entity.Message) (responses []*workflow.ChatFlowRunResponse, err error) {
        // 处理状态消息(Workflow 开始/完成/失败)
        if msg.StateMessage != nil {
            switch msg.StateMessage.Status {
            case entity.WorkflowSuccess:
                return []*workflow.ChatFlowRunResponse{
                    {Event: string(vo.ChatFlowCompleted), Data: ...},
                    {Event: string(vo.ChatFlowDone), Data: ...},
                }, nil
            case entity.WorkflowFailed:
                return []*workflow.ChatFlowRunResponse{
                    {Event: string(vo.ChatFlowError), Data: ...},
                }, nil
            case entity.WorkflowRunning:
                return []*workflow.ChatFlowRunResponse{
                    {Event: string(vo.ChatFlowCreated), Data: ...},
                    {Event: string(vo.ChatFlowInProgress), Data: ...},
                }, nil
            }
        }

        // 处理数据消息(LLM 增量输出)
        if msg.DataMessage != nil && msg.Type == entity.Answer {
            deltaData, _ := sonic.MarshalString(&vo.MessageDetail{
                Content: msg.Content,  // 增量内容(如 “被”)
                // ...
            })

            if !msg.Last {
                // 增量输出:返回 delta 事件
                return []*workflow.ChatFlowRunResponse{
                    {Event: string(vo.ChatFlowMessageDelta), Data: deltaData},
                }, nil
            } else {
                // 完成输出:返回 delta + completed 事件
                return []*workflow.ChatFlowRunResponse{
                    {Event: string(vo.ChatFlowMessageDelta), Data: deltaData},
                    {Event: string(vo.ChatFlowMessageCompleted), Data: completeData},
                }, nil
            }
        }

        return nil, schema.ErrNoValue  // 跳过该消息
    }
}

说明

  • schema.ErrNoValue 表示跳过该消息,不会发送给前端。
  • 增量消息(msg.Last == false)只发送 delta 事件。
  • 完成消息(msg.Last == true)发送 delta + completed 事件。
Workflow 引擎创建 StreamWriter

代码位置:backend/domain/workflow/service/executable_impl.go:454

创建 StreamWriter/StreamReader Pipe,并将 StreamWriter 绑定到执行上下文:

func (i *impl) StreamExecute(ctx context.Context, config workflowModel.ExecuteConfig, input map[string]any)
    (*schema.StreamReader[*entity.Message], error) {

    // 1. 获取 Workflow 实体并转换为 WorkflowSchema
    wfEntity, err := i.Get(ctx, &vo.GetPolicy{...})
    workflowSC, err := adaptor.CanvasToWorkflowSchema(ctx, c)

    // 2. 创建 Workflow 对象(编译节点为可执行的 Runner)
    wf, err := compose.NewWorkflow(ctx, workflowSC, wfOpts...)

    // 3. 创建 StreamWriter 和 StreamReader(Pipe)
    sr, sw := schema.Pipe[*entity.Message](10)  // 缓冲区大小为 10

    // 4. 准备执行上下文(传入 StreamWriter)
    cancelCtx, executeID, opts, _, err := compose.NewWorkflowRunner(
        wfEntity.GetBasic(), workflowSC, config,
        compose.WithStreamWriter(sw),  // StreamWriter 传入 WorkflowRunner
    ).Prepare(ctx)

    // 5. 异步执行 Workflow(StreamWriter 会通过回调实时发送消息)
    wf.AsyncRun(cancelCtx, input, opts...)

    // 6. 返回 StreamReader(应用层会从这个 Reader 接收消息)
    return sr, nil
}

说明

  • schema.Pipe[*entity.Message](10) 创建一对 StreamWriter/StreamReader,通过 channel buffer 连接。
  • compose.WithStreamWriter(sw) 将 StreamWriter 传入 WorkflowRunner,最终绑定到执行上下文。
  • wf.AsyncRun() 异步执行,不会阻塞;消息通过 StreamWriter 实时发送。
  • 返回的 sr 是 StreamReader,应用层通过 sr.Recv() 接收消息。
StreamWriter 绑定到执行上下文

代码位置:backend/domain/workflow/internal/compose/workflow_run.go:107

func (r *WorkflowRunner) Prepare(ctx context.Context) (...) {
    // 1. 生成执行 ID
    executeID, err := repo.GenID(ctx)

    // 2. 构建执行选项(包含回调处理器,传入 StreamWriter)
    composeOpts, err := r.designateOptions(ctx)
    // ...
}

代码位置:backend/domain/workflow/internal/compose/designate_option.go:40

func (r *WorkflowRunner) designateOptions(ctx context.Context) ([]einoCompose.Option, error) {
    streamWriter := r.sw  // 从 WorkflowRunner 获取 StreamWriter

    // 创建根回调处理器(传入 StreamWriter)
    rootHandler := execute.NewRootWorkflowHandler(
        // ...
        streamWriter,  // StreamWriter 传入回调处理器
    )

    opts := []einoCompose.Option{einoCompose.WithCallbacks(rootHandler)}

    // 为每个节点添加回调处理器
    for key := range workflowSC.GetAllNodes() {
        ns := workflowSC.GetAllNodes()[key]
        if ns.Type == entity.NodeTypeLLM {
            // 为 LLM 节点添加工具回调选项
            llmNodeOpts, err := llmToolCallbackOptions(ctx, ns, eventChan, container)
            opts = append(opts, llmNodeOpts...)
        }
    }

    return opts, nil
}

代码位置:backend/domain/workflow/internal/execute/callback.go:76

func NewRootWorkflowHandler(..., streamWriter *schema.StreamWriter[*entity.Message]) callbacks.Handler {
    return &WorkflowHandler{
        // ...
        streamWriter: streamWriter,  // 保存 StreamWriter
    }
}

说明

  • StreamWriter 通过 designateOptions 传入根回调处理器(WorkflowHandler)。
  • WorkflowHandler 将 StreamWriter 保存到执行上下文(exeCtx.RootCtx.StreamWriter)。
  • LLM 节点在执行时可以从执行上下文获取 StreamWriter。
LLM 节点构建(准备工具列表)

LLM 节点在构建时(Build方法)准备工具列表和回调处理器,这是 Function Calling 的基础。

代码位置:backend/domain/workflow/internal/nodes/llm/llm.go:385

func (c *Config) Build(ctx context.Context, ns *schema2.NodeSchema, _ ...schema2.BuildOption) (any, error) {
    var (
        tools                 []tool.BaseTool
        toolCallbackHandler   callbacks.Handler
        llmRef                *LLM  // 引用(用于回调访问 realtimeWriter)
    )

    // 1. 构建 ChatModel
    chatModel, info, err := modelbuilder.BuildModelByID(ctx, c.LLMParams.ModelType, ...)

    // 2. 处理 Function Calling 参数(构建插件工具列表)
    fcParams := c.FCParam
    if fcParams != nil && fcParams.PluginFCParam != nil {
        // 2.1 构建插件工具请求
        pluginToolsInvokableReq := make(map[int64]*wrapPlugin.ToolsInvokableRequest)
        for _, p := range fcParams.PluginFCParam.PluginList {
            pid, _ := strconv.ParseInt(p.PluginID, 10, 64)
            toolID, _ := strconv.ParseInt(p.ApiId, 10, 64)
            // ... 构建请求 ...
        }

        // 2.2 获取插件工具列表(转换为 InvokableTool)
        inInvokableTools := make([]tool.BaseTool, 0)
        for _, req := range pluginToolsInvokableReq {
            toolMap, err := wrapPlugin.GetPluginInvokableTools(ctx, req)
            for _, t := range toolMap {
                inInvokableTools = append(inInvokableTools, newInvokableTool(t))
            }
        }
    // 执行 funcation calling 时 eino 框架会调用 tool.InvokableRun 函数
        tools = append(tools, inInvokableTools...)
    }

    // 3. 构建 LLM Graph
    g := compose.NewGraph[map[string]any, map[string]any](...)

    // 4. 根据是否有工具选择不同的节点类型
    if len(tools) > 0 {
        // 4.1 有工具:创建 React Agent(ReAct 模式)
        m, ok := modelWithInfo.(model.ToolCallingChatModel)
        reactConfig := react.AgentConfig{
            ToolCallingModel: m,
            ToolsConfig:      compose.ToolsNodeConfig{Tools: tools},  // 传入工具列表
            MaxStep:          100,
        }
        reactAgent, err := react.NewAgent(ctx, &reactConfig)
        agentNode, opts := reactAgent.ExportGraph()
        _ = g.AddGraphNode(llmNodeKey, agentNode, opts...)
    } else {
        // 4.2 无工具:直接添加 ChatModel 节点
        _ = g.AddChatModelNode(llmNodeKey, modelWithInfo)
    }

    // 5. 创建工具回调处理器(用于实时流式输出)
    if len(tools) >= 0 {
        toolCallbackHandler = callbacks2.NewHandlerHelper().
            ChatModel(&callbacks2.ModelCallbackHandler{
                OnEndWithStreamOutput: func(ctx context.Context, info *callbacks.RunInfo,
                    output *schema.StreamReader[*model.CallbackOutput]) context.Context {
                    // 流式输出处理,发送实时消息
                    go func() {
                        for {
                            frame, err := output.Recv()
                            if errors.Is(err, io.EOF) {
                                break
                            }
                            if frame.Message.Content != "" {
                                //  实时发送消息到 StreamWriter
                                if llmRef != nil && llmRef.realtimeWriter != nil {
                                    dataMsg := &entity.DataMessage{
                                        Type:      entity.Answer,
                                        Content:   frame.Message.Content,  // 增量内容(如 “被”)
                                        // ...
                                    }
                                    llmRef.realtimeWriter.Send(&entity.Message{DataMessage: dataMsg}, nil)
                                }
                            }
                        }
                    }()
                    return ctx
                },
            }).
            Tool(&callbacks2.ToolCallbackHandler{
                OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *tool.CallbackInput) context.Context {
                    //  发送工具调用开始事件
                    if llmRef != nil && llmRef.realtimeWriter != nil {
                        // ... 构建 FunctionCall 消息 ...
                        llmRef.realtimeWriter.Send(&entity.Message{DataMessage: dataMsg}, nil)
                    }
                    return ctx
                },
                OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *tool.CallbackOutput) context.Context {
                    //  发送工具调用结束事件
                    if llmRef != nil && llmRef.realtimeWriter != nil {
                        // ... 构建 ToolResponse 消息 ...
                        llmRef.realtimeWriter.Send(&entity.Message{DataMessage: dataMsg}, nil)
                    }
                    return ctx
                },
            }).Handler()
    }

    // 6. 编译 Graph 为可执行的 Runner
    r, err := g.Compile(ctx, compileOpts...)

    // 7. 创建 LLM 节点实例
    llm := &LLM{
        r:                   r,
        toolCallbackHandler: toolCallbackHandler,  // 保存回调处理器
    }

    // 8. 设置引用,使回调可以访问 realtimeWriter
    llmRef = llm

    return llm, nil
}

说明

  • 工具列表构建:从 FCParam.PluginFCParam.PluginList 解析插件配置,调用 wrapPlugin.GetPluginInvokableTools() 获取工具。
  • React Agent:如果有工具,创建 React Agent(ReAct 模式),否则使用普通 ChatModel。
  • 回调处理器:在构建时创建 toolCallbackHandler,包含 ChatModel.OnEndWithStreamOutputTool.OnStart/OnEnd 回调。
  • llmRef 引用:通过 llmRef 引用,回调处理器可以访问 llmRef.realtimeWriter(在执行时设置)。
LLM 节点执行(获取 StreamWriter)

代码位置:backend/domain/workflow/internal/nodes/llm/llm.go:1455

LLM 节点在执行时(Stream方法)从执行上下文获取 StreamWriter,并设置到 llmRef.realtimeWriter

func (l *LLM) Stream(ctx context.Context, in map[string]any, opts ...nodes.NodeOption)
    (out *schema.StreamReader[map[string]any], err error) {

    // 1. 准备执行选项
    composeOpts, resumingEvent, err := l.prepare(ctx, in, opts...)

    // 2. 从执行上下文获取 StreamWriter
    exeCtx := execute.GetExeCtx(ctx)
    if exeCtx != nil && exeCtx.RootCtx.StreamWriter != nil {
        l.realtimeWriter = exeCtx.RootCtx.StreamWriter  // 设置到 LLM 实例
        logs.Infof("✅ [LLM Stream] Found StreamWriter in execute context")
    }

    // 3. 添加工具回调处理器(回调会使用 realtimeWriter 发送消息)
    if l.toolCallbackHandler != nil {
        composeOpts = append(composeOpts, compose.WithCallbacks(l.toolCallbackHandler))
    }

    // 4. 调用 LLM Graph 的 Stream 方法(如果 LLM 生成 Function Call,会自动调用工具)
    out, err = l.r.Stream(ctx, in, composeOpts...)

    return out, nil
}

说明

  • execute.GetExeCtx(ctx) 获取执行上下文,其中包含 RootCtx.StreamWriter
  • l.realtimeWriter = exeCtx.RootCtx.StreamWriter 将 StreamWriter 设置到 LLM 实例。
  • 回调处理器(在 Build 时创建)通过 llmRef.realtimeWriter 访问 StreamWriter。
  • 当 LLM 流式输出或工具调用时,回调处理器会调用 llmRef.realtimeWriter.Send() 发送消息。
消息发送时机

代码位置:backend/domain/workflow/internal/nodes/llm/llm.go:877

回调处理器在以下时机调用 llmRef.realtimeWriter.Send()

1. LLM 流式输出时ChatModel.OnEndWithStreamOutput):

ChatModel(&callbacks2.ModelCallbackHandler{
    OnEndWithStreamOutput: func(ctx context.Context, info *callbacks.RunInfo,
        output *schema.StreamReader[*model.CallbackOutput]) context.Context {

        go func() {
            for {
                frame, err := output.Recv()  // 从 LLM 流式输出接收
                if errors.Is(err, io.EOF) {
                    break
                }

                if frame.Message.Content != "" {
                    //  实时发送消息到 StreamWriter
                    if llmRef != nil && llmRef.realtimeWriter != nil {
                        dataMsg := &entity.DataMessage{
                            Type:      entity.Answer,
                            Content:   frame.Message.Content,  // 增量内容(如 “被”)
                            NodeType:  entity.NodeTypeLLM,
                            ExecuteID: exeCtx.RootExecuteID,
                            NodeID:    string(exeCtx.NodeKey),
                        }
                        // 发送到 StreamWriter(写入 channel buffer)
                        llmRef.realtimeWriter.Send(&entity.Message{DataMessage: dataMsg}, nil)
                    }
                }
            }
        }()
        return ctx
    },
})

2. 工具调用开始时Tool.OnStart):

Tool(&callbacks2.ToolCallbackHandler{
    OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *tool.CallbackInput) context.Context {
        //  发送工具调用开始事件
        if llmRef != nil && llmRef.realtimeWriter != nil {
            dataMsg := &entity.DataMessage{
                Type:      entity.FunctionCall,
                FunctionCall: &entity.FunctionCallInfo{
                    Name:      info.Name,
                    CallID:    compose.GetToolCallID(ctx),
                    Arguments: parseArguments(input.ArgumentsInJSON),
                },
            }
            // 发送到 StreamWriter
            llmRef.realtimeWriter.Send(&entity.Message{DataMessage: dataMsg}, nil)
        }
        return ctx
    },
})

3. 工具调用结束时Tool.OnEnd):

OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *tool.CallbackOutput) context.Context {
    //  发送工具调用结束事件
    if llmRef != nil && llmRef.realtimeWriter != nil {
        dataMsg := &entity.DataMessage{
            Type:      entity.ToolResponse,
            ToolResponse: &entity.ToolResponseInfo{
                Name:     info.Name,
                CallID:   compose.GetToolCallID(ctx),
                Response: output.OutputInJSON,
            },
        }
        // 发送到 StreamWriter
        llmRef.realtimeWriter.Send(&entity.Message{DataMessage: dataMsg}, nil)
    }
    return ctx
},

说明

  • llmRef.realtimeWriter.Send() 将消息写入 Pipe 的 channel buffer。
  • 消息会立即被 StreamReader 接收(如果应用层正在 Recv())。
  • 回调处理器在独立的 goroutine 中运行,不会阻塞 LLM 执行。
消息接收与 SSE 发送
1. 消息接收路径

完整路径:

图片

代码位置:backend/application/workflow/chatflow.go:918

// 应用层调用领域服务执行 Workflow(返回 StreamReader)
sr, err := GetWorkflowDomainSVC().StreamExecute(ctx, exeCfg, parameters)
// 将 Workflow 消息 StreamReader 转换为 SSE 响应 StreamReader
return schema.StreamReaderWithConvert(sr, w.convertToChatFlowRunResponseList(ctx, ...)), nil

代码位置:backend/api/handler/coze/workflow_service.go:724

func sendChatFlowStreamRunSSE(ctx context.Context, w *sse.Writer,
    sr *schema.StreamReader[[]*workflow.ChatFlowRunResponse]) {
    for {
        // 从 StreamReader 接收消息(阻塞等待)
        respList, err := sr.Recv()

        if errors.Is(err, io.EOF) {
            break // 流结束
        }

        // 将每个响应转换为 SSE 事件并发送给前端
        for _, resp := range respList {
            event := &sse.Event{
                ID:   strconv.FormatInt(seq, 10),
                Type: resp.Event,
                Data: []byte(resp.Data),
            }
            w.Write(event)
            seq++
        }
    }
}

说明

  • sr.Recv() 阻塞等待,直到有消息写入 StreamWriter。
  • StreamReaderWithConvert 在每次 Recv() 时调用转换函数。
  • 转换函数将 entity.Message 转换为 ChatFlowRunResponse,包含事件类型和 JSON 数据。
  • SSE Writer 将事件写入 HTTP 响应流,前端通过 EventSource API 接收。
2. 消息流转时序图

图片

插件工具执行(Function Calling)

当 LLM 生成 Function Call 时,eino 框架自动调用工具的 InvokableRun 方法。

1. 工具包装器

代码位置:backend/domain/workflow/internal/nodes/llm/plugin.go:32

func newInvokableTool(pl crossplugin.InvokableTool) tool.InvokableTool {
    return &pluginInvokableTool{
        pluginInvokableTool: pl,
    }
}
func (p pluginInvokableTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
    //  获取执行配置(从 Workflow 上下文)
    execCfg := execute.GetExecuteConfig(opts...)

    //  调用插件服务执行工具
    return p.pluginInvokableTool.PluginInvoke(ctx, argumentsInJSON, execCfg)
}
2. 插件工具执行

代码位置:backend/domain/workflow/plugin/plugin.go:338

func (p *pluginInvokeTool) PluginInvoke(ctx context.Context, argumentsInJSON string,
    cfg workflowModel.ExecuteConfig) (string, error) {

    // 1. 构建执行请求
    req := &model.ExecuteToolRequest{
        UserID:          conv.Int64ToStr(cfg.Operator),
        PluginID:        p.pluginEntity.PluginID,
        ToolID:          p.toolInfo.ID,
        ExecScene:       consts.ExecSceneOfWorkflow,
        ArgumentsInJson: argumentsInJSON,  // LLM 生成的参数 JSON
    }

    // 2. 执行工具(统一执行入口)
    r, err := crossplugin.DefaultSVC().ExecuteTool(ctx, req, execOpts...)

    // 3. 返回裁剪后的响应(JSON 字符串,供 LLM 继续推理)
    return r.TrimmedResp, nil
}

说明

  • argumentsInJSON 是 LLM 根据工具 Schema 生成的参数 JSON。
  • ExecuteTool 是统一执行入口,会根据插件类型选择 HTTP/MCP/Custom 执行器。
  • TrimmedResp 是根据响应 Schema 裁剪后的 JSON,只包含 LLM 需要的字段。
统一执行入口(PluginService.ExecuteTool)

所有插件执行都通过 PluginService.ExecuteTool() 统一入口,无论插件类型是 HTTP、MCP 还是 Custom。下面分析从统一入口到具体执行器的完整流程。

1. ExecuteTool 主流程
func (p *pluginServiceImpl) ExecuteTool(ctx context.Context, req *model.ExecuteToolRequest, opts ...model.ExecuteToolOpt) (resp *model.ExecuteToolResponse, err error) {
    // 1. 解析执行选项
    opt := &model.ExecuteToolOption{}
    for _, fn := range opts {
        fn(opt)
    }

    // 2. 构建工具执行器(根据执行场景获取插件和工具信息)
    executor, err := p.buildToolExecutor(ctx, req, opt)
    if err != nil {
        return nil, errorx.Wrapf(err, "buildToolExecutor failed")
    }

    // 3. 获取认证信息(如果需要 OAuth,会尝试获取 Access Token)
    authInfo := executor.plugin.GetAuthInfo()
    accessToken, authURL, err := p.acquireAccessTokenIfNeed(ctx, req, authInfo, executor.tool.Operation)
    if err != nil {
        return nil, errorx.Wrapf(err, "acquireAccessToken failed")
    }

    // 4. 执行工具(调用具体的 Invocation 实现)
    result, err := executor.execute(ctx, req.ArgumentsInJson, accessToken, authURL)
    if err != nil {
        return nil, errorx.Wrapf(err, "execute tool failed")
    }

    // 5. 工具调试场景下,更新工具的调试状态
    if req.ExecScene == consts.ExecSceneOfToolDebug {
        err = p.toolRepo.UpdateDraftTool(ctx, &entity.ToolInfo{
            ID:          req.ToolID,
            DebugStatus: ptr.Of(common.APIDebugStatus_DebugPassed),
        })
        if err != nil {
            logs.CtxErrorf(ctx, "UpdateDraftTool failed, tooID=%d, err=%v", req.ToolID, err)
        }
    }

    // 6. 自动生成响应 Schema(可选)
    var respSchema openapi3.Responses
    if opt.AutoGenRespSchema {
        respSchema, err = p.genToolResponseSchema(ctx, result.RawResp)
        if err != nil {
            return nil, errorx.Wrapf(err, "genToolResponseSchema failed")
        }
    }

    // 7. 构建并返回响应
    resp = &model.ExecuteToolResponse{
        Tool:        executor.tool,      // 工具信息
        Request:     result.Request,     // 请求字符串(用于日志)
        RawResp:     result.RawResp,     // 原始响应
        TrimmedResp: result.TrimmedResp, // 裁剪后的响应(根据 Schema)
        RespSchema:  respSchema,        // 响应 Schema
    }

    return resp, nil
}

说明

  • ExecuteTool 是统一执行入口,所有插件类型(HTTP/MCP/Custom)都通过此方法执行。
  • 执行流程:构建执行器 → 获取认证 → 执行工具 → 处理响应。
2. buildToolExecutor(构建工具执行器)
func (p *pluginServiceImpl) buildToolExecutor(ctx context.Context, req *model.ExecuteToolRequest, opt *model.ExecuteToolOption) (impl *toolExecutor, err error) {
    // 1. 验证用户 ID
    if req.UserID == "" {
        return nil, errorx.New(errno.ErrPluginExecuteToolFailed, errorx.KV(errno.PluginMsgKey, "userID is required"))
    }

    var (
        pl *entity.PluginInfo  // 插件信息
        tl *entity.ToolInfo    // 工具信息
    )

    // 2. 根据执行场景获取插件和工具信息
    switch req.ExecScene {
    case consts.ExecSceneOfOnlineAgent:
        // 在线 Agent:从版本表获取工具配置
        pl, tl, err = p.getOnlineAgentPluginInfo(ctx, req, opt)
    case consts.ExecSceneOfDraftAgent:
        // 草稿 Agent:从草稿表获取工具配置,并合并 Agent 的自定义配置
        pl, tl, err = p.getDraftAgentPluginInfo(ctx, req, opt)
    case consts.ExecSceneOfToolDebug:
        // 工具调试:从草稿表获取
        pl, tl, err = p.getToolDebugPluginInfo(ctx, req, opt)
    case consts.ExecSceneOfWorkflow:
        // Workflow:根据 ExecDraftTool 标志决定从草稿表还是在线表获取
        pl, tl, err = p.getWorkflowPluginInfo(ctx, req, opt)
    default:
        return nil, fmt.Errorf("invalid execute scene '%s'", req.ExecScene)
    }
    if err != nil {
        return nil, err
    }

    // 3. 构建工具执行器
    impl = &toolExecutor{
        execScene:                  req.ExecScene,                    // 执行场景
        userID:                     req.UserID,                      // 用户 ID
        conversationID:             opt.ConversationID,              // 会话 ID(用于 HTTP Header)
        plugin:                     pl,                               // 插件信息
        tool:                       tl,                               // 工具信息
        projectInfo:                opt.ProjectInfo,                 // 项目信息(用于变量引用)
        invalidRespProcessStrategy: opt.InvalidRespProcessStrategy,  // 响应处理策略
        oss:                        p.oss,                            // OSS 存储(用于文件 URI 转换)
    }

    // 4. 如果提供了自定义 Operation,使用自定义的(用于 Agent 自定义配置)
    if opt.Operation != nil {
        impl.tool.Operation = opt.Operation
    }

    return impl, nil
}

说明

  • 根据执行场景(在线 Agent/草稿 Agent/Workflow/工具调试)获取插件和工具信息。
  • 不同场景的数据来源不同(版本表/草稿表/SaaS)。
  • 构建 toolExecutor 结构,包含插件信息、工具信息、项目信息等。
3. getWorkflowPluginInfo 示例(Workflow 场景)
func (p *pluginServiceImpl) getWorkflowPluginInfo(ctx context.Context, req *model.ExecuteToolRequest,
    execOpt *model.ExecuteToolOption) (pl *entity.PluginInfo, tl *entity.ToolInfo, err error) {

    // 1. 检查是否来自 SaaS
    if req.PluginFrom != nil && *req.PluginFrom == bot_common.PluginFrom_FromSaas {
        // 从 SaaS 获取插件和工具信息
        tools, plugin, err := p.toolRepo.BatchGetSaasPluginToolsInfo(ctx, []int64{req.PluginID})
        // ... 查找对应的工具
        return pl, tl, nil
    }

    // 2. 检查是否执行草稿工具
    if req.ExecDraftTool {
        // 从草稿表获取
        pl, exist, err := p.pluginRepo.GetDraftPlugin(ctx, req.PluginID)
        tl, exist, err := p.toolRepo.GetDraftTool(ctx, req.ToolID)
        return pl, tl, nil
    }

    // 3. 从在线表获取(支持版本)
    if execOpt.ToolVersion == "" {
        // 获取最新版本
        pl, exist, err := p.pluginRepo.GetOnlinePlugin(ctx, req.PluginID)
        tl, exist, err := p.toolRepo.GetOnlineTool(ctx, req.ToolID)
    } else {
        // 获取指定版本
        pl, exist, err := p.pluginRepo.GetVersionPlugin(ctx, model.VersionPlugin{
            PluginID: req.PluginID,
            Version:  execOpt.ToolVersion,
        })
        tl, exist, err := p.toolRepo.GetVersionTool(ctx, model.VersionTool{
            ToolID:  req.ToolID,
            Version: execOpt.ToolVersion,
        })
    }

    return pl, tl, nil
}
4. toolExecutor.execute(执行工具)

代码位置:backend/domain/plugin/service/exec_tool.go:610

toolExecutor.execute()是实际执行工具的方法,根据插件类型调用对应的 Invocation 实现:

func (t *toolExecutor) execute(ctx context.Context, argumentsInJson, accessToken, authURL string) (resp *ExecuteResponse, err error) {
    // 1. 验证参数
    if argumentsInJson == "" {
        return nil, errorx.New(errno.ErrPluginExecuteToolFailed,
            errorx.KV(errno.PluginMsgKey, "argumentsInJson is required"))
    }

    // 2. 构建 InvocationArgs(参数分组、公共参数注入、默认值处理等)
    invocation, err := tool.NewInvocationArgs(ctx, &tool.InvocationArgsBuilder{
        ArgsInJson:     argumentsInJson,
        ProjectInfo:    t.projectInfo,
        UserID:         t.userID,
        Plugin:         t.plugin,
        Tool:           t.tool,
        PluginManifest: t.plugin.Manifest,
        ServerURL:      t.plugin.GetServerURL(),
        AuthInfo: &tool.AuthInfo{
            OAuth: &tool.OAuthInfo{
                AccessToken: accessToken,
                AuthURL:     authURL,
            },
            MetaInfo: t.plugin.GetAuthInfo(),
        },
    })
    if err != nil {
        return nil, err
    }

    // 3. 文件 URI 转换(非调试场景)
    if t.execScene != consts.ExecSceneOfToolDebug {
        // 将文件 URI 转换为可访问的 URL
        err = invocation.AssembleFileURIToURL(ctx, t.oss)
        if err != nil {
            return nil, err
        }
    }

    // 4. 根据插件来源选择执行器
    var requestStr, rawResp string
    if t.plugin.Source != nil && *t.plugin.Source == bot_common.PluginFrom_FromSaas {
        // SaaS 插件:使用 SaaS 调用实现
        requestStr, rawResp, err = tool.NewSaasCallImpl().Do(ctx, invocation)
    } else {
        // 普通插件:根据插件类型选择执行器(HTTP/MCP/Custom)
        requestStr, rawResp, err = newToolInvocation(t).Do(ctx, invocation)
    }

    if err != nil {
        return nil, err
    }

    // 5. 处理空响应
    const defaultResp = "{}"
    if rawResp == "" {
        return &ExecuteResponse{
            Request:     requestStr,
            TrimmedResp: defaultResp,
            RawResp:     defaultResp,
        }, nil
    }

    // 6. 处理响应(根据 Schema 裁剪,只返回 LLM 需要的字段)
    trimmedResp, err := t.processResponse(ctx, rawResp)
    if err != nil {
        return nil, err
    }
    if trimmedResp == "" {
        trimmedResp = defaultResp
    }

    return &ExecuteResponse{
        Request:     requestStr,     // 请求字符串(用于日志)
        TrimmedResp: trimmedResp,    // 裁剪后的响应(根据 Schema)
        RawResp:     rawResp,        // 原始响应
    }, nil
}

说明

  • InvocationArgs 构建:包含参数分组、公共参数注入、默认值处理、变量引用等。
  • 文件 URI 转换:非调试场景下,将文件 URI 转换为可访问的 OSS URL。
  • 执行器路由:SaaS 插件使用 SaasCallImpl,普通插件根据类型使用 newToolInvocation() 路由。
  • 响应处理:根据响应 Schema 裁剪,只返回 LLM 需要的字段。
5. newToolInvocation(根据插件类型选择执行器)

代码位置:backend/domain/plugin/service/exec_tool.go:597

路由逻辑,根据插件类型实例化对应的执行器:

func newToolInvocation(t *toolExecutor) tool.Invocation {
    switch t.plugin.Manifest.API.Type {
    case consts.PluginTypeOfCloud:
        // HTTP 插件:使用 HTTP 调用实现
        return tool.NewHttpCallImpl(t.conversationID)
    case consts.PluginTypeOfMCP:
        // MCP 插件:使用 MCP 调用实现(官方未实现,我们已实现)
        return tool.NewMcpCallImpl()
    case consts.PluginTypeOfCustom:
        // 自定义插件:使用自定义调用实现
        return tool.NewCustomCallImpl()
    default:
        // 默认使用 HTTP 调用
        return tool.NewHttpCallImpl(t.conversationID)
    }
}

说明

  • 路由机制:根据 plugin.Manifest.API.Type 选择对应的执行器。
  • 三种插件类型:HTTP(OpenAPI)、MCP、Custom。
  • 扩展性:新增插件类型只需实现 tool.Invocation 接口,并在此处添加路由。

这里会实例化最开始介绍的插件类型(HTTP、MCP、自定义)。官方 MCP 还未实现,我们已实现,将在下一篇介绍。

到这里,插件的核心流程已介绍完毕。下一小节我们简要介绍一下 HTTP 插件的实现,便于后续实现自定义插件。

HTTP 插件执行流程

HTTP 执行器主流程

image-20251206195204811

自定义、MCP、HTTP 都是实现的 Invocation 接口,如需实现其它插件,可参考其中一个实现即可,这里以 HTTP 来进行举例。

HTTP 插件通过 tool.NewHttpCallImpl() 执行:

图片

代码位置:backend/domain/plugin/service/tool/invocation_http.go

HTTP 执行器 Do 方法实现
func (h *httpCallImpl) Do(ctx context.Context, args *InvocationArgs) (request string, resp string, err error) {
    // 1. 构建 HTTP 请求(URL、Header、Body)
    httpReq, err := h.buildHTTPRequest(ctx, args)
    if err != nil {
        return "", "", err
    }

    // 2. 注入认证信息(OAuth、Service Token 等)
    errMsg, err := h.injectAuthInfo(ctx, httpReq, args)
    if err != nil {
        return "", "", err
    }

    // 3. 如果返回错误消息,说明需要 OAuth 授权
    if errMsg != "" {
        // 创建中断事件,通知调用方需要用户授权
        event := &model.ToolInterruptEvent{
            Event: pluginConsts.InterruptEventTypeOfToolNeedOAuth,
            ToolNeedOAuth: &model.ToolNeedOAuthInterruptEvent{
                Message: errMsg,  // 包含授权 URL 的错误消息
            },
        }
        // 返回中断错误,Workflow 引擎会暂停执行并等待授权
        return "", "", compose.NewInterruptAndRerunErr(event)
    }

    // 4. 读取请求体(用于序列化)
    var reqBodyBytes []byte
    if httpReq.GetBody != nil {
        reqBody, err := httpReq.GetBody()
        if err != nil {
            return "", "", err
        }
        defer reqBody.Close()
        reqBodyBytes, err = io.ReadAll(reqBody)
        if err != nil {
            return "", "", err
        }
    }

    // 5. 序列化请求(用于日志和返回给调用方)
    requestStr, err := genRequestString(httpReq, reqBodyBytes)
    if err != nil {
        return "", "", err
    }

    // 6. 使用 resty 发送 HTTP 请求
    restyReq := defaultHttpCli.NewRequest()
    restyReq.Header = httpReq.Header
    restyReq.Method = httpReq.Method
    restyReq.URL = httpReq.URL.String()
    if reqBodyBytes != nil {
        restyReq.SetBody(reqBodyBytes)
    }
    restyReq.SetContext(ctx)

    logs.CtxDebugf(ctx, "[execute] url=%s, header=%s, method=%s, body=%s",
        restyReq.URL, restyReq.Header, restyReq.Method, restyReq.Body)

    httpResp, err := restyReq.Send()
    if err != nil {
        return "", "", errorx.New(errno.ErrPluginExecuteToolFailed,
            errorx.KVf(errno.PluginMsgKey, "http request failed, err=%s", err))
    }

    logs.CtxDebugf(ctx, "[execute] status=%s, response=%s", httpResp.Status(), httpResp.String())

    // 7. 检查 HTTP 状态码
    if httpResp.StatusCode() != http.StatusOK {
        return "", "", errorx.New(errno.ErrPluginExecuteToolFailed,
            errorx.KVf(errno.PluginMsgKey, "http request failed, status=%s\nresp=%s",
                httpResp.Status(), httpResp.String()))
    }

    // 8. 返回请求字符串和响应字符串
    return requestStr, httpResp.String(), nil
}

HTTP 插件核心流程都在上面的 Do 函数中,感兴趣的可以直接看 coze-studio/backend/domain/plugin/service/tool/invocation_http.go 源码。

总结

本文详细分析了 coze-studio 后端插件的加载流程和执行流程,从 HTTP API 请求到 SSE 响应的完整链路,为后续二次开发奠定了基础。

下一篇我们会详细介绍如何基于 coze-studio 来扩展支持 MCP 协议,让 coze-studio 的功能更加完善。




上一篇:Next.js RCE漏洞深度解析:从SSR原理到供应链风险与防御实战
下一篇:Wails桌面开发工具使用详解:核心命令解析与实战演示
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-8 23:24 , Processed in 1.056100 second(s), 43 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表