26人参与 • 2025-10-14 • 数据分析
在现代数据驱动决策的环境中,让非技术用户能够通过自然语言与数据进行交互变得越来越重要。本项目开发了一个基于restful api的智能数据分析服务,具有以下核心功能:
本项目采用了以下技术栈,每项技术都承担着特定角色:
| 技术组件 | 版本要求 | 职责说明 |
|---|---|---|
| fastapi | >=0.68.0 | 高性能web框架,提供api服务 |
| langchain | >=0.12.0 | 智能代理和链式处理核心 |
| pandas | >=1.3.0 | dataframe数据处理和分析 |
| uvicorn | >=0.15.0 | asgi服务器,用于运行fastapi |
| tongyi qwen | - | 大语言模型,提供自然语言理解能力 |
技术选型理由:
首先创建fastapi应用实例,并设置元数据信息:
from fastapi import fastapi
app = fastapi(
title="excel data analysis api",
description="基于pandas dataframe的数据分析api",
version="1.0.0"
)
fastapi基于python类型提示自动生成api文档,支持openapi标准,开发者可以通过/docs和/redoc端点访问交互式文档。
项目启动时自动加载excel数据并初始化langchain代理:
@app.on_event("startup")
async def startup_event():
"""应用启动时初始化模型和代理"""
try:
initialize_agent()
print("模型和代理初始化成功")
except exception as e:
print(f"初始化失败: {e}")
raise
数据加载过程支持多工作表excel文件,自动合并所有工作表到一个dataframe中:
def initialize_agent():
global llm, pandas_agent
# 读取excel文件中的所有工作表
excel_file = pd.excelfile(path)
all_sheets = {}
for sheet_name in excel_file.sheet_names:
all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)
# 合并所有工作表
df = pd.concat(all_sheets.values(), ignore_index=true)
这种方法确保了无论excel文件的结构如何,都能完整地加载所有数据。
使用langchain的create_pandas_dataframe_agent创建专门用于pandas数据处理的ai代理:
pandas_agent = create_pandas_dataframe_agent(
llm, # 通义千问模型实例
df, # 合并后的dataframe
verbose=true, # 启用详细日志
agent_type="zero-shot-react-description", # 代理类型
allow_dangerous_code=true, # 允许执行代码
max_iterations=5 # 最大迭代次数
)
这种代理结合了大语言模型的语言理解能力和pandas的数据处理能力,能够理解自然语言查询并将其转换为dataframe操作。
为实现真正的流式输出,创建了自定义回调处理器:
class streamingcallbackhandler(basecallbackhandler):
"""自定义回调处理器,用于实现真正的流式输出"""
def __init__(self):
self.tokens = []
self.finished = false
def on_llm_new_token(self, token: str, **kwargs: any) -> none:
"""当llm生成新token时调用"""
self.tokens.append(token)
def on_llm_end(self, response: any, **kwargs: any) -> none:
"""当llm生成结束时调用"""
self.finished = true
流式响应接口使用server-sent events(sse)技术:
@app.post("/chat", response_model=chatresponse)
async def chat_with_data(request: chatrequest):
# ... 省略其他代码 ...
if request.stream:
return streamingresponse(
true_stream_response(full_input, session_history, request.session_id),
media_type="text/event-stream",
headers={
"cache-control": "no-cache",
"connection": "keep-alive",
}
)
这种方式相比传统响应,能够实时地将生成的token发送给客户端,大幅减少用户感知的延迟。
为实现多用户支持,实现了基于session_id的会话管理:
conversation_history = {}
# 在聊天接口中维护会话历史
session_history = conversation_history.get(request.session_id, [])
session_history.append(request.message)
conversation_history[request.session_id] = session_history
还提供了清除历史记录的端点:
@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):
"""清除指定会话的历史记录"""
if session_id in conversation_history:
del conversation_history[session_id]
return {"message": f"会话 {session_id} 的历史记录已清除"}
这种设计允许不同用户或不同对话线程保持独立的上下文历史。
本项目实现了以下restful端点:
| 端点 | 方法 | 描述 | 参数 |
|---|---|---|---|
| /chat | post | 主聊天接口,支持流式和非流式响应 | session_id, message, history_length, stream |
| /clear_history/{session_id} | delete | 清除指定会话的历史记录 | session_id |
| /health | get | 健康检查端点 | 无 |
| /stream_test | get | 流式接口测试端点 | 无 |
主要请求和响应模型:
class chatrequest(basemodel):
session_id: str # 会话id,用于区分不同用户的对话历史
message: str # 用户消息
history_length: optional[int] = 5 # 历史消息长度,默认为5
stream: optional[bool] = false # 是否使用流式响应
class chatresponse(basemodel):
session_id: str
response: str
success: bool
error: optional[str] = none
使用uvicorn作为asgi服务器部署应用:
uvicorn main:app --host 0.0.0.0 --port 9113 --workers 4 --timeout-keep-alive 300
参数说明:
--workers 4:启动4个工作进程,充分利用多核cpu--timeout-keep-alive 300:保持连接超时时间设置为300秒--host 0.0.0.0:监听所有网络接口本项目展示了如何将fastapi、langchain和大语言模型结合,构建一个功能强大的智能数据分析api。关键优势包括:
未来发展方向:
通过这种技术组合,我们能够将先进的大语言模型能力转化为实用的企业级应用,真正实现"用自然语言与数据对话"的愿景。
# main.py
import pandas as pd
from langchain_community.chat_models.tongyi import chattongyi
from langchain_experimental.agents import create_pandas_dataframe_agent
import os
from fastapi import fastapi, httpexception, request
from pydantic import basemodel
from typing import list, optional
from fastapi.responses import streamingresponse
import asyncio
import json
import time
from langchain.callbacks.base import basecallbackhandler
from typing import any, dict, list
from langchain_core.output_parsers import stroutputparser
from langchain_core.prompts import chatprompttemplate
# 初始化 fastapi 应用
app = fastapi(title="excel data analysis api", description="基于 pandas dataframe 的数据分析 api")
# 全局变量存储模型和代理
llm = none
pandas_agent = none
conversation_history = {}
class streamingcallbackhandler(basecallbackhandler):
"""自定义回调处理器,用于实现真正的流式输出"""
def __init__(self):
self.tokens = []
self.finished = false
def on_llm_new_token(self, token: str, **kwargs: any) -> none:
"""当llm生成新token时调用"""
self.tokens.append(token)
def on_llm_end(self, response: any, **kwargs: any) -> none:
"""当llm生成结束时调用"""
self.finished = true
def get_tokens(self):
"""获取已生成的tokens"""
return self.tokens
class chatrequest(basemodel):
session_id: str # 会话id,用于区分不同用户的对话历史
message: str # 用户消息
history_length: optional[int] = 5 # 历史消息长度,默认为5
stream: optional[bool] = false # 是否使用流式响应
class chatresponse(basemodel):
session_id: str
response: str
success: bool
error: optional[str] = none
def initialize_agent():
"""初始化模型和数据代理"""
global llm, pandas_agent
path = r'./data/1.xlsx'
# 检查文件是否存在
if not os.path.exists(path):
raise filenotfounderror(f"文件未找到:{path}")
# 读取 excel 中的所有工作表
excel_file = pd.excelfile(path)
all_sheets = {}
for sheet_name in excel_file.sheet_names:
all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)
# 合并所有工作表到一个dataframe中
df = pd.concat(all_sheets.values(), ignore_index=true)
os.environ["dashscope_api_key"] = 'sk-50254f6d4df1ab3c9baf30093c4e'
llm = chattongyi(
model="qwen-max-latest",
temperature=0.4,
streaming=true # 启用流式输出
)
# 创建excel_agent
pandas_agent = create_pandas_dataframe_agent(
llm,
df,
verbose=true,
agent_type="zero-shot-react-description",
allow_dangerous_code=true,
max_iterations=5
)
@app.on_event("startup")
async def startup_event():
"""应用启动时初始化模型和代理"""
try:
initialize_agent()
print("模型和代理初始化成功")
except exception as e:
print(f"初始化失败: {e}")
raise
@app.post("/chat", response_model=chatresponse)
async def chat_with_data(request: chatrequest):
"""与数据进行对话(支持流式和非流式响应)"""
global pandas_agent, conversation_history
if not pandas_agent:
raise httpexception(status_code=500, detail="模型未初始化")
try:
# 获取或创建会话历史
session_history = conversation_history.get(request.session_id, [])
# 构建历史文本
history_text = "\n".join(session_history[-request.history_length:])
# 构造带上下文的输入
if history_text:
full_input = f"聊天历史:{history_text},当前问题:{request.message},请根据历史和当前问题,不要截断输出,展示全部内容,不要总结,严格按照查询内容输出,不要多余输出,如果无法确定指代对象,请询问用户澄清,并且不要编造。".strip()
else:
full_input = request.message
# 如果请求流式响应
if request.stream:
return streamingresponse(
true_stream_response(full_input, session_history, request.session_id),
media_type="text/event-stream",
headers={
"cache-control": "no-cache",
"connection": "keep-alive",
}
)
# 非流式响应
response = pandas_agent.invoke({"input": full_input})
output = response['output'] if isinstance(response, dict) else str(response)
# 更新会话历史
session_history.append(request.message)
conversation_history[request.session_id] = session_history
return chatresponse(
session_id=request.session_id,
response=output,
success=true
)
except exception as e:
if request.stream:
# 对于流式请求,返回流式错误响应
async def error_stream():
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return streamingresponse(error_stream(), media_type="text/event-stream")
else:
return chatresponse(
session_id=request.session_id,
response="",
success=false,
error=str(e)
)
async def true_stream_response(input_text: str, session_history: list, session_id: str):
"""真正的流式响应生成器"""
global pandas_agent, conversation_history
try:
# 使用回调处理器捕获流式输出
callback_handler = streamingcallbackhandler()
# 在后台运行代理处理
async def run_agent():
try:
# 调用代理,传入回调处理器
response = await asyncio.get_event_loop().run_in_executor(
none,
lambda: pandas_agent.invoke(
{"input": input_text},
{"callbacks": [callback_handler]}
)
)
# 确保结束标记被设置
callback_handler.finished = true
except exception as e:
print(f"代理执行错误: {e}")
callback_handler.finished = true
# 启动代理任务
agent_task = asyncio.create_task(run_agent())
# 流式输出循环
last_token_count = 0
start_time = time.time()
max_wait_time = 60 # 最大等待时间60秒
while not callback_handler.finished and (time.time() - start_time) < max_wait_time:
current_tokens = callback_handler.get_tokens()
# 如果有新token,发送给客户端
if len(current_tokens) > last_token_count:
for i in range(last_token_count, len(current_tokens)):
token_data = {
"token": current_tokens[i],
"type": "token"
}
yield f"data: {json.dumps(token_data)}\n\n"
last_token_count = len(current_tokens)
# 短暂等待后继续检查
await asyncio.sleep(0.1)
# 如果超时,发送错误信息
if (time.time() - start_time) >= max_wait_time:
error_data = {"error": "请求超时", "type": "error"}
yield f"data: {json.dumps(error_data)}\n\n"
else:
# 发送完成标记
done_data = {"done": true, "type": "done"}
yield f"data: {json.dumps(done_data)}\n\n"
# 更新会话历史
current_message = input_text.split("当前问题:")[1].split(",")[
0] if "当前问题:" in input_text else input_text
session_history.append(current_message)
conversation_history[session_id] = session_history
# 等待代理任务完成(如果还未完成)
if not agent_task.done():
agent_task.cancel()
except exception as e:
error_data = {"error": f"流式处理错误: {str(e)}", "type": "error"}
yield f"data: {json.dumps(error_data)}\n\n"
@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):
"""清除指定会话的历史记录"""
if session_id in conversation_history:
del conversation_history[session_id]
return {"message": f"会话 {session_id} 的历史记录已清除"}
@app.get("/health")
async def health_check():
"""健康检查接口"""
return {
"status": "healthy",
"model_loaded": pandas_agent is not none,
"active_sessions": len(conversation_history)
}
@app.get("/stream_test")
async def stream_test():
"""测试流式接口"""
async def generate_test_data():
for i in range(10):
yield f"data: 测试消息 {i}\n\n"
await asyncio.sleep(1)
return streamingresponse(generate_test_data(), media_type="text/event-stream")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=9113) 以上就是基于fastapi与langchain开发excel智能数据分析api详解的详细内容,更多关于fastapi开发excel数据分析api的资料请关注代码网其它相关文章!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论