#!/usr/bin/env python3
"""
Xiaozhi AI CLI client - simplified version (with MCP tool handling).

Features:
- Connect to xiaozhi-server over WebSocket.
- Declare MCP capability in the hello handshake.
- Send one text utterance for dialogue.
- Let the server call the MCP tool ``psycho_screen``.
- Forward that call to the psycho-screener stdio MCP tool
  (``src/psycho_screener/mcp_tool_native.py``).
- Return the tool result to the server.

Usage:
    python xiaozhi_cli_client.py "今天小朋友打我,我好害怕"

Environment:
    MINIMAX_API_KEY  forwarded to the psycho-screener subprocess.
"""

import asyncio
import json
import uuid
import sys
import argparse
import websockets
import subprocess
import os
import queue
import threading
import time

DEVICE_ID = str(uuid.uuid4())
WS_URL = "ws://localhost:8000/xiaozhi/v1/"

# SECURITY: the key must come from the environment only. A real key used to be
# hard-coded here as the fallback value; it is exposed in version control and
# should be rotated.
MINIMAX_API_KEY = os.environ.get("MINIMAX_API_KEY", "")

# Seconds to wait for each JSON-RPC response from the psycho-screener subprocess.
MCP_RESPONSE_TIMEOUT = 20.0


# ============================================================================
# MCP tool invocation (stdio subprocess)
# ============================================================================

def _start_stdout_reader(proc: subprocess.Popen) -> "queue.Queue[str | None]":
    """Pump the child's stdout lines into a queue from a daemon thread.

    A reader thread replaces ``select()`` on the pipe: ``select`` only sees the
    OS-level descriptor, so lines already pulled into Python's text buffer were
    invisible to it, and it does not work on pipes under Windows.
    ``None`` is queued once stdout reaches EOF (child exited or closed it).
    """
    lines: "queue.Queue[str | None]" = queue.Queue()

    def pump() -> None:
        for line in proc.stdout:
            lines.put(line)
        lines.put(None)

    threading.Thread(target=pump, daemon=True).start()
    return lines


def _read_response(lines: "queue.Queue[str | None]", expected_id: int, timeout: float) -> dict | None:
    """Return the JSON-RPC message whose ``id`` equals *expected_id*.

    Non-JSON lines (stray prints) and messages carrying another id (or none,
    i.e. notifications) are skipped. Returns None on timeout or on EOF.
    """
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        try:
            line = lines.get(timeout=remaining)
        except queue.Empty:
            return None
        if line is None:  # child closed stdout
            return None
        try:
            msg = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(msg, dict) and msg.get("id") == expected_id:
            return msg


def _write_message(proc: subprocess.Popen, msg: dict) -> None:
    """Write one newline-delimited JSON-RPC message to the child's stdin."""
    proc.stdin.write(json.dumps(msg) + "\n")
    proc.stdin.flush()


def _parse_tool_result(result: object) -> dict:
    """Decode the JSON text payload of an MCP ``tools/call`` result.

    Per the MCP spec the result is ``{"content": [{"type": "text", "text": ...}],
    "isError": bool}``. A bare list of content items is also accepted. Any
    malformed shape yields ``{"error": ...}`` instead of raising.
    """
    if isinstance(result, dict):
        content = result.get("content")
        is_error = bool(result.get("isError"))
    else:
        content = result
        is_error = False
    if not isinstance(content, list) or not content or not isinstance(content[0], dict):
        return {"error": "malformed tools/call result"}
    text = content[0].get("text", "")
    if is_error:
        return {"error": str(text)}
    try:
        parsed = json.loads(text)
    except (TypeError, json.JSONDecodeError):
        return {"error": f"non-JSON tool output: {str(text)[:200]}"}
    if not isinstance(parsed, dict):
        return {"error": f"unexpected tool output: {str(parsed)[:200]}"}
    return parsed


def _stop_process(proc: subprocess.Popen) -> None:
    """Shut *proc* down, escalating to kill if it ignores terminate()."""
    try:
        proc.stdin.close()  # lets a well-behaved stdio server see EOF
    except OSError:
        pass
    proc.terminate()
    try:
        proc.wait(timeout=3)
    except subprocess.TimeoutExpired:
        # Previously this exception escaped and the child was left running.
        proc.kill()
        proc.wait()


def _mcp_call_sync(messages: list, include_prefix: bool) -> dict:
    """Run one ``psycho_screen`` call against the stdio MCP tool (blocking).

    Spawns ``src/psycho_screener/mcp_tool_native.py`` next to this file. It then
    performs the MCP ``initialize`` handshake, issues ``tools/call`` and decodes
    the tool's JSON text output. Meant to run in a worker thread (see
    ``call_psycho_screener``).

    Args:
        messages: forwarded verbatim as the tool's ``messages`` argument.
        include_prefix: forwarded verbatim as the tool's ``include_prefix`` argument.

    Returns:
        The decoded tool result dict, or a dict whose only key is ``"error"``
        when the subprocess fails, times out or returns something malformed.
    """
    script_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "src", "psycho_screener", "mcp_tool_native.py",
    )
    env = dict(os.environ)
    if MINIMAX_API_KEY:
        env["MINIMAX_API_KEY"] = MINIMAX_API_KEY
    try:
        proc = subprocess.Popen(
            [sys.executable, script_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # stderr was piped but never read; a full pipe would block the child.
            stderr=subprocess.DEVNULL,
            text=True,
            encoding="utf-8",
            errors="replace",
            env=env,
        )
    except OSError as exc:
        return {"error": f"failed to start psycho_screener: {exc}"}

    lines = _start_stdout_reader(proc)
    next_id = 0

    def request(method: str, params: dict) -> dict | None:
        nonlocal next_id
        next_id += 1
        _write_message(proc, {"jsonrpc": "2.0", "id": next_id, "method": method, "params": params})
        return _read_response(lines, next_id, MCP_RESPONSE_TIMEOUT)

    try:
        # The initialize reply is not inspected (as before); we only wait for it.
        request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "xiaozhi-cli", "version": "1.0"},
        })
        _write_message(proc, {"jsonrpc": "2.0", "method": "notifications/initialized"})
        reply = request("tools/call", {
            "name": "psycho_screen",
            "arguments": {"messages": messages, "include_prefix": include_prefix},
        })
        if reply is None:
            return {"error": "no result"}
        if "error" in reply:
            return {"error": f"tools/call failed: {reply['error']}"}
        if "result" in reply:
            return _parse_tool_result(reply["result"])
        return {"error": "no result"}
    except (OSError, ValueError) as exc:  # broken pipe, closed stream, bad JSON
        return {"error": f"psycho_screener call failed: {exc}"}
    finally:
        _stop_process(proc)


async def call_psycho_screener(messages: list, include_prefix: bool = True) -> dict:
    """Async wrapper: run the blocking subprocess call in the default executor."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _mcp_call_sync, messages, include_prefix)


def build_mcp_response(req_id: int | str, result: dict, *, is_error: bool = False) -> dict:
    """Build an MCP JSON-RPC success envelope carrying *result* as JSON text.

    Args:
        req_id: id of the server request being answered.
        result: tool output, serialized into a single text content item.
        is_error: value of the MCP ``isError`` flag (tool-level failure).
    """
    return {
        "type": "mcp",
        "payload": {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}],
                "isError": is_error,
            },
        },
    }


def build_mcp_error(req_id: int | str, code: int, message: str) -> dict:
    """Build an MCP JSON-RPC error envelope (e.g. -32601 method not found)."""
    return {
        "type": "mcp",
        "payload": {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message},
        },
    }


# ============================================================================
# WebSocket client
# ============================================================================

async def _handle_mcp(ws, payload: dict) -> None:
    """Answer one MCP message sent by the server over the WebSocket."""
    method = payload.get("method")
    req_id = payload.get("id")
    params = payload.get("params") or {}

    if method == "initialize":
        print("[MCP] initialize 请求")
        resp = {
            "type": "mcp",
            "payload": {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "xiaozhi-cli", "version": "1.0"},
                },
            },
        }
        await ws.send(json.dumps(resp))
        print("[MCP] 已回复 initialize")

    elif method == "tools/call":
        tool_name = params.get("name")
        args = params.get("arguments") or {}
        print(f"[MCP] 调用工具: {tool_name}")
        if tool_name != "psycho_screen":
            print(f"[MCP] 未知工具: {tool_name}")
            # Previously no reply was sent and the server waited indefinitely.
            if req_id is not None:
                await ws.send(json.dumps(build_mcp_error(req_id, -32602, f"Unknown tool: {tool_name}")))
            return
        result = await call_psycho_screener(
            messages=args.get("messages", []),
            include_prefix=args.get("include_prefix", True),
        )
        # _mcp_call_sync signals failure with a dict whose only key is "error".
        failed = set(result) == {"error"}
        await ws.send(json.dumps(build_mcp_response(req_id, result, is_error=failed)))
        print(f"[MCP] 筛查结果: detected={result.get('detected')}, category={result.get('category')}")
        if result.get("prefix"):
            print(f"[MCP] 前缀: {str(result['prefix'])[:80]}")

    elif method is not None and req_id is not None:
        # JSON-RPC requires every request to be answered.
        await ws.send(json.dumps(build_mcp_error(req_id, -32601, f"Method not found: {method}")))
        print(f"[MCP] 不支持的方法: {method}")


async def run_single(text: str):
    """Send *text* once, serve MCP requests, and print replies until done.

    Stops after two ``text`` replies, after 30 received frames, or on a
    receive timeout once more than four loop iterations have passed.
    Always returns True.
    """
    print(f"\n{'='*60}")
    print(f"设备ID: {DEVICE_ID}")
    print(f"发送: {text}")
    print(f"{'='*60}\n")

    url = f"{WS_URL}?device-id={DEVICE_ID}&authorization=cli"
    async with websockets.connect(url, ping_interval=None, max_size=10 * 1024 * 1024) as ws:
        # 1. Send hello, wait for welcome (which may list tools).
        await ws.send(json.dumps({
            "type": "hello",
            "version": 1,
            "features": {"mcp": True},
            "transport": "websocket",
            "audio_params": {"format": "opus", "sample_rate": 16000, "channels": 1, "frame_duration": 60},
        }))
        raw = await asyncio.wait_for(ws.recv(), timeout=10.0)
        welcome = json.loads(raw)
        session = str(welcome.get("session_id") or "")
        print(f"[服务器] type={welcome.get('type')}, session={session[:16]}")
        tools = [t.get("name") for t in (welcome.get("tools") or []) if isinstance(t, dict)]
        print(f"[工具列表] {tools}")

        # 2. Send the text as a "listen" message.
        await ws.send(json.dumps({"type": "listen", "state": "detect", "text": text}))
        print("[已发送文本]\n")

        # 3. Receive loop, serving MCP calls along the way.
        replies: list[str] = []
        for i in range(30):
            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=30.0)
            except asyncio.TimeoutError:
                print(f"[超时 {i+1}/30]")
                if i > 3:
                    break
                continue

            if isinstance(raw, bytes):  # binary frame (audio)
                print(f"[二进制数据] (跳过 {len(raw)} bytes)")
                continue
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                print(f"[非JSON消息] {raw[:100]}")
                continue
            if not isinstance(data, dict):
                print(f"[非对象消息] {str(data)[:100]}")
                continue

            t = data.get("type")
            if t == "mcp":
                await _handle_mcp(ws, data.get("payload") or {})
            elif t == "text":
                content = data.get("content", "")
                replies.append(content)
                print(f"[玩偶] {content[:200]}")
            elif t == "llm":
                print(f"[LLM] {str(data)[:150]}")
            elif t == "tts":
                print("[TTS] (音频跳过)")
            elif t == "error":
                print(f"[ERROR] {data}")
            else:
                print(f"[{t}] {str(data)[:100]}")

            # Stop after the second text reply.
            if t == "text" and len(replies) >= 2:
                break

        print(f"\n{'='*60}")
        print(f"完成!收到 {len(replies)} 条玩偶回复")
        for j, r in enumerate(replies):
            print(f"  回复{j+1}: {r[:300]}")
        print(f"{'='*60}\n")
        return True


async def main():
    """Parse CLI arguments and run a single exchange."""
    parser = argparse.ArgumentParser(description="小智 AI CLI 客户端(含 MCP)")
    parser.add_argument("text", nargs="?", help="对话内容")
    parser.add_argument("--interactive", "-i", action="store_true", help="交互模式")
    args = parser.parse_args()

    if args.interactive or not args.text:
        print("交互模式暂不支持 MCP 工具处理")
        return
    if not MINIMAX_API_KEY:
        print("[警告] 未设置 MINIMAX_API_KEY 环境变量,心理筛查工具可能无法工作", file=sys.stderr)
    await run_single(args.text)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n已退出")
    except Exception as e:  # top-level boundary: report and exit non-zero
        print(f"[错误] {e}", file=sys.stderr)
        sys.exit(1)