在数据分析场景中,Excel 文件是最常见的数据载体之一。但传统的 Excel 数据分析往往需要用户具备一定的技术能力,比如熟悉公式、透视表或者 SQL 查询。有没有一种可能,让用户用自然语言直接提问,系统就能自动完成数据分析并给出可视化结果?
我们来聊聊一个基于 LangGraph + DuckDB + ReActAgent 架构的 Excel 问答系统,它能帮你实现这个目标:
- 支持多文件、多 Sheet 的统一分析
- 自动将 Excel 数据映射为数据库表结构
- 根据自然语言问题生成 SQL 查询
- 智能推荐可视化图表类型
- 实时流式返回思考过程和分析结果
一、系统架构设计
1.1 整体架构
整个系统采用 LangGraph 作为编排引擎,将 Excel 问答流程拆解为多个节点,每个节点负责特定的任务。
用户问题 → Excel解析 → SQL生成 → SQL执行 → 结果总结 → 数据渲染
| 节点名称 |
功能描述 |
输入 |
输出 |
| excel_parsing |
解析 Excel 文件,注册到 DuckDB |
文件列表 |
表结构信息 |
| sql_generator |
根据问题生成 SQL 查询 |
用户问题 + 表结构 |
SQL 语句 + 图表类型 |
| sql_executor |
执行 SQL 查询 |
SQL 语句 |
查询结果 |
| summarize |
生成自然语言总结 |
查询结果 |
分析报告 |
| data_render |
渲染可视化图表 |
查询结果 + 图表类型 |
图表数据 |
1.2 技术栈选型
| 技术组件 |
作用 |
选型理由 |
| LangGraph |
工作流编排 |
支持复杂的状态管理和条件分支,便于构建 Agent 流程 |
| DuckDB |
内存数据库 |
轻量级、支持 SQL、无需额外部署,天然支持 Pandas 集成 |
| LangChain |
LLM 调用封装 |
统一的 LLM 接口,支持多种模型切换 |
| Pandas |
数据处理 |
强大的 Excel 读取和数据清洗能力 |
| SQLGlot |
SQL 解析 |
用于解析 SQL 语句,提取表名、列名等信息 |
| Langfuse |
链路追踪 |
记录每次对话的完整执行链路,便于调试和优化 |
1.3 核心设计理念
会话隔离:每个 chat_id 拥有独立的 DuckDB 实例,避免多用户数据混淆。
流式响应:采用 SSE(Server-Sent Events)实时推送思考过程,提升用户体验。
智能推荐:不仅生成 SQL,还根据查询逻辑推荐最合适的图表类型(折线图、柱状图、表格等)。
二、核心模块详解
2.1 状态管理(ExcelAgentState)
LangGraph 的核心是状态流转。我们定义了 ExcelAgentState 来管理整个问答流程的状态:
class ExcelAgentState(TypedDict):
user_query: str # 用户问题
file_list: list # 文件列表
chat_id: Optional[str] # 聊天ID(会话隔离)
file_metadata: Dict[str, FileInfo] # 文件元数据
sheet_metadata: Dict[str, SheetInfo] # Sheet元数据
db_info: list[dict] # 表结构信息
catalog_info: Dict[str, str] # Catalog映射
generated_sql: Optional[str] # 生成的SQL
chart_url: Optional[str] # 图表URL
chart_type: Optional[str] # 图表类型
apache_chart_data: Optional[Dict[str, Any]] # ECharts数据
execution_result: Optional[ExecutionResult] # SQL执行结果
report_summary: Optional[str] # 分析报告
设计亮点:
- 使用 TypedDict 确保类型安全
- 区分文件级元数据(file_metadata)和表级元数据(sheet_metadata)
- 支持多 Catalog 管理,每个文件对应一个独立的 Catalog
2.2 DuckDB 连接管理
2.2.1 为什么选择 DuckDB?
| 对比项 |
DuckDB |
SQLite |
PostgreSQL |
| 部署复杂度 |
嵌入式,无需部署 |
嵌入式 |
需要独立部署 |
| 分析性能 |
列式存储,OLAP优化 |
行式存储 |
行式存储 |
| Pandas集成 |
原生支持 |
需要转换 |
需要转换 |
| 内存占用 |
低 |
低 |
高 |
| SQL兼容性 |
PostgreSQL语法 |
标准SQL |
PostgreSQL |
2.2.2 会话级连接管理
为了支持多用户并发,我们设计了两层管理器:
# 第一层:单个 Excel 文件的 DuckDB 管理器
class ExcelDuckDBManager:
def __init__(self):
self._connection = None # 延迟初始化
self._registered_catalogs = {} # Catalog 注册表
self._registered_tables = {} # 表注册表
def register_excel_file(self, file_path, file_name):
"""注册 Excel 文件到 DuckDB"""
# 1. 读取所有 Sheet
# 2. 清理列名(去除特殊字符)
# 3. 创建 Catalog 和 Table
# 4. 返回表结构信息
# 第二层:聊天级别的管理器(会话隔离)
class ChatDuckDBManager:
def __init__(self):
self._chat_managers = {} # {chat_id: ExcelDuckDBManager}
self._session_timeout = 36000 # 10小时超时
def get_manager(self, chat_id):
"""为每个 chat_id 创建独立的管理器"""
if chat_id not in self._chat_managers:
self._chat_managers[chat_id] = ExcelDuckDBManager()
return self._chat_managers[chat_id]
核心优势:
- 会话隔离:不同用户的数据互不干扰
- 资源复用:同一会话内多次查询共享连接
- 自动清理:超时会话自动释放资源
2.2.3 数据注册流程
def register_excel_file(self, file_path: str, file_name: str):
# 1. 生成唯一的 Catalog 名称
catalog_name = self._get_unique_catalog_name(file_name)
# 2. 读取 Excel 所有 Sheet
excel_file_data = pd.ExcelFile(file_path)
dataframes = [(sheet, pd.read_excel(file_path, sheet_name=sheet))
for sheet in excel_file_data.sheet_names]
# 3. 清理列名(去除特殊字符、中文支持)
for sheet_name, df in dataframes:
df.columns = [self._sanitize_column_name(col) for col in df.columns]
# 4. 注册到 DuckDB
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}")
conn.execute(f'CREATE TABLE "{catalog_name}"."{table_name}" AS SELECT * FROM df')
# 5. 提取表结构信息
columns_info = {col: {"comment": col, "type": self._map_pandas_dtype_to_sql(df[col].dtype)}
for col in df.columns}
return catalog_name, registered_tables
关键技术点:
- 列名清理:支持中文列名,自动处理特殊字符
- 类型映射:Pandas dtype → SQL type(如
int64 → BIGINT)
- 样本数据:保存前 5 行数据用于 SQL 生成时的上下文参考
2.3 SQL 生成节点
这是整个系统的核心,需要将自然语言转换为精确的 SQL 查询。
2.3.1 Prompt 设计
prompt = ChatPromptTemplate.from_template("""
你是一位专业的数据库管理员(DBA),任务是根据提供的数据库结构、表关系以及用户需求,生成优化的DUCKDB SQL查询语句,并推荐合适的可视化图表。
## 任务
- 根据用户问题生成一条优化的SQL语句。
- 根据查询生成逻辑从**图表定义**中选择最合适的图表类型。
## 生成SQL要求:
- 你必须仅生成一条合法、可执行的SQL查询语句 , 不得包含解释、Markdown、注释或额外文本。
- **必须直接且完整地使用所提供的表结构和表关系来生成SQL语句**,表的格式必须是 catalog_name.table_name 的形式,生成的SQL语句中的catalog、table、column 必须用 "" 符号包裹。
- 你必须严格遵守数据类型、外键关系及表结构中定义的约束。当没有表的关联关系时,你可以根据样例数据进行推断。
- 不得假设表结构中未明确定义的列或表。
- 显式声明所有JOIN条件(禁止自然连接);注意join性能,尽可能的在join之间先进行‘group by’操作以减少join的数据量。
- 当遇到复杂查询时,使用WITH CTE分层组织复杂逻辑。
- 若问题涉及时序,请合理使用提供的"当前时间"上下文(例如用于相对日期计算)。
- 如果用户问题模糊或者缺乏足够的信息以生成正确的查询,请返回:`NULL`
- 当用户明确要求查看明细数据且未指定具体数量时,应适当限制返回结果数量(如LIMIT 50)以提高查询性能,但如果用户指定了具体数量则按照用户要求执行
- 对于聚合查询或统计类查询,不应随意添加LIMIT子句
## 提供的信息
- 表结构:{db_schema}
- 用户提问:{user_query}
- 当前时间:{current_time}
## 图表定义
- generate_line_chart: 用于显示数据随时间或连续变量的趋势
- generate_bar_chart: 用于比较不同类别的值,适合横向对比
- generate_pie_chart: 用于显示数据的占比关系
- generate_table: 生成结构化表格,用于展示明细数据
- ... (共20+种图表类型)
## 输出格式
- 你**必须且只能**输出一个符合以下结构的 **纯 JSON 对象**:
```json
{{
"sql_query": "生成的SQL语句字符串",
"chart_type": "推荐的图表类型字符串,如 \"generate_area_chart\""
}}
""")
**Prompt 设计要点**:
| 设计维度 | 具体策略 | 效果 |
| :--- | :--- | :--- |
| **角色定位** | 定义为专业 DBA | 提升 SQL 生成质量 |
| **约束明确** | 禁止自然连接、强制使用双引号 | 避免 SQL 语法错误 |
| **性能优化** | 提示先 GROUP BY 再 JOIN | 减少计算量 |
| **容错处理** | 问题模糊时返回 NULL | 避免错误查询 |
| **图表推荐** | 提供 20+ 图表类型定义 | 智能匹配可视化方式 |
| **输出格式** | 强制 JSON 输出 | 便于解析 |
##### 2.3.2 表结构上下文构建
```json
db_schema = [
{
"table_name": "销售数据_Sheet1",
"catalog_name": "销售数据",
"columns": {
"日期": {"comment": "日期", "type": "VARCHAR(255)"},
"产品": {"comment": "产品", "type": "VARCHAR(255)"},
"销售额": {"comment": "销售额", "type": "BIGINT"}
},
"sample_data": [
{"日期": "2024-01-01", "产品": "A", "销售额": 1000},
...
],
"table_comment": "销售数据.xlsx - Sheet1"
}
]
关键信息:
- 列注释:保留原始列名作为注释(支持中文)
- 样本数据:帮助 LLM 理解数据分布
- 表注释:标注数据来源(文件名 + Sheet名)
2.4 SQL 执行节点
def exe_sql_excel_query(state):
# 1. 获取 DuckDB 管理器
duckdb_manager = get_duckdb_manager(chat_id=state.get("chat_id"))
# 2. 清理 SQL(移除反引号)
sql = state["generated_sql"].replace("`", "")
# 3. 执行查询
columns, data = duckdb_manager.execute_sql(sql)
# 4. 封装结果
state["execution_result"] = ExecutionResult(
success=True,
columns=columns,
data=data
)
return state
执行结果结构:
class ExecutionResult(BaseModel):
success: bool # 是否成功
columns: List[str] # 列名列表
data: Optional[List[Dict[str, Any]]] # 查询结果(字典列表)
error: Optional[str] # 错误信息
2.5 数据渲染节点
根据图表类型选择不同的渲染方式:
def data_render_condition(state: ExcelAgentState) -> str:
chart_type = state.get("chart_type")
# 表格类型使用 ECharts 渲染
if chart_type.lower() in ["mcp-server-chart-generate_table"]:
return "data_render_apache"
# 其他图表使用 AntV MCP 渲染
return "data_render"
2.5.1 表格渲染(ECharts)
def excel_data_render_apache(state):
# 1. 解析 SQL 获取列信息
is_select_all = check_if_select_all(generated_sql)
if is_select_all:
# SELECT * 查询:从表结构中获取所有列
column_comments = get_all_column_comments_for_tables(table_names, db_info)
else:
# 普通查询:解析 SELECT 子句
column_comments = extract_select_columns_with_comments(generated_sql, db_info)
# 2. 构建表格数据
table_data = {
"llm": {"type": "response_table"},
"data": {
"column": column_comments, # 列标题(中文)
"result": [
{column_comments[i]: row[actual_columns[i]] for i in range(len(columns))}
for row in data_result.data
]
}
}
return state
技术难点:
- 列名映射:将数据库列名映射为中文注释
- SELECT * 处理:需要从表结构中提取所有列信息
- 别名处理:聚合函数的别名优先级高于列注释
2.5.2 图表渲染(AntV MCP)
对于非表格类型的图表,调用 AntV MCP 服务生成可视化:
def data_render_ant(state):
# 1. 准备图表数据
chart_data = {
"type": state["chart_type"],
"data": state["execution_result"].data
}
# 2. 调用 MCP 服务生成图表
chart_url = mcp_client.generate_chart(chart_data)
state["chart_url"] = chart_url
return state
2.6 流式响应设计
为了提升用户体验,系统采用 SSE 实时推送每个节点的执行进度:
async def run_excel_agent(self, query, response, chat_id, ...):
# 1. 异步流式执行 LangGraph
async for chunk_dict in graph.astream(input=initial_state, stream_mode="updates"):
step_name, step_value = next(iter(chunk_dict.items()))
# 2. 发送步骤标题(折叠面板)
if step_name not in ["summarize", "data_render"]:
await response.write(f'<details><summary>{step_name}...</summary>')
# 3. 发送步骤内容
content = self._format_step_content(step_value)
await response.write(content)
# 4. 关闭折叠面板
if step_name not in ["summarize", "data_render"]:
await response.write('</details>')
前端渲染效果:
▼ excel_parsing...
文件解析结果
文件列表:
1. file_name: 销售数据.xlsx | Catalog: 销售数据 | Sheets: 2 | 上传时间: 2024-11-28
▼ sql_generator...
SELECT "销售数据"."Sheet1"."产品", SUM("销售数据"."Sheet1"."销售额") AS 总销售额
FROM "销售数据"."Sheet1"
GROUP BY "销售数据"."Sheet1"."产品"
▼ sql_executor...
✅ 查询执行成功!返回 5 行数据,2 列
分析报告:根据查询结果,产品A的总销售额为50000元,产品B为38000元...
[表格/图表渲染]
三、关键技术实现
3.1 多文件多 Sheet 支持
3.1.1 Catalog 隔离机制
# 文件1:销售数据.xlsx(2个Sheet)
Catalog: 销售数据
- Table: Sheet1
- Table: Sheet2
# 文件2:库存数据.csv
Catalog: 库存数据
- Table: 库存数据
# SQL 查询示例
SELECT
s."产品",
s."销售额",
i."库存量"
FROM "销售数据"."Sheet1" s
JOIN "库存数据"."库存数据" i ON s."产品" = i."产品"
优势:
- 避免表名冲突(不同文件可能有同名 Sheet)
- 清晰的数据来源标识
- 支持跨文件关联查询
3.1.2 表名清理规则
def _sanitize_table_name(self, sheet_name: str) -> str:
# 1. 替换非法字符(保留中文)
table_name = re.sub(r'[^\w\u4e00-\u9fa5]', '_', sheet_name)
# 2. 移除首尾下划线
table_name = table_name.strip('_')
# 3. 确保不以数字开头
if table_name and table_name[0].isdigit():
table_name = f'table_{table_name}'
return table_name or 'unknown_sheet'
处理示例:
| 原始名称 |
清理后 |
说明 |
Sheet1 |
Sheet1 |
无需处理 |
销售数据-2024 |
销售数据_2024 |
替换特殊字符 |
2024年数据 |
table_2024年数据 |
数字开头加前缀 |
产品 (新) |
产品_新 |
去除括号 |
3.2 SQL 解析与列名映射
使用 SQLGlot 解析 SQL 语句,提取列信息并映射为中文注释。
3.2.1 提取表名
def extract_table_names_sqlglot(sql: str) -> list:
expression = parse(sql)[0]
tables = set()
for table in expression.find_all(sqlglot.exp.Table):
tables.add(table.name)
return list(tables)
3.2.2 提取列名并映射注释
def extract_select_columns_with_comments(sql, schema_inspector):
expressions = parse(sql)
column_info_list = []
for expression in expressions:
selects = expression.find_all(sqlglot.exp.Select)
for select in selects:
for proj in select.expressions:
# 处理别名
if isinstance(proj, sqlglot.exp.Alias):
column_info_list.append({
"name": proj.this.name,
"alias": proj.alias,
"is_aggregate": isinstance(proj.this, (sqlglot.exp.Sum, ...))
})
# 处理普通列
elif isinstance(proj, sqlglot.exp.Column):
column_info_list.append({
"name": proj.name,
"table": proj.table
})
# 映射为中文注释
result_columns = []
for col_info in column_info_list:
if col_info.get("is_aggregate") or col_info.get("alias"):
# 聚合函数保留别名
result_columns.append(col_info["alias"])
else:
# 普通列查找注释
comment = find_column_comment(col_info["name"], col_info["table"], schema_inspector)
result_columns.append(comment or col_info["name"])
return result_columns
处理示例:
| SQL 片段 |
实际列名 |
显示列名 |
SELECT "产品" |
产品 |
产品 |
SELECT SUM("销售额") AS 总销售额 |
总销售额 |
总销售额 |
SELECT "日期", COUNT(*) AS 订单数 |
日期,订单数 |
日期,订单数 |
3.3 链路追踪(Langfuse)
集成 Langfuse 记录每次对话的完整执行链路:
if self.ENABLE_TRACING:
langfuse = get_client()
with langfuse.start_as_current_observation(
input=query,
as_type="agent",
name="表格问答"
) as rootspan:
rootspan.update_trace(session_id=chat_id, user_id=user_id)
# 执行 LangGraph
async for chunk in graph.astream(...):
# 每个节点的执行会自动记录到 Langfuse
...
追踪信息包括:
- 每个节点的输入输出
- LLM 调用的 token 消耗
- 执行耗时
- 错误堆栈
3.4 任务取消机制
支持用户中途取消长时间运行的任务:
class ExcelAgent:
def __init__(self):
self.running_tasks = {} # {task_id: {"cancelled": False}}
async def run_excel_agent(self, ...):
task_id = user_dict["id"]
self.running_tasks[task_id] = {"cancelled": False}
async for chunk in graph.astream(...):
# 检查是否已取消
if self.running_tasks[task_id]["cancelled"]:
await response.write("> 这条消息已停止")
break
# 处理节点输出
...
async def cancel_task(self, task_id: str):
if task_id in self.running_tasks:
self.running_tasks[task_id]["cancelled"] = True
四、实战案例分析
案例1:单文件简单查询
用户问题:销售额最高的前5个产品是哪些?
执行流程:
1. excel_parsing
- 注册文件:销售数据.xlsx
- Catalog: 销售数据
- Table: Sheet1 (100行, 3列)
2. sql_generator
- 生成SQL:
SELECT "销售数据"."Sheet1"."产品", SUM("销售数据"."Sheet1"."销售额") AS 总销售额
FROM "销售数据"."Sheet1"
GROUP BY "销售数据"."Sheet1"."产品"
ORDER BY 总销售额 DESC
LIMIT 5
- 推荐图表: generate_bar_chart
3. sql_executor
- 返回 5 行数据
4. summarize
- 生成报告:根据销售数据分析,销售额最高的5个产品分别是...
5. data_render
- 渲染柱状图
案例2:多文件关联查询
用户问题:哪些产品的库存量低于销售量的10%?
执行流程:
1. excel_parsing
- 注册文件1:销售数据.xlsx → Catalog: 销售数据
- 注册文件2:库存数据.csv → Catalog: 库存数据
2. sql_generator
- 生成SQL:
SELECT
s."产品",
SUM(s."销售额") AS 总销售额,
i."库存量"
FROM "销售数据"."Sheet1" s
JOIN "库存数据"."库存数据" i ON s."产品" = i."产品"
GROUP BY s."产品", i."库存量"
HAVING i."库存量" < SUM(s."销售额") * 0.1
- 推荐图表: generate_table
3. sql_executor
- 返回 3 行数据
4. summarize
- 生成报告:有3个产品的库存量低于销售量的10%,分别是...
5. data_render_apache
- 渲染表格
五、性能优化与最佳实践
5.1 性能优化策略
| 优化点 |
策略 |
效果 |
| 连接复用 |
每个 chat_id 复用同一个 DuckDB 连接 |
减少连接开销 |
| 延迟初始化 |
DuckDB 连接延迟到首次使用时创建 |
降低内存占用 |
| 会话超时 |
10小时无活动自动清理 |
避免资源泄漏 |
| LIMIT 优化 |
明细查询自动添加 LIMIT 50 |
减少数据传输 |
| JOIN 优化 |
Prompt 提示先 GROUP BY 再 JOIN |
减少计算量 |
| 样本数据 |
只保存前5行样本 |
减少上下文长度 |
5.2 错误处理
try:
# 执行 SQL
columns, data = duckdb_manager.execute_sql(sql)
state["execution_result"] = ExecutionResult(success=True, ...)
except Exception as e:
# 记录错误
logger.error(f"SQL执行失败: {e}")
state["execution_result"] = ExecutionResult(
success=False,
error=str(e)
)
常见错误类型:
- SQL 语法错误(列名不存在、表名错误)
- 类型转换错误(字符串与数字比较)
- 内存溢出(大数据集未 LIMIT)
5.3 安全性考虑
| 风险 |
防护措施 |
| SQL 注入 |
使用参数化查询(DuckDB 自动转义) |
| 数据泄漏 |
每个 chat_id 独立的 DuckDB 实例 |
| 资源耗尽 |
会话超时 + 查询结果 LIMIT |
| 恶意文件 |
文件扩展名白名单(xlsx/xls/csv) |
六、总结
6.1 核心优势
| 维度 |
优势 |
| 易用性 |
自然语言提问,无需学习 SQL |
| 灵活性 |
支持多文件、多 Sheet、跨文件关联 |
| 智能化 |
自动推荐图表类型 |
| 可扩展 |
LangGraph 编排,易于添加新节点 |
| 高性能 |
DuckDB 列式存储,OLAP 优化 |
| 可观测 |
Langfuse 链路追踪 |
6.2 技术亮点
- 会话隔离:每个 chat_id 独立的 DuckDB 实例,支持多用户并发
- 智能映射:自动将 Excel 列名映射为中文注释
- 流式响应:SSE 实时推送思考过程,提升用户体验
- 图表推荐:根据查询逻辑智能推荐 20+ 种图表类型
- SQL 优化:Prompt 引导生成高性能 SQL(先 GROUP BY 再 JOIN)
该项目提供了一个基于 LangGraph 的强大编排框架,结合 DuckDB 的高性能分析能力和大语言模型的自然语言理解能力,打造了一个高效的 Excel 智能分析助手。其轻量级的Python Sanic后端架构也易于快速部署和二次开发。