child-psycho-companion/xiaozhi_cli_client.py

250 lines
8.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
小智 AI CLI 客户端 - 简化版 MCP 工具处理
功能
- 连接 xiaozhi-server
- 声明 MCP 能力
- 发送文本对话
- 服务器可调用 MCP 工具psycho_screen
- 客户端将 MCP 调用转发给 psycho-screener stdio 工具
- 返回结果给服务器
用法
python xiaozhi_cli_client.py "今天小朋友打我,我好害怕"
"""
import asyncio
import json
import uuid
import sys
import argparse
import websockets
import subprocess
import os
# Random per-process device identity presented to the server.
DEVICE_ID = str(uuid.uuid4())
# WebSocket endpoint of the xiaozhi-server instance to talk to.
WS_URL = "ws://localhost:8000/xiaozhi/v1/"
# API key handed to the psycho-screener child process. It is read from the
# environment only: credentials must never be committed to source control.
# Empty when the variable is unset.
MINIMAX_API_KEY = os.environ.get("MINIMAX_API_KEY", "")
# ============================================================================
# MCP 工具调用处理
# ============================================================================
def _mcp_call_sync(messages: list, include_prefix: bool) -> dict:
    """Run one ``psycho_screen`` call against the stdio MCP tool (blocking).

    Spawns ``src/psycho_screener/mcp_tool_native.py`` (resolved relative to
    this file) as a child process, performs the JSON-RPC ``initialize``
    handshake over its stdin/stdout, issues a single ``tools/call`` request
    and returns the decoded tool output. The call blocks, so it is meant to
    be run in a worker thread rather than on the event loop.

    Args:
        messages: Forwarded unchanged as the tool's ``messages`` argument.
        include_prefix: Forwarded unchanged as the tool's ``include_prefix``
            argument.

    Returns:
        The object decoded from the ``text`` field of the first content item
        of the tool result, or ``{"error": "no result"}`` when the child
        exits, does not answer within 20 seconds, or answers without a
        usable result.

    Raises:
        json.JSONDecodeError: If the child writes a non-empty line, or a
            result text, that is not valid JSON.
    """
    # NOTE: select() on pipe file objects is only supported on Unix.
    import select

    script_path = os.path.join(
        os.path.dirname(__file__), "src", "psycho_screener", "mcp_tool_native.py"
    )
    env = {**os.environ, "MINIMAX_API_KEY": MINIMAX_API_KEY}
    proc = subprocess.Popen(
        [sys.executable, script_path],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        # stderr is never read here; leaving it on a pipe would stall the
        # child as soon as the pipe buffer fills up.
        stderr=subprocess.DEVNULL,
        text=True,
        env=env,
    )
    no_result = {"error": "no result"}
    next_id = 0

    def write_line(msg: dict) -> bool:
        """Write one JSON line to the child; False if the child is gone."""
        try:
            proc.stdin.write(json.dumps(msg) + "\n")
            proc.stdin.flush()
        except OSError:  # includes BrokenPipeError
            return False
        return True

    def send(msg: dict):
        """Send a request with a fresh id and return the next reply, or None."""
        nonlocal next_id
        next_id += 1
        msg["id"] = next_id
        if not write_line(msg):
            return None
        if not select.select([proc.stdout], [], [], 20)[0]:
            return None  # no answer within 20 seconds
        line = proc.stdout.readline()
        if not line.strip():
            return None  # EOF: the child closed stdout / exited
        return json.loads(line)

    try:
        init_reply = send({"jsonrpc": "2.0", "method": "initialize", "params": {
            "protocolVersion": "2024-11-05", "capabilities": {},
            "clientInfo": {"name": "xiaozhi-cli", "version": "1.0"}
        }})
        if init_reply is None:
            return no_result
        # Notifications carry no id and expect no reply.
        if not write_line({"jsonrpc": "2.0", "method": "notifications/initialized"}):
            return no_result

        reply = send({"jsonrpc": "2.0", "method": "tools/call", "params": {
            "name": "psycho_screen",
            "arguments": {"messages": messages, "include_prefix": include_prefix}
        }})
        result = reply.get("result") if isinstance(reply, dict) else None
        # Accept both a bare content list and the standard MCP result object
        # ({"content": [...], "isError": ...}).
        if isinstance(result, dict):
            result = result.get("content")
        if (
            isinstance(result, list)
            and result
            and isinstance(result[0], dict)
            and "text" in result[0]
        ):
            return json.loads(result[0]["text"])
        return no_result
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            # Do not let a stubborn child mask the result with an exception.
            proc.kill()
            proc.wait()
        for stream in (proc.stdin, proc.stdout):
            try:
                stream.close()
            except OSError:
                # Flushing leftover data into a dead child's pipe can fail;
                # nothing useful can be done about it during cleanup.
                pass
async def call_psycho_screener(messages: list, include_prefix: bool = True) -> dict:
    """Await one screener call without blocking the event loop.

    The blocking subprocess round-trip done by ``_mcp_call_sync`` is handed
    to the loop's default executor.

    Args:
        messages: Passed through to ``_mcp_call_sync``.
        include_prefix: Passed through to ``_mcp_call_sync``.

    Returns:
        Whatever ``_mcp_call_sync`` returns.
    """
    # get_running_loop() is the documented way to obtain the loop from
    # inside a coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _mcp_call_sync, messages, include_prefix)
def build_mcp_response(req_id: int | str, result: dict) -> dict:
"""构建 MCP JSON-RPC 成功响应"""
return {
"type": "mcp",
"payload": {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}],
"isError": False,
}
}
}
# ============================================================================
# WebSocket 客户端
# ============================================================================
async def _handle_mcp_request(ws, payload: dict) -> None:
    """Answer one MCP JSON-RPC request received from the server.

    Handles ``initialize`` and ``tools/call``. A ``tools/call`` for
    ``psycho_screen`` is forwarded to ``call_psycho_screener``; any other
    tool name is answered with a JSON-RPC error so the server's pending
    request is not left unanswered.

    Args:
        ws: The open WebSocket connection used to send the reply.
        payload: The JSON-RPC request object (the ``payload`` field of an
            ``mcp`` message).
    """
    method = payload.get("method")
    req_id = payload.get("id")
    params = payload.get("params") or {}

    if method == "initialize":
        print("[MCP] initialize 请求")
        resp = {
            "type": "mcp",
            "payload": {
                "jsonrpc": "2.0", "id": req_id,
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "xiaozhi-cli", "version": "1.0"}
                }
            }
        }
        await ws.send(json.dumps(resp))
        print("[MCP] 已回复 initialize")
    elif method == "tools/call":
        tool_name = params.get("name")
        args = params.get("arguments") or {}
        print(f"[MCP] 调用工具: {tool_name}")
        if tool_name == "psycho_screen":
            # Delegate to the local screener tool.
            result = await call_psycho_screener(
                messages=args.get("messages", []),
                include_prefix=args.get("include_prefix", True),
            )
            resp = build_mcp_response(req_id, result)
            await ws.send(json.dumps(resp))
            print(f"[MCP] 筛查结果: detected={result.get('detected')}, category={result.get('category')}")
            if result.get("prefix"):
                print(f"[MCP] 前缀: {result['prefix'][:80]}")
        else:
            print(f"[MCP] 未知工具: {tool_name}")
            if req_id is not None:
                # A JSON-RPC request must be answered; report the unknown
                # tool instead of leaving the server waiting.
                await ws.send(json.dumps({
                    "type": "mcp",
                    "payload": {
                        "jsonrpc": "2.0",
                        "id": req_id,
                        "error": {"code": -32602, "message": f"Unknown tool: {tool_name}"},
                    },
                }))
    # NOTE(review): other MCP methods (e.g. tools/list) are silently ignored
    # — confirm the server does not depend on an answer to them.


async def run_single(text: str):
    """Send one text utterance to the server and print what comes back.

    Connects to ``WS_URL``, performs the hello/welcome exchange announcing
    MCP support, sends ``text`` as a ``listen``/``detect`` message, then
    reads up to 30 incoming frames, answering MCP requests along the way.
    Reading stops early once two ``text`` replies have arrived, or on a
    receive timeout after the first few iterations.

    Args:
        text: The utterance to send.

    Returns:
        Always True.
    """
    print(f"\n{'='*60}")
    print(f"设备ID: {DEVICE_ID}")
    print(f"发送: {text}")
    print(f"{'='*60}\n")

    # Query param device-id kept for server-side routing/parsing;
    # HTTP headers (Device-ID, Client-ID) are what the server's
    # connection.py actually reads (self.headers.get("device-id")).
    url = f"{WS_URL}?device-id={DEVICE_ID}"
    replies = []
    async with websockets.connect(
        url,
        additional_headers={
            "Device-ID": DEVICE_ID,
            "Client-ID": DEVICE_ID,
        },
        ping_interval=None,
        max_size=10 * 1024 * 1024,
    ) as ws:
        # 1. Send hello and wait for the welcome message (which may list tools).
        await ws.send(json.dumps({
            "type": "hello", "version": 1,
            "features": {"mcp": True},
            "transport": "websocket",
            "audio_params": {"format": "opus", "sample_rate": 16000, "channels": 1, "frame_duration": 60},
        }))
        raw = await asyncio.wait_for(ws.recv(), timeout=10.0)
        welcome = json.loads(raw)
        print(f"[服务器] type={welcome.get('type')}, session={welcome.get('session_id','')[:16]}")
        tools = [tool.get("name") for tool in welcome.get("tools", [])]
        print(f"[工具列表] {tools}")

        # 2. Send the text (using the "listen" message type).
        await ws.send(json.dumps({"type": "listen", "state": "detect", "text": text}))
        print("[已发送文本]\n")

        # 3. Receive messages in a loop, servicing MCP calls.
        # NOTE(review): every frame, including skipped binary audio, counts
        # toward the 30-iteration budget — confirm that is intended.
        for i in range(30):
            # Only the receive itself is guarded, so a timeout raised while
            # handling a message is not mistaken for an idle connection.
            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=30.0)
            except asyncio.TimeoutError:
                print(f"[超时 {i+1}/30]")
                if i > 3:
                    break
                continue

            try:
                data = json.loads(raw)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Binary audio data; skip it.
                print(f"[二进制数据] (跳过 {len(raw)} bytes)")
                continue

            # Non-object JSON has no "type"; it falls through to the
            # generic branch below instead of crashing on .get().
            msg_type = data.get("type") if isinstance(data, dict) else None
            if msg_type == "mcp":
                await _handle_mcp_request(ws, data.get("payload") or {})
            elif msg_type == "text":
                content = data.get("content", "")
                replies.append(content)
                print(f"[玩偶] {content[:200]}")
            elif msg_type == "llm":
                print(f"[LLM] {str(data)[:150]}")
            elif msg_type == "tts":
                print("[TTS] (音频跳过)")
            elif msg_type == "error":
                print(f"[ERROR] {data}")
            else:
                print(f"[{msg_type}] {str(data)[:100]}")

            # Stop once the second text reply has been received.
            if msg_type == "text" and len(replies) >= 2:
                break

    print(f"\n{'='*60}")
    print(f"完成!收到 {len(replies)} 条玩偶回复")
    for j, r in enumerate(replies):
        print(f" 回复{j+1}: {r[:300]}")
    print(f"{'='*60}\n")
    return True
async def main():
    """Parse the command line and run a single-shot conversation.

    When ``--interactive`` is given, or no text argument is supplied, only a
    notice is printed (interactive mode does not support MCP tool handling);
    otherwise the text is passed to ``run_single``.
    """
    parser = argparse.ArgumentParser(description="小智 AI CLI 客户端(含 MCP)")
    parser.add_argument("text", nargs="?", help="对话内容")
    parser.add_argument("--interactive", "-i", action="store_true", help="交互模式")
    args = parser.parse_args()

    if args.interactive or not args.text:
        print("交互模式暂不支持 MCP 工具处理")
        return
    await run_single(args.text)
if __name__ == "__main__":
    # Script entry point: run the async main, exit quietly on Ctrl-C, and
    # turn any other failure into a one-line message plus exit status 1.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n已退出")
    except Exception as e:
        print(f"[错误] {e}")
        sys.exit(1)