找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

194

积分

0

好友

24

主题
发表于 昨天 03:47 | 查看: 2| 回复: 0

在数据分析场景中,Excel 文件是最常见的数据载体之一。但传统的 Excel 数据分析往往需要用户具备一定的技术能力,比如熟悉公式、透视表或者 SQL 查询。有没有一种可能,让用户用自然语言直接提问,系统就能自动完成数据分析并给出可视化结果?

我们来聊聊一个基于 LangGraph + DuckDB + ReActAgent 架构的 Excel 问答系统,它能帮你实现这个目标:

  • 支持多文件、多 Sheet 的统一分析
  • 自动将 Excel 数据映射为数据库表结构
  • 根据自然语言问题生成 SQL 查询
  • 智能推荐可视化图表类型
  • 实时流式返回思考过程和分析结果

一、系统架构设计

1.1 整体架构

整个系统采用 LangGraph 作为编排引擎,将 Excel 问答流程拆解为多个节点,每个节点负责特定的任务。

用户问题 → Excel解析 → SQL生成 → SQL执行 → 结果总结 → 数据渲染
节点名称 功能描述 输入 输出
excel_parsing 解析 Excel 文件,注册到 DuckDB 文件列表 表结构信息
sql_generator 根据问题生成 SQL 查询 用户问题 + 表结构 SQL 语句 + 图表类型
sql_executor 执行 SQL 查询 SQL 语句 查询结果
summarize 生成自然语言总结 查询结果 分析报告
data_render 渲染可视化图表 查询结果 + 图表类型 图表数据
1.2 技术栈选型
技术组件 作用 选型理由
LangGraph 工作流编排 支持复杂的状态管理和条件分支,便于构建 Agent 流程
DuckDB 内存数据库 轻量级、支持 SQL、无需额外部署,天然支持 Pandas 集成
LangChain LLM 调用封装 统一的 LLM 接口,支持多种模型切换
Pandas 数据处理 强大的 Excel 读取和数据清洗能力
SQLGlot SQL 解析 用于解析 SQL 语句,提取表名、列名等信息
Langfuse 链路追踪 记录每次对话的完整执行链路,便于调试和优化
1.3 核心设计理念

会话隔离:每个 chat_id 拥有独立的 DuckDB 实例,避免多用户数据混淆。 流式响应:采用 SSE(Server-Sent Events)实时推送思考过程,提升用户体验。 智能推荐:不仅生成 SQL,还根据查询逻辑推荐最合适的图表类型(折线图、柱状图、表格等)。

二、核心模块详解

2.1 状态管理(ExcelAgentState)

LangGraph 的核心是状态流转。我们定义了 ExcelAgentState 来管理整个问答流程的状态:

class ExcelAgentState(TypedDict):
    user_query: str                              # 用户问题
    file_list: list                              # 文件列表
    chat_id: Optional[str]                       # 聊天ID(会话隔离)
    file_metadata: Dict[str, FileInfo]           # 文件元数据
    sheet_metadata: Dict[str, SheetInfo]         # Sheet元数据
    db_info: list[dict]                          # 表结构信息
    catalog_info: Dict[str, str]                 # Catalog映射
    generated_sql: Optional[str]                 # 生成的SQL
    chart_url: Optional[str]                     # 图表URL
    chart_type: Optional[str]                    # 图表类型
    apache_chart_data: Optional[Dict[str, Any]]  # ECharts数据
    execution_result: Optional[ExecutionResult]  # SQL执行结果
    report_summary: Optional[str]                # 分析报告

设计亮点

  • 使用 TypedDict 确保类型安全
  • 区分文件级元数据(file_metadata)和表级元数据(sheet_metadata)
  • 支持多 Catalog 管理,每个文件对应一个独立的 Catalog
2.2 DuckDB 连接管理
2.2.1 为什么选择 DuckDB?
对比项 DuckDB SQLite PostgreSQL
部署复杂度 嵌入式,无需部署 嵌入式 需要独立部署
分析性能 列式存储,OLAP优化 行式存储 行式存储
Pandas集成 原生支持 需要转换 需要转换
内存占用
SQL兼容性 PostgreSQL语法 标准SQL PostgreSQL
2.2.2 会话级连接管理

为了支持多用户并发,我们设计了两层管理器:

# 第一层:单个 Excel 文件的 DuckDB 管理器
class ExcelDuckDBManager:
    def __init__(self):
        self._connection = None                    # 延迟初始化
        self._registered_catalogs = {}             # Catalog 注册表
        self._registered_tables = {}               # 表注册表

    def register_excel_file(self, file_path, file_name):
        """注册 Excel 文件到 DuckDB"""
        # 1. 读取所有 Sheet
        # 2. 清理列名(去除特殊字符)
        # 3. 创建 Catalog 和 Table
        # 4. 返回表结构信息

# 第二层:聊天级别的管理器(会话隔离)
class ChatDuckDBManager:
    def __init__(self):
        self._chat_managers = {}  # {chat_id: ExcelDuckDBManager}
        self._session_timeout = 36000 # 10小时超时

    def get_manager(self, chat_id):
        """为每个 chat_id 创建独立的管理器"""
        if chat_id not in self._chat_managers:
            self._chat_managers[chat_id] = ExcelDuckDBManager()
        return self._chat_managers[chat_id]

核心优势

  • 会话隔离:不同用户的数据互不干扰
  • 资源复用:同一会话内多次查询共享连接
  • 自动清理:超时会话自动释放资源
2.2.3 数据注册流程
def register_excel_file(self, file_path: str, file_name: str):
    # 1. 生成唯一的 Catalog 名称
    catalog_name = self._get_unique_catalog_name(file_name)

    # 2. 读取 Excel 所有 Sheet
    excel_file_data = pd.ExcelFile(file_path)
    dataframes = [(sheet, pd.read_excel(file_path, sheet_name=sheet))
                   for sheet in excel_file_data.sheet_names]

    # 3. 清理列名(去除特殊字符、中文支持)
    for sheet_name, df in dataframes:
        df.columns = [self._sanitize_column_name(col) for col in df.columns]

    # 4. 注册到 DuckDB
    conn.execute(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}")
    conn.execute(f'CREATE TABLE "{catalog_name}"."{table_name}" AS SELECT * FROM df')

    # 5. 提取表结构信息
    columns_info = {col: {"comment": col, "type": self._map_pandas_dtype_to_sql(df[col].dtype)}
                    for col in df.columns}

    return catalog_name, registered_tables

关键技术点

  • 列名清理:支持中文列名,自动处理特殊字符
  • 类型映射:Pandas dtype → SQL type(如 int64BIGINT
  • 样本数据:保存前 5 行数据用于 SQL 生成时的上下文参考
2.3 SQL 生成节点

这是整个系统的核心,需要将自然语言转换为精确的 SQL 查询。

2.3.1 Prompt 设计
prompt = ChatPromptTemplate.from_template("""
你是一位专业的数据库管理员(DBA),任务是根据提供的数据库结构、表关系以及用户需求,生成优化的DUCKDB SQL查询语句,并推荐合适的可视化图表。

## 任务
  - 根据用户问题生成一条优化的SQL语句。
  - 根据查询生成逻辑从**图表定义**中选择最合适的图表类型。

## 生成SQL要求:
 - 你必须仅生成一条合法、可执行的SQL查询语句 , 不得包含解释、Markdown、注释或额外文本。
 - **必须直接且完整地使用所提供的表结构和表关系来生成SQL语句**,表的格式必须是 catalog_name.table_name 的形式,生成的SQL语句中的catalog、table、column 必须用 "" 符号包裹。
 - 你必须严格遵守数据类型、外键关系及表结构中定义的约束。当没有表的关联关系时,你可以根据样例数据进行推断。
 - 不得假设表结构中未明确定义的列或表。
 - 显式声明所有JOIN条件(禁止自然连接);注意join性能,尽可能的在join之间先进行‘group by’操作以减少join的数据量。
 - 当遇到复杂查询时,使用WITH CTE分层组织复杂逻辑。
 - 若问题涉及时序,请合理使用提供的"当前时间"上下文(例如用于相对日期计算)。
 - 如果用户问题模糊或者缺乏足够的信息以生成正确的查询,请返回:`NULL`
 - 当用户明确要求查看明细数据且未指定具体数量时,应适当限制返回结果数量(如LIMIT 50)以提高查询性能,但如果用户指定了具体数量则按照用户要求执行
 - 对于聚合查询或统计类查询,不应随意添加LIMIT子句

## 提供的信息
 - 表结构:{db_schema}
 - 用户提问:{user_query}
 - 当前时间:{current_time}

## 图表定义
 - generate_line_chart: 用于显示数据随时间或连续变量的趋势
 - generate_bar_chart: 用于比较不同类别的值,适合横向对比
 - generate_pie_chart: 用于显示数据的占比关系
 - generate_table: 生成结构化表格,用于展示明细数据
 - ... (共20+种图表类型)

## 输出格式
 - 你**必须且只能**输出一个符合以下结构的 **纯 JSON 对象**:
 ```json
 {{
     "sql_query": "生成的SQL语句字符串",
     "chart_type": "推荐的图表类型字符串,如 \"generate_area_chart\""
 }}

""")


**Prompt 设计要点**:
| 设计维度 | 具体策略 | 效果 |
| :--- | :--- | :--- |
| **角色定位** | 定义为专业 DBA | 提升 SQL 生成质量 |
| **约束明确** | 禁止自然连接、强制使用双引号 | 避免 SQL 语法错误 |
| **性能优化** | 提示先 GROUP BY 再 JOIN | 减少计算量 |
| **容错处理** | 问题模糊时返回 NULL | 避免错误查询 |
| **图表推荐** | 提供 20+ 图表类型定义 | 智能匹配可视化方式 |
| **输出格式** | 强制 JSON 输出 | 便于解析 |

##### 2.3.2 表结构上下文构建

```json
db_schema = [
    {
        "table_name": "销售数据_Sheet1",
        "catalog_name": "销售数据",
        "columns": {
            "日期": {"comment": "日期", "type": "VARCHAR(255)"},
            "产品": {"comment": "产品", "type": "VARCHAR(255)"},
            "销售额": {"comment": "销售额", "type": "BIGINT"}
        },
        "sample_data": [
            {"日期": "2024-01-01", "产品": "A", "销售额": 1000},
            ...
        ],
        "table_comment": "销售数据.xlsx - Sheet1"
    }
]

关键信息

  • 列注释:保留原始列名作为注释(支持中文)
  • 样本数据:帮助 LLM 理解数据分布
  • 表注释:标注数据来源(文件名 + Sheet名)
2.4 SQL 执行节点
def exe_sql_excel_query(state):
    # 1. 获取 DuckDB 管理器
    duckdb_manager = get_duckdb_manager(chat_id=state.get("chat_id"))

    # 2. 清理 SQL(移除反引号)
    sql = state["generated_sql"].replace("`", "")

    # 3. 执行查询
    columns, data = duckdb_manager.execute_sql(sql)

    # 4. 封装结果
    state["execution_result"] = ExecutionResult(
        success=True,
        columns=columns,
        data=data
    )

    return state

执行结果结构

class ExecutionResult(BaseModel):
    success: bool                           # 是否成功
    columns: List[str]                      # 列名列表
    data: Optional[List[Dict[str, Any]]]    # 查询结果(字典列表)
    error: Optional[str]                    # 错误信息
2.5 数据渲染节点

根据图表类型选择不同的渲染方式:

def data_render_condition(state: ExcelAgentState) -> str:
    chart_type = state.get("chart_type")

    # 表格类型使用 ECharts 渲染
    if chart_type.lower() in ["mcp-server-chart-generate_table"]:
        return "data_render_apache"

    # 其他图表使用 AntV MCP 渲染
    return "data_render"
2.5.1 表格渲染(ECharts)
def excel_data_render_apache(state):
    # 1. 解析 SQL 获取列信息
    is_select_all = check_if_select_all(generated_sql)

    if is_select_all:
        # SELECT * 查询:从表结构中获取所有列
        column_comments = get_all_column_comments_for_tables(table_names, db_info)
    else:
        # 普通查询:解析 SELECT 子句
        column_comments = extract_select_columns_with_comments(generated_sql, db_info)

    # 2. 构建表格数据
    table_data = {
        "llm": {"type": "response_table"},
        "data": {
            "column": column_comments,  # 列标题(中文)
            "result": [
                {column_comments[i]: row[actual_columns[i]] for i in range(len(columns))}
                for row in data_result.data
            ]
        }
    }

    return state

技术难点

  • 列名映射:将数据库列名映射为中文注释
  • SELECT * 处理:需要从表结构中提取所有列信息
  • 别名处理:聚合函数的别名优先级高于列注释
2.5.2 图表渲染(AntV MCP)

对于非表格类型的图表,调用 AntV MCP 服务生成可视化:

def data_render_ant(state):
    # 1. 准备图表数据
    chart_data = {
        "type": state["chart_type"],
        "data": state["execution_result"].data
    }

    # 2. 调用 MCP 服务生成图表
    chart_url = mcp_client.generate_chart(chart_data)

    state["chart_url"] = chart_url
    return state
2.6 流式响应设计

为了提升用户体验,系统采用 SSE 实时推送每个节点的执行进度:

async def run_excel_agent(self, query, response, chat_id, ...):
    # 1. 异步流式执行 LangGraph
    async for chunk_dict in graph.astream(input=initial_state, stream_mode="updates"):
        step_name, step_value = next(iter(chunk_dict.items()))

        # 2. 发送步骤标题(折叠面板)
        if step_name not in ["summarize", "data_render"]:
            await response.write(f'<details><summary>{step_name}...</summary>')

        # 3. 发送步骤内容
        content = self._format_step_content(step_value)
        await response.write(content)

        # 4. 关闭折叠面板
        if step_name not in ["summarize", "data_render"]:
            await response.write('</details>')

前端渲染效果

▼ excel_parsing...
  文件解析结果
  文件列表:
  1. file_name: 销售数据.xlsx | Catalog: 销售数据 | Sheets: 2 | 上传时间: 2024-11-28
▼ sql_generator...
  SELECT "销售数据"."Sheet1"."产品", SUM("销售数据"."Sheet1"."销售额") AS 总销售额
  FROM "销售数据"."Sheet1"
  GROUP BY "销售数据"."Sheet1"."产品"
▼ sql_executor...
  ✅ 查询执行成功!返回 5 行数据,2 列
分析报告:根据查询结果,产品A的总销售额为50000元,产品B为38000元...
[表格/图表渲染]

三、关键技术实现

3.1 多文件多 Sheet 支持
3.1.1 Catalog 隔离机制
# 文件1:销售数据.xlsx(2个Sheet)
Catalog: 销售数据
  - Table: Sheet1
  - Table: Sheet2

# 文件2:库存数据.csv
Catalog: 库存数据
  - Table: 库存数据

# SQL 查询示例
SELECT
    s."产品",
    s."销售额",
    i."库存量"
FROM "销售数据"."Sheet1" s
JOIN "库存数据"."库存数据" i ON s."产品" = i."产品"

优势

  • 避免表名冲突(不同文件可能有同名 Sheet)
  • 清晰的数据来源标识
  • 支持跨文件关联查询
3.1.2 表名清理规则
def _sanitize_table_name(self, sheet_name: str) -> str:
    # 1. 替换非法字符(保留中文)
    table_name = re.sub(r'[^\w\u4e00-\u9fa5]', '_', sheet_name)

    # 2. 移除首尾下划线
    table_name = table_name.strip('_')

    # 3. 确保不以数字开头
    if table_name and table_name[0].isdigit():
        table_name = f'table_{table_name}'

    return table_name or 'unknown_sheet'

处理示例

原始名称 清理后 说明
Sheet1 Sheet1 无需处理
销售数据-2024 销售数据_2024 替换特殊字符
2024年数据 table_2024年数据 数字开头加前缀
产品 (新) 产品_新 去除括号
3.2 SQL 解析与列名映射

使用 SQLGlot 解析 SQL 语句,提取列信息并映射为中文注释。

3.2.1 提取表名
def extract_table_names_sqlglot(sql: str) -> list:
    expression = parse(sql)[0]
    tables = set()
    for table in expression.find_all(sqlglot.exp.Table):
        tables.add(table.name)
    return list(tables)
3.2.2 提取列名并映射注释
def extract_select_columns_with_comments(sql, schema_inspector):
    expressions = parse(sql)
    column_info_list = []

    for expression in expressions:
        selects = expression.find_all(sqlglot.exp.Select)
        for select in selects:
            for proj in select.expressions:
                # 处理别名
                if isinstance(proj, sqlglot.exp.Alias):
                    column_info_list.append({
                        "name": proj.this.name,
                        "alias": proj.alias,
                        "is_aggregate": isinstance(proj.this, (sqlglot.exp.Sum, ...))
                    })
                # 处理普通列
                elif isinstance(proj, sqlglot.exp.Column):
                    column_info_list.append({
                        "name": proj.name,
                        "table": proj.table
                    })

    # 映射为中文注释
    result_columns = []
    for col_info in column_info_list:
        if col_info.get("is_aggregate") or col_info.get("alias"):
            # 聚合函数保留别名
            result_columns.append(col_info["alias"])
        else:
            # 普通列查找注释
            comment = find_column_comment(col_info["name"], col_info["table"], schema_inspector)
            result_columns.append(comment or col_info["name"])

    return result_columns

处理示例

SQL 片段 实际列名 显示列名
SELECT "产品" 产品 产品
SELECT SUM("销售额") AS 总销售额 总销售额 总销售额
SELECT "日期", COUNT(*) AS 订单数 日期,订单数 日期,订单数
3.3 链路追踪(Langfuse)

集成 Langfuse 记录每次对话的完整执行链路:

if self.ENABLE_TRACING:
    langfuse = get_client()
    with langfuse.start_as_current_observation(
        input=query,
        as_type="agent",
        name="表格问答"
    ) as rootspan:
        rootspan.update_trace(session_id=chat_id, user_id=user_id)

        # 执行 LangGraph
        async for chunk in graph.astream(...):
            # 每个节点的执行会自动记录到 Langfuse
            ...

追踪信息包括

  • 每个节点的输入输出
  • LLM 调用的 token 消耗
  • 执行耗时
  • 错误堆栈
3.4 任务取消机制

支持用户中途取消长时间运行的任务:

class ExcelAgent:
    def __init__(self):
        self.running_tasks = {}  # {task_id: {"cancelled": False}}

    async def run_excel_agent(self, ...):
        task_id = user_dict["id"]
        self.running_tasks[task_id] = {"cancelled": False}

        async for chunk in graph.astream(...):
            # 检查是否已取消
            if self.running_tasks[task_id]["cancelled"]:
                await response.write("> 这条消息已停止")
                break

            # 处理节点输出
            ...

    async def cancel_task(self, task_id: str):
        if task_id in self.running_tasks:
            self.running_tasks[task_id]["cancelled"] = True

四、实战案例分析

案例1:单文件简单查询

用户问题销售额最高的前5个产品是哪些?

执行流程

1. excel_parsing
   - 注册文件:销售数据.xlsx
   - Catalog: 销售数据
   - Table: Sheet1 (100行, 3列)
2. sql_generator
   - 生成SQL:
     SELECT "销售数据"."Sheet1"."产品", SUM("销售数据"."Sheet1"."销售额") AS 总销售额
     FROM "销售数据"."Sheet1"
     GROUP BY "销售数据"."Sheet1"."产品"
     ORDER BY 总销售额 DESC
     LIMIT 5
   - 推荐图表: generate_bar_chart
3. sql_executor
   - 返回 5 行数据
4. summarize
   - 生成报告:根据销售数据分析,销售额最高的5个产品分别是...
5. data_render
   - 渲染柱状图
案例2:多文件关联查询

用户问题哪些产品的库存量低于销售量的10%?

执行流程

1. excel_parsing
   - 注册文件1:销售数据.xlsx → Catalog: 销售数据
   - 注册文件2:库存数据.csv → Catalog: 库存数据
2. sql_generator
   - 生成SQL:
     SELECT
         s."产品",
         SUM(s."销售额") AS 总销售额,
         i."库存量"
     FROM "销售数据"."Sheet1" s
     JOIN "库存数据"."库存数据" i ON s."产品" = i."产品"
     GROUP BY s."产品", i."库存量"
     HAVING i."库存量" < SUM(s."销售额") * 0.1
   - 推荐图表: generate_table
3. sql_executor
   - 返回 3 行数据
4. summarize
   - 生成报告:有3个产品的库存量低于销售量的10%,分别是...
5. data_render_apache
   - 渲染表格

五、性能优化与最佳实践

5.1 性能优化策略
优化点 策略 效果
连接复用 每个 chat_id 复用同一个 DuckDB 连接 减少连接开销
延迟初始化 DuckDB 连接延迟到首次使用时创建 降低内存占用
会话超时 10小时无活动自动清理 避免资源泄漏
LIMIT 优化 明细查询自动添加 LIMIT 50 减少数据传输
JOIN 优化 Prompt 提示先 GROUP BY 再 JOIN 减少计算量
样本数据 只保存前5行样本 减少上下文长度
5.2 错误处理
try:
    # 执行 SQL
    columns, data = duckdb_manager.execute_sql(sql)
    state["execution_result"] = ExecutionResult(success=True, ...)
except Exception as e:
    # 记录错误
    logger.error(f"SQL执行失败: {e}")
    state["execution_result"] = ExecutionResult(
        success=False,
        error=str(e)
    )

常见错误类型

  • SQL 语法错误(列名不存在、表名错误)
  • 类型转换错误(字符串与数字比较)
  • 内存溢出(大数据集未 LIMIT)
5.3 安全性考虑
风险 防护措施
SQL 注入 使用参数化查询(DuckDB 自动转义)
数据泄漏 每个 chat_id 独立的 DuckDB 实例
资源耗尽 会话超时 + 查询结果 LIMIT
恶意文件 文件扩展名白名单(xlsx/xls/csv)

六、总结

6.1 核心优势
维度 优势
易用性 自然语言提问,无需学习 SQL
灵活性 支持多文件、多 Sheet、跨文件关联
智能化 自动推荐图表类型
可扩展 LangGraph 编排,易于添加新节点
高性能 DuckDB 列式存储,OLAP 优化
可观测 Langfuse 链路追踪
6.2 技术亮点
  1. 会话隔离:每个 chat_id 独立的 DuckDB 实例,支持多用户并发
  2. 智能映射:自动将 Excel 列名映射为中文注释
  3. 流式响应:SSE 实时推送思考过程,提升用户体验
  4. 图表推荐:根据查询逻辑智能推荐 20+ 种图表类型
  5. SQL 优化:Prompt 引导生成高性能 SQL(先 GROUP BY 再 JOIN)

该项目提供了一个基于 LangGraph 的强大编排框架,结合 DuckDB 的高性能分析能力和大语言模型的自然语言理解能力,打造了一个高效的 Excel 智能分析助手。其轻量级的Python Sanic后端架构也易于快速部署和二次开发。

您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区(YunPan.Plus) ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-3 14:50 , Processed in 0.064441 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 CloudStack.

快速回复 返回顶部 返回列表