3人参与 • 2026-03-20 • Python
调用 deepseek api,并通过 mcp (model context protocol) 协议接入 mcp-clickhouse,从而让 deepseek 能够查询 clickhouse 数据库来回答用户问题。
在运行代码之前,你需要确保环境中有以下依赖:
安装 python 库:
pip install mcp openai
安装 uv (因为你的配置中使用了 uv 来运行 mcp-clickhouse):
pip install uv
获取 deepseek api key:你需要一个有效的 deepseek api 密钥。
请将代码中的 <your_deepseek_api_key> 替换为你的实际 key,并确保 <clickhouse-host> 等配置已填入正确的值。
import asyncio
import os
import json
import sys
from typing import list, dict, any, optional
# 导入 mcp 相关库
from mcp import clientsession, stdioserverparameters
from mcp.client.stdio import stdio_client
import mcp.types as types
# 导入 openai sdk (deepseek 兼容 openai 格式)
from openai import asyncopenai
# ================= 配置部分 =================
# deepseek api 配置
deepseek_api_key = os.getenv("deepseek_api_key", "<your_deepseek_api_key>")
deepseek_base_url = "https://api.deepseek.com"
model_name = "deepseek-chat" # 或者 "deepseek-reasoner" (r1)
# mcp clickhouse server 配置 (来自你的 json)
mcp_clickhouse_config = {
"command": "uv",
"args": [
"run",
"--with",
"mcp-clickhouse",
"--python",
"3.10",
"mcp-clickhouse"
],
"env": {
"clickhouse_host": "<clickhouse-host>",
"clickhouse_port": "<clickhouse-port>",
"clickhouse_user": "<clickhouse-user>",
"clickhouse_password": "<clickhouse-password>",
"clickhouse_role": "<clickhouse-role>",
"clickhouse_secure": "true",
"clickhouse_verify": "true",
"clickhouse_connect_timeout": "30",
"clickhouse_send_receive_timeout": "30"
}
}
# ================= 工具函数 =================
def convert_mcp_tool_to_openai(mcp_tool: types.tool) -> dict[str, any]:
"""
将 mcp 工具定义转换为 openai/deepseek 支持的 function calling 格式。
"""
return {
"type": "function",
"function": {
"name": mcp_tool.name,
"description": mcp_tool.description,
"parameters": mcp_tool.inputschema
}
}
# ================= 主逻辑 =================
async def run_chat_loop():
# 1. 初始化 deepseek 客户端
client = asyncopenai(api_key=deepseek_api_key, base_url=deepseek_base_url)
# 2. 准备 mcp server 参数
server_params = stdioserverparameters(
command=mcp_clickhouse_config["command"],
args=mcp_clickhouse_config["args"],
env={**os.environ, **mcp_clickhouse_config["env"]} # 合并当前环境变量
)
print(f"正在启动 mcp server: {mcp_clickhouse_config['command']}...")
# 3. 连接 mcp server 并开始对话循环
async with stdio_client(server_params) as (read, write):
async with clientsession(read, write) as session:
# 初始化连接
await session.initialize()
# 获取 mcp server 提供的工具列表
mcp_tools_list = await session.list_tools()
openai_tools = [convert_mcp_tool_to_openai(tool) for tool in mcp_tools_list.tools]
print(f"\n已连接 mcp server,加载了 {len(openai_tools)} 个工具: {[t['function']['name'] for t in openai_tools]}")
print("-" * 50)
print("你可以开始询问关于 clickhouse 的问题了 (输入 'quit' 退出)。")
messages = [
{"role": "system", "content": "你是一个智能助手,可以利用工具查询 clickhouse 数据库来回答用户问题。"}
]
while true:
user_input = input("\n用户: ")
if user_input.lower() in ["quit", "exit"]:
break
messages.append({"role": "user", "content": user_input})
# 第一轮调用:发送用户问题 + 工具定义给 deepseek
try:
response = await client.chat.completions.create(
model=model_name,
messages=messages,
tools=openai_tools,
)
except exception as e:
print(f"api 调用错误: {e}")
continue
assistant_msg = response.choices[0].message
messages.append(assistant_msg)
# 检查 deepseek 是否想调用工具
if assistant_msg.tool_calls:
print(f"\n[思考] 模型决定调用工具...")
for tool_call in assistant_msg.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
print(f" -> 调用工具: {tool_name}, 参数: {tool_args}")
# 执行 mcp 工具
try:
mcp_result = await session.call_tool(tool_name, arguments=tool_args)
# 获取文本结果 (clickhouse mcp 通常返回文本或 json 文本)
tool_output = ""
if mcp_result.content:
for content in mcp_result.content:
if content.type == "text":
tool_output += content.text
print(f" <- 工具返回结果 (前100字符): {tool_output[:100]}...")
# 将工具结果添加回对话历史
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_output
})
except exception as e:
error_msg = f"工具执行出错: {str(e)}"
print(f" ! {error_msg}")
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": error_msg
})
# 第二轮调用:将工具结果发送回 deepseek 获取最终回答
final_response = await client.chat.completions.create(
model=model_name,
messages=messages,
# 这里依然传入 tools,以防模型需要继续多步调用
tools=openai_tools,
)
final_content = final_response.choices[0].message.content
print(f"\ndeepseek: {final_content}")
messages.append(final_response.choices[0].message)
else:
# 如果不需要调用工具,直接输出结果
print(f"\ndeepseek: {assistant_msg.content}")
if __name__ == "__main__":
# windows 下通常需要这个策略
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.windowsselectoreventlooppolicy())
asyncio.run(run_chat_loop())
mcp 连接 (stdio_client):
uv run ... 命令启动 mcp-clickhouse 的子进程。clickhouse_host 等敏感信息通过环境变量 (env) 传递给子进程,保证安全性。工具发现与转换 (convert_mcp_tool_to_openai):
session.list_tools() 从 mcp-clickhouse 获取可用的工具(例如执行 sql 查询的工具)。对话循环 (chat loop):
tool_calls(例如它生成了一个 sql 语句来查询数据),python 代码会拦截这个请求,通过 session.call_tool 在本地的 mcp-clickhouse 服务中执行该 sql。tool 类型的消息发回给 deepseek。假设你问:“查询 users 表里有多少行数据?”
tool_calls,调用 query_sql 工具,参数为 select count() from users。105。105 发给 deepseek。到此这篇关于python调用deepseek api查询clickhouse的流程步骤的文章就介绍到这了,更多相关python deepseek api查询clickhouse内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论