child-psycho-companion/xiaozhi_cli_client.py

239 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
小智 AI CLI 客户端 - 简化版(含 MCP 工具处理)
功能:
- 连接 xiaozhi-server
- 声明 MCP 能力
- 发送文本对话
- 服务器可调用 MCP 工具 psycho_screen
- 客户端将 MCP 调用转发给 psycho-screener stdio 工具
- 返回结果给服务器
用法:
python xiaozhi_cli_client.py "今天小朋友打我,我好害怕"
"""
import asyncio
import json
import uuid
import sys
import argparse
import websockets
import subprocess
import os
# Random identifier generated once per process; sent to the server as the
# ``device-id`` query parameter of the connection URL.
DEVICE_ID = str(uuid.uuid4())

# Base WebSocket URL the client connects to.
WS_URL = "ws://localhost:8000/xiaozhi/v1/"

# API key handed to the psycho-screener subprocess through its environment.
# It is read from the environment only: a credential must never be embedded
# in source control as a fallback value.
# NOTE(review): the key that used to be hard-coded here should be treated as
# leaked and rotated.
MINIMAX_API_KEY = os.environ.get("MINIMAX_API_KEY", "")
# ============================================================================
# MCP 工具调用处理
# ============================================================================
def _mcp_call_sync(messages: list, include_prefix: bool) -> dict:
    """Run one ``psycho_screen`` call against the stdio MCP tool (blocking).

    Spawns ``src/psycho_screener/mcp_tool_native.py`` (resolved relative to
    this file) with the current interpreter, sends ``initialize`` and
    ``notifications/initialized``, issues a single ``tools/call`` request and
    finally terminates the child. Each request waits at most 20 seconds for a
    reply line. Intended to be executed in a worker thread.

    Args:
        messages: Forwarded unchanged as the tool's ``messages`` argument.
        include_prefix: Forwarded unchanged as the tool's ``include_prefix``
            argument.

    Returns:
        The object decoded from the ``text`` field of the first content item
        of the tool result, or a ``{"error": ...}`` dict when the child did
        not produce a usable result.

    Raises:
        json.JSONDecodeError: If the ``text`` payload is not valid JSON.
    """
    import queue
    import threading

    script_path = os.path.join(
        os.path.dirname(__file__), "src", "psycho_screener", "mcp_tool_native.py"
    )
    env = {**os.environ, "MINIMAX_API_KEY": MINIMAX_API_KEY}
    proc = subprocess.Popen(
        [sys.executable, script_path],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        # stderr is never consumed here; an undrained PIPE can block the
        # child once the OS pipe buffer fills, so discard it instead.
        stderr=subprocess.DEVNULL,
        text=True,
        env=env,
    )

    # A reader thread feeds stdout lines into a queue. Unlike select() on the
    # text wrapper this also sees lines already sitting in Python's read
    # buffer and works on platforms where select() rejects pipes.
    lines = queue.Queue()

    def pump_stdout():
        try:
            for line in proc.stdout:
                lines.put(line)
        except (OSError, ValueError):
            # The pipe was closed underneath us during shutdown.
            pass
        finally:
            lines.put(None)  # EOF marker

    reader = threading.Thread(target=pump_stdout, daemon=True)
    reader.start()

    request_id = 0

    def send(msg):
        """Send one request and return its decoded reply, or None."""
        nonlocal request_id
        request_id += 1
        msg["id"] = request_id
        proc.stdin.write(json.dumps(msg) + "\n")
        proc.stdin.flush()
        try:
            line = lines.get(timeout=20)
        except queue.Empty:
            return None
        if line is None:
            # Child closed stdout; keep the marker so later calls do not
            # wait out the full timeout.
            lines.put(None)
            return None
        try:
            return json.loads(line)
        except json.JSONDecodeError:
            return None

    try:
        # The initialize reply is not inspected, matching the original flow.
        send({"jsonrpc": "2.0", "method": "initialize", "params": {
            "protocolVersion": "2024-11-05", "capabilities": {},
            "clientInfo": {"name": "xiaozhi-cli", "version": "1.0"}
        }})
        proc.stdin.write(
            json.dumps({"jsonrpc": "2.0", "method": "notifications/initialized"}) + "\n"
        )
        proc.stdin.flush()
        response = send({"jsonrpc": "2.0", "method": "tools/call", "params": {
            "name": "psycho_screen",
            "arguments": {"messages": messages, "include_prefix": include_prefix}
        }})
    except OSError as exc:
        # Typically BrokenPipeError: the child exited before/during the exchange.
        return {"error": f"MCP tool process failed: {exc}"}
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            # Do not let a stubborn child mask the result or linger.
            proc.kill()
            proc.wait()
        reader.join(timeout=1)
        try:
            proc.stdin.close()
        except OSError:
            pass
        if not reader.is_alive():
            proc.stdout.close()

    result = response.get("result") if isinstance(response, dict) else None
    # NOTE(review): the tool's reply shape is not visible from this file.
    # Accept both the standard MCP ``{"content": [...]}`` object and the bare
    # list the previous code indexed directly.
    content = result.get("content") if isinstance(result, dict) else result
    if isinstance(content, list) and content:
        first = content[0]
        if isinstance(first, dict) and "text" in first:
            return json.loads(first["text"])
    return {"error": "no result"}
async def call_psycho_screener(messages: list, include_prefix: bool = True) -> dict:
    """Await the blocking screener call without stalling the event loop.

    Runs ``_mcp_call_sync`` in the loop's default executor and returns
    whatever it returns.

    Args:
        messages: Passed through to ``_mcp_call_sync``.
        include_prefix: Passed through to ``_mcp_call_sync``.

    Returns:
        The dict produced by ``_mcp_call_sync``.
    """
    # get_running_loop() is the documented choice inside a coroutine; it never
    # creates a new loop implicitly the way get_event_loop() historically could.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _mcp_call_sync, messages, include_prefix)
def build_mcp_response(req_id: int | str, result: dict) -> dict:
"""构建 MCP JSON-RPC 成功响应"""
return {
"type": "mcp",
"payload": {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}],
"isError": False,
}
}
}
# ============================================================================
# WebSocket 客户端
# ============================================================================
def _mcp_error(req_id, code: int, message: str) -> dict:
    """Build an MCP-framed JSON-RPC error response for request ``req_id``."""
    return {
        "type": "mcp",
        "payload": {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message},
        },
    }


async def _answer_mcp_request(ws, payload: dict) -> None:
    """Answer one server-initiated MCP request (``initialize`` / ``tools/call``)."""
    method = payload.get("method")
    req_id = payload.get("id")
    params = payload.get("params") or {}

    if method == "initialize":
        print("[MCP] initialize 请求")
        resp = {
            "type": "mcp",
            "payload": {
                "jsonrpc": "2.0", "id": req_id,
                "result": {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "xiaozhi-cli", "version": "1.0"},
                },
            },
        }
        await ws.send(json.dumps(resp))
        print("[MCP] 已回复 initialize")
        return

    if method != "tools/call":
        # Other methods were ignored before and still are.
        return

    tool_name = params.get("name")
    args = params.get("arguments") or {}
    print(f"[MCP] 调用工具: {tool_name}")

    if tool_name != "psycho_screen":
        print(f"[MCP] 未知工具: {tool_name}")
        # Every request needs an answer; otherwise the caller waits on it.
        await ws.send(json.dumps(_mcp_error(req_id, -32602, f"Unknown tool: {tool_name}")))
        return

    try:
        # Forward the call to the local screener tool.
        result = await call_psycho_screener(
            messages=args.get("messages", []),
            include_prefix=args.get("include_prefix", True),
        )
    except Exception as exc:
        # Tool boundary: report the failure to the server instead of
        # crashing the session with the request left unanswered.
        print(f"[MCP] 工具调用失败: {exc}")
        await ws.send(json.dumps(_mcp_error(req_id, -32603, f"psycho_screen failed: {exc}")))
        return

    await ws.send(json.dumps(build_mcp_response(req_id, result)))
    print(f"[MCP] 筛查结果: detected={result.get('detected')}, category={result.get('category')}")
    if result.get("prefix"):
        print(f"[MCP] 前缀: {result['prefix'][:80]}")


async def run_single(text: str):
    """Send one text utterance to the server and print what comes back.

    Connects to ``WS_URL``, sends a ``hello`` declaring MCP support, waits up
    to 10 seconds for the first reply, sends ``text`` as a ``listen``/``detect``
    message, then reads up to 30 messages (30 s timeout each). MCP requests
    from the server are answered along the way. The loop ends after the
    second ``text`` message, or on a timeout once the loop index exceeds 3.

    Args:
        text: The utterance to send.

    Returns:
        Always ``True``.
    """
    print(f"\n{'='*60}")
    print(f"设备ID: {DEVICE_ID}")
    print(f"发送: {text}")
    print(f"{'='*60}\n")

    url = f"{WS_URL}?device-id={DEVICE_ID}&authorization=cli"
    async with websockets.connect(url, ping_interval=None, max_size=10 * 1024 * 1024) as ws:
        # 1. Send hello and wait for the welcome message (may list tools).
        await ws.send(json.dumps({
            "type": "hello", "version": 1,
            "features": {"mcp": True},
            "transport": "websocket",
            "audio_params": {"format": "opus", "sample_rate": 16000, "channels": 1, "frame_duration": 60},
        }))
        raw = await asyncio.wait_for(ws.recv(), timeout=10.0)
        welcome = json.loads(raw)
        # "or" guards against explicit nulls in the welcome message.
        session = (welcome.get("session_id") or "")[:16]
        print(f"[服务器] type={welcome.get('type')}, session={session}")
        tools = [t.get("name") for t in welcome.get("tools") or []]
        print(f"[工具列表] {tools}")

        # 2. Send the text using the "listen" message type.
        await ws.send(json.dumps({"type": "listen", "state": "detect", "text": text}))
        print("[已发送文本]\n")

        # 3. Receive messages in a loop, answering MCP requests.
        replies = []
        for i in range(30):
            # Only recv() can time out; keep the try body to that call.
            try:
                raw = await asyncio.wait_for(ws.recv(), timeout=30.0)
            except asyncio.TimeoutError:
                print(f"[超时 {i+1}/30]")
                if i > 3:
                    break
                continue

            # Binary frames are audio; skip them without trying to parse.
            if isinstance(raw, (bytes, bytearray)):
                print(f"[二进制数据] (跳过 {len(raw)} bytes)")
                continue
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                print(f"[二进制数据] (跳过 {len(raw)} bytes)")
                continue

            kind = data.get("type")
            if kind == "mcp":
                await _answer_mcp_request(ws, data.get("payload") or {})
            elif kind == "text":
                content = data.get("content") or ""
                replies.append(content)
                print(f"[玩偶] {content[:200]}")
                # Stop once the second text message has arrived.
                if len(replies) >= 2:
                    break
            elif kind == "llm":
                print(f"[LLM] {str(data)[:150]}")
            elif kind == "tts":
                print("[TTS] (音频跳过)")
            elif kind == "error":
                print(f"[ERROR] {data}")
            else:
                print(f"[{kind}] {str(data)[:100]}")

    print(f"\n{'='*60}")
    print(f"完成！收到 {len(replies)} 条玩偶回复")
    for j, r in enumerate(replies):
        print(f" 回复{j+1}: {r[:300]}")
    print(f"{'='*60}\n")
    return True
async def main():
    """Parse the command line and run a single-shot conversation.

    When ``--interactive`` is given or no text argument is supplied, only a
    notice is printed (interactive mode does not handle MCP tools); otherwise
    the text is passed to ``run_single``.
    """
    cli = argparse.ArgumentParser(description="小智 AI CLI 客户端（含 MCP")
    cli.add_argument("text", nargs="?", help="对话内容")
    cli.add_argument("--interactive", "-i", action="store_true", help="交互模式")
    opts = cli.parse_args()

    single_shot = bool(opts.text) and not opts.interactive
    if not single_shot:
        print("交互模式暂不支持 MCP 工具处理")
        return
    await run_single(opts.text)
if __name__ == "__main__":
    # Script entry point: Ctrl-C ends quietly; any other failure is printed
    # and reported through exit status 1.
    exit_code = 0
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n已退出")
    except Exception as exc:
        print(f"[错误] {exc}")
        exit_code = 1
    if exit_code:
        sys.exit(exit_code)