it编程 > 前端脚本 > Python

Python调用DeepSeek API查询ClickHouse的流程步骤

3人参与 2026-03-20 Python

调用 deepseek api,并通过 mcp (model context protocol) 协议接入 mcp-clickhouse,从而让 deepseek 能够查询 clickhouse 数据库来回答用户问题。

前置准备

在运行代码之前,你需要确保环境中有以下依赖:

安装 python 库

pip install mcp openai

安装 uv (因为你的配置中使用了 uv 来运行 mcp-clickhouse):

pip install uv

获取 deepseek api key:你需要一个有效的 deepseek api 密钥。

python 代码 (main.py)

请将代码中的 <your_deepseek_api_key> 替换为你的实际 key,并确保 <clickhouse-host> 等配置已填入正确的值。

import asyncio
import os
import json
import sys
from typing import list, dict, any, optional

# 导入 mcp 相关库
from mcp import clientsession, stdioserverparameters
from mcp.client.stdio import stdio_client
import mcp.types as types

# 导入 openai sdk (deepseek 兼容 openai 格式)
from openai import asyncopenai

# ================= 配置部分 =================

# deepseek api 配置
deepseek_api_key = os.getenv("deepseek_api_key", "<your_deepseek_api_key>")
deepseek_base_url = "https://api.deepseek.com"
model_name = "deepseek-chat"  # 或者 "deepseek-reasoner" (r1)

# mcp clickhouse server 配置 (来自你的 json)
mcp_clickhouse_config = {
    "command": "uv",
    "args": [
        "run",
        "--with",
        "mcp-clickhouse",
        "--python",
        "3.10",
        "mcp-clickhouse"
    ],
    "env": {
        "clickhouse_host": "<clickhouse-host>",
        "clickhouse_port": "<clickhouse-port>",
        "clickhouse_user": "<clickhouse-user>",
        "clickhouse_password": "<clickhouse-password>",
        "clickhouse_role": "<clickhouse-role>",
        "clickhouse_secure": "true",
        "clickhouse_verify": "true",
        "clickhouse_connect_timeout": "30",
        "clickhouse_send_receive_timeout": "30"
    }
}

# ================= 工具函数 =================

def convert_mcp_tool_to_openai(mcp_tool: types.tool) -> dict[str, any]:
    """
    将 mcp 工具定义转换为 openai/deepseek 支持的 function calling 格式。
    """
    return {
        "type": "function",
        "function": {
            "name": mcp_tool.name,
            "description": mcp_tool.description,
            "parameters": mcp_tool.inputschema
        }
    }

# ================= 主逻辑 =================

async def run_chat_loop():
    # 1. 初始化 deepseek 客户端
    client = asyncopenai(api_key=deepseek_api_key, base_url=deepseek_base_url)
    
    # 2. 准备 mcp server 参数
    server_params = stdioserverparameters(
        command=mcp_clickhouse_config["command"],
        args=mcp_clickhouse_config["args"],
        env={**os.environ, **mcp_clickhouse_config["env"]} # 合并当前环境变量
    )

    print(f"正在启动 mcp server: {mcp_clickhouse_config['command']}...")
    
    # 3. 连接 mcp server 并开始对话循环
    async with stdio_client(server_params) as (read, write):
        async with clientsession(read, write) as session:
            # 初始化连接
            await session.initialize()
            
            # 获取 mcp server 提供的工具列表
            mcp_tools_list = await session.list_tools()
            openai_tools = [convert_mcp_tool_to_openai(tool) for tool in mcp_tools_list.tools]
            
            print(f"\n已连接 mcp server,加载了 {len(openai_tools)} 个工具: {[t['function']['name'] for t in openai_tools]}")
            print("-" * 50)
            print("你可以开始询问关于 clickhouse 的问题了 (输入 'quit' 退出)。")

            messages = [
                {"role": "system", "content": "你是一个智能助手,可以利用工具查询 clickhouse 数据库来回答用户问题。"}
            ]

            while true:
                user_input = input("\n用户: ")
                if user_input.lower() in ["quit", "exit"]:
                    break

                messages.append({"role": "user", "content": user_input})

                # 第一轮调用:发送用户问题 + 工具定义给 deepseek
                try:
                    response = await client.chat.completions.create(
                        model=model_name,
                        messages=messages,
                        tools=openai_tools,
                    )
                except exception as e:
                    print(f"api 调用错误: {e}")
                    continue

                assistant_msg = response.choices[0].message
                messages.append(assistant_msg)

                # 检查 deepseek 是否想调用工具
                if assistant_msg.tool_calls:
                    print(f"\n[思考] 模型决定调用工具...")
                    
                    for tool_call in assistant_msg.tool_calls:
                        tool_name = tool_call.function.name
                        tool_args = json.loads(tool_call.function.arguments)
                        
                        print(f"  -> 调用工具: {tool_name}, 参数: {tool_args}")

                        # 执行 mcp 工具
                        try:
                            mcp_result = await session.call_tool(tool_name, arguments=tool_args)
                            
                            # 获取文本结果 (clickhouse mcp 通常返回文本或 json 文本)
                            tool_output = ""
                            if mcp_result.content:
                                for content in mcp_result.content:
                                    if content.type == "text":
                                        tool_output += content.text
                            
                            print(f"  <- 工具返回结果 (前100字符): {tool_output[:100]}...")

                            # 将工具结果添加回对话历史
                            messages.append({
                                "role": "tool",
                                "tool_call_id": tool_call.id,
                                "content": tool_output
                            })

                        except exception as e:
                            error_msg = f"工具执行出错: {str(e)}"
                            print(f"  ! {error_msg}")
                            messages.append({
                                "role": "tool",
                                "tool_call_id": tool_call.id,
                                "content": error_msg
                            })

                    # 第二轮调用:将工具结果发送回 deepseek 获取最终回答
                    final_response = await client.chat.completions.create(
                        model=model_name,
                        messages=messages,
                        # 这里依然传入 tools,以防模型需要继续多步调用
                        tools=openai_tools, 
                    )
                    
                    final_content = final_response.choices[0].message.content
                    print(f"\ndeepseek: {final_content}")
                    messages.append(final_response.choices[0].message)
                
                else:
                    # 如果不需要调用工具,直接输出结果
                    print(f"\ndeepseek: {assistant_msg.content}")

if __name__ == "__main__":
    # windows 下通常需要这个策略
    if sys.platform.startswith('win'):
        asyncio.set_event_loop_policy(asyncio.windowsselectoreventlooppolicy())
        
    asyncio.run(run_chat_loop())

代码逻辑解释

mcp 连接 (stdio_client):

工具发现与转换 (convert_mcp_tool_to_openai):

对话循环 (chat loop):

运行示例

假设你问:“查询 users 表里有多少行数据?”

  1. deepseek 分析意图,返回 tool_calls,调用 query_sql 工具,参数为 select count() from users
  2. python 脚本通过 mcp 执行该 sql。
  3. clickhouse 返回 105
  4. python 脚本将 105 发给 deepseek。
  5. deepseek 回答:“users 表里共有 105 行数据。”

到此这篇关于python调用deepseek api查询clickhouse的流程步骤的文章就介绍到这了,更多相关python deepseek api查询clickhouse内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

您想发表意见!!点此发布评论

推荐阅读

Python可变与非可变数据类型示例详解

03-20

Python批量实现PDF转换为图片(JPG/PNG)

03-20

Python调用Deepseek API的四种常见错误及解决方法

03-20

Python代码实现PDF与Word之间互转

03-20

Python JSON库json、simdjson与orjson深度对比

03-20

python判断图片相似度找出相似的图像问题

03-20

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论