From 9b79ccd0a3975e05fea21b27290ce47df8206c4a Mon Sep 17 00:00:00 2001 From: Sileya Date: Sun, 5 Apr 2026 10:17:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=8E=9F=E7=94=9FMCP=E5=B7=A5=E5=85=B7?= =?UTF-8?q?=20+=20xiaozhi=20CLI=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/psycho_screener/mcp_tool_native.py | 174 ++++++++++++++++++ test_xiaozhi_client.py | 65 +++++++ xiaozhi_cli_client.py | 238 +++++++++++++++++++++++++ 3 files changed, 477 insertions(+) create mode 100644 src/psycho_screener/mcp_tool_native.py create mode 100644 test_xiaozhi_client.py create mode 100644 xiaozhi_cli_client.py diff --git a/src/psycho_screener/mcp_tool_native.py b/src/psycho_screener/mcp_tool_native.py new file mode 100644 index 0000000..a280f6e --- /dev/null +++ b/src/psycho_screener/mcp_tool_native.py @@ -0,0 +1,174 @@ +""" +儿童心理陪伴 MCP 工具(原生 mcp 协议版) +使用官方 mcp Python SDK,不依赖 fastmcp + +适用于直接集成到 xiaozhi-esp32-server 的 Docker 容器环境中。 +""" + +from __future__ import annotations + +import os +import json +import logging +from mcp.server import Server +from mcp.server.stdio import stdio_server +from mcp.types import Tool, TextContent + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger("PsychoScreenerMCP") + +# ============================================================================ +# 导入筛查器 +# ============================================================================ + +import sys +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from psycho_screener.screener import PsychoScreener, ScreeningResult + + +# ============================================================================ +# MCP Server +# ============================================================================ + +APP_NAME = "psycho-screener" +APP_VERSION = "1.0.0" + +server = Server(APP_NAME) + + +@server.list_tools() +async def list_tools() -> list[Tool]: + """列出所有可用工具""" + return [ + Tool( + name="psycho_screen", + description="对儿童对话进行心理问题筛查。当儿童与玩偶对话中可能存在霸凌、抑郁、焦虑、家庭矛盾等心理问题时,调用此工具进行分析。", + inputSchema={ + "type": "object", + "properties": { + "messages": { + "type": "array", + "description": "儿童与玩偶的完整对话上下文,格式为消息数组。每条消息包含 role(system/user/assistant)和 content(内容)。", + "items": { + "type": "object", + "properties": { + "role": {"type": "string"}, + "content": {"type": "string"} + }, + "required": ["role", "content"] + } + }, + "include_prefix": { + "type": "boolean", + "default": True, + "description": "检测到问题时是否返回注入前缀" + } + }, + "required": ["messages"] + } + ) + ] + + +@server.call_tool() +async def call_tool(name: str, arguments: dict) -> list[TextContent]: + """执行工具调用""" + if name != "psycho_screen": + return [TextContent(type="text", text=json.dumps({"error": f"Unknown tool: {name}"}))] + + messages = arguments.get("messages", []) + include_prefix = arguments.get("include_prefix", True) + + api_key = os.environ.get("MINIMAX_API_KEY", "") + if not api_key: + result = { + "detected": False, + "category": "none", + "severity": "none", + "summary": "API key 未配置", + "suggestion": "", + "prefix": "", + "error": "MINIMAX_API_KEY not set" + } + return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))] + + try: + # 提取孩子消息 + child_messages = [ + msg["content"] + for msg in messages + if msg.get("role") == "user" and msg.get("content") + ] + context = "\n".join(child_messages) + + if not context.strip(): + result = { + "detected": False, + "category": "none", + "severity": "none", + "summary": "无儿童对话内容可分析", + "suggestion": "", + "prefix": "" + } + return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))] + + # 调用筛查器 + screener = PsychoScreener(api_key=api_key) + screening_result = screener.screen(context) + + # 构建结果 + result: dict = { + "detected": screening_result.detected, + "category": screening_result.category, + "severity": "none" if not screening_result.detected else screening_result.severity, + "summary": screening_result.summary, + "suggestion": screening_result.suggestion if screening_result.detected else "", + } + + if include_prefix and screening_result.detected: + result["prefix"] = screener.build_response_prefix(screening_result) + + logger.info( + f"psycho_screen: detected={screening_result.detected}, " + f"category={screening_result.category}, severity={screening_result.severity}" + ) + + return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))] + + except Exception as e: + logger.exception(f"psycho_screen error: {e}") + result = { + "detected": False, + "category": "none", + "severity": "none", + "summary": f"筛查过程出错: {str(e)}", + "suggestion": "", + "prefix": "", + "error": str(e) + } + return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False))] + + +# ============================================================================ +# 主入口 +# ============================================================================ + +async def main(): + """启动 MCP stdio 服务器""" + logger.info(f"启动 PsychoScreener MCP 服务器 v{APP_VERSION}") + async with stdio_server() as (read_stream, write_stream): + await server.run( + read_stream, + write_stream, + server.create_initialization_options() + ) + + +if __name__ == "__main__": + import asyncio + asyncio.run(main()) diff --git a/test_xiaozhi_client.py b/test_xiaozhi_client.py new file mode 100644 index 0000000..fdbc022 --- /dev/null +++ b/test_xiaozhi_client.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +""" +小智设备连接测试客户端 +连接到 xiaozhi-server,模拟设备握手,触发 MCP 工具初始化 +""" + +import asyncio +import json +import uuid +import websockets +import sys + +XIAOZHI_WS = "ws://localhost:8000/xiaozhi/v1/" +DEVICE_ID = str(uuid.uuid4()) +TOPIC = str(uuid.uuid4()) + + +async def main(): + print(f"连接 xiaozhi-server: {XIAOZHI_WS}") + print(f"Device ID: {DEVICE_ID}") + + async with websockets.connect(XIAOZHI_WS, ping_interval=None) as ws: + # 发送设备 hello + hello = { + "type": "hello", + "deviceId": DEVICE_ID, + "deviceName": "测试设备", + "clientType": "py-websocket-test", + "protocolVersion": "1.0.0", + "featureSet": {"mcp": True}, + } + await ws.send(json.dumps(hello)) + print(f"发送 hello: {hello}") + + # 接收消息(最多等 30 秒) + for i in range(20): + try: + msg = await asyncio.wait_for(ws.recv(), timeout=5.0) + data = json.loads(msg) + msg_type = data.get("type", "unknown") + print(f"收到 [{msg_type}]: {json.dumps(data, ensure_ascii=False)[:200]}") + + if msg_type == "welcome": + print("\n✅ 连接成功!") + if "tools" in data: + print(f"MCP 工具列表: {[t.get('name') for t in data.get('tools', [])]}") + if "mcpTools" in data: + print(f"MCP 工具: {data.get('mcpTools')}") + break + except asyncio.TimeoutError: + print(f" ... 等待中 ({i+1}/20)") + continue + + print("\n测试完成,5秒后关闭...") + await asyncio.sleep(5) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n已退出") + except Exception as e: + print(f"错误: {e}") + sys.exit(1) diff --git a/xiaozhi_cli_client.py b/xiaozhi_cli_client.py new file mode 100644 index 0000000..c7d7298 --- /dev/null +++ b/xiaozhi_cli_client.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +""" +小智 AI CLI 客户端 - 简化版(含 MCP 工具处理) + +功能: + - 连接 xiaozhi-server + - 声明 MCP 能力 + - 发送文本对话 + - 服务器可调用 MCP 工具(psycho_screen) + - 客户端将 MCP 调用转发给 psycho-screener stdio 工具 + - 返回结果给服务器 + +用法: + python xiaozhi_cli_client.py "今天小朋友打我,我好害怕" +""" + +import asyncio +import json +import uuid +import sys +import argparse +import websockets +import subprocess +import os + +DEVICE_ID = str(uuid.uuid4()) +WS_URL = "ws://localhost:8000/xiaozhi/v1/" +MINIMAX_API_KEY = os.environ.get( + "MINIMAX_API_KEY", + "sk-cp-Sd2G0paJUZWdQhKrISIICVqQnuiE4qvT-yMszahI7s0Sau02Pa1XZCXNsj2Z91n-xNV8hIG-xL8lENaEgFNQBZr7S6Y8_R7OASOScenpJIxxWOb6vc7sF38" +) + + +# ============================================================================ +# MCP 工具调用处理 +# ============================================================================ + +def _mcp_call_sync(messages: list, include_prefix: bool) -> dict: + """同步调用 MCP 工具(在独立线程中运行)""" + import select, threading + + script_path = os.path.join(os.path.dirname(__file__), "src", "psycho_screener", "mcp_tool_native.py") + env = {**os.environ, "MINIMAX_API_KEY": MINIMAX_API_KEY} + proc = subprocess.Popen( + [sys.executable, script_path], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + text=True, env=env, + ) + + _id = [0] + def send(msg): + _id[0] += 1 + msg["id"] = _id[0] + proc.stdin.write(json.dumps(msg) + "\n") + proc.stdin.flush() + if select.select([proc.stdout], [], [], 20)[0]: + return json.loads(proc.stdout.readline()) + return None + + try: + send({"jsonrpc": "2.0", "method": "initialize", "params": { + "protocolVersion": "2024-11-05", "capabilities": {}, + "clientInfo": {"name": "xiaozhi-cli", "version": "1.0"} + }}) + proc.stdin.write(json.dumps({"jsonrpc": "2.0", "method": "notifications/initialized"}) + "\n") + proc.stdin.flush() + + r = send({"jsonrpc": "2.0", "method": "tools/call", "params": { + "name": "psycho_screen", + "arguments": {"messages": messages, "include_prefix": include_prefix} + }}) + if r and "result" in r: + return json.loads(r["result"][0]["text"]) + return {"error": "no result"} + finally: + proc.terminate() + proc.wait(timeout=3) + + +async def call_psycho_screener(messages: list, include_prefix: bool = True) -> dict: + """异步调用(在独立线程池中运行同步 subprocess)""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, _mcp_call_sync, messages, include_prefix) + + +def build_mcp_response(req_id: int | str, result: dict) -> dict: + """构建 MCP JSON-RPC 成功响应""" + return { + "type": "mcp", + "payload": { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}], + "isError": False, + } + } + } + + +# ============================================================================ +# WebSocket 客户端 +# ============================================================================ + +async def run_single(text: str): + print(f"\n{'='*60}") + print(f"设备ID: {DEVICE_ID}") + print(f"发送: {text}") + print(f"{'='*60}\n") + + url = f"{WS_URL}?device-id={DEVICE_ID}&authorization=cli" + async with websockets.connect(url, ping_interval=None, max_size=10 * 1024 * 1024) as ws: + # 1. 发 hello,等 welcome(含工具列表) + await ws.send(json.dumps({ + "type": "hello", "version": 1, + "features": {"mcp": True}, + "transport": "websocket", + "audio_params": {"format": "opus", "sample_rate": 16000, "channels": 1, "frame_duration": 60}, + })) + raw = await asyncio.wait_for(ws.recv(), timeout=10.0) + welcome = json.loads(raw) + print(f"[服务器] type={welcome.get('type')}, session={welcome.get('session_id','')[:16]}") + tools = [t.get("name") for t in welcome.get("tools", [])] + print(f"[工具列表] {tools}") + + # 2. 发文本(用 listen 消息类型) + await ws.send(json.dumps({"type": "listen", "state": "detect", "text": text})) + print("[已发送文本]\n") + + # 3. 循环收消息,处理 MCP 调用 + replies = [] + mcp_init_id = None + + for i in range(30): + try: + raw = await asyncio.wait_for(ws.recv(), timeout=30.0) + try: + data = json.loads(raw) + except (json.JSONDecodeError, UnicodeDecodeError): + # 二进制音频数据,跳过 + print(f"[二进制数据] (跳过 {len(raw)} bytes)") + continue + t = data.get("type") + + if t == "mcp": + payload = data.get("payload", {}) + method = payload.get("method") + req_id = payload.get("id") + params = payload.get("params", {}) + + if method == "initialize": + print(f"[MCP] initialize 请求") + resp = { + "type": "mcp", + "payload": { + "jsonrpc": "2.0", "id": req_id, + "result": { + "protocolVersion": "2024-11-05", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "xiaozhi-cli", "version": "1.0"} + } + } + } + await ws.send(json.dumps(resp)) + print("[MCP] 已回复 initialize") + + elif method == "tools/call": + tool_name = params.get("name") + args = params.get("arguments", {}) + print(f"[MCP] 调用工具: {tool_name}") + + if tool_name == "psycho_screen": + # 调用我们的筛查器 + result = await call_psycho_screener( + messages=args.get("messages", []), + include_prefix=args.get("include_prefix", True), + ) + resp = build_mcp_response(req_id, result) + await ws.send(json.dumps(resp)) + print(f"[MCP] 筛查结果: detected={result.get('detected')}, category={result.get('category')}") + if result.get("prefix"): + print(f"[MCP] 前缀: {result['prefix'][:80]}") + else: + print(f"[MCP] 未知工具: {tool_name}") + + elif t == "text": + content = data.get("content", "") + replies.append(content) + print(f"[玩偶] {content[:200]}") + + elif t == "llm": + print(f"[LLM] {str(data)[:150]}") + + elif t == "tts": + print("[TTS] (音频跳过)") + + elif t == "error": + print(f"[ERROR] {data}") + + else: + print(f"[{t}] {str(data)[:100]}") + + # 收到第二条文本后退出 + if t == "text" and len(replies) >= 2: + break + + except asyncio.TimeoutError: + print(f"[超时 {i+1}/30]") + if i > 3: + break + + print(f"\n{'='*60}") + print(f"完成!收到 {len(replies)} 条玩偶回复") + for j, r in enumerate(replies): + print(f" 回复{j+1}: {r[:300]}") + print(f"{'='*60}\n") + return True + + +async def main(): + parser = argparse.ArgumentParser(description="小智 AI CLI 客户端(含 MCP)") + parser.add_argument("text", nargs="?", help="对话内容") + parser.add_argument("--interactive", "-i", action="store_true", help="交互模式") + args = parser.parse_args() + if args.interactive or not args.text: + print("交互模式暂不支持 MCP 工具处理") + else: + await run_single(args.text) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n已退出") + except Exception as e: + print(f"[错误] {e}") + sys.exit(1)