255 lines
7.9 KiB
Python
255 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
MCP 协议模拟器 - 模拟 xiaozhi-server 向 MCP 工具发送 JSON-RPC 消息
|
||
|
||
直接用 stdio 模式跑我们的 psycho-screener MCP 工具,
|
||
手动构造 xiaozhi 风格的 JSON-RPC 消息来测试完整流程。
|
||
|
||
使用方式:
|
||
python mcp_simulator.py
|
||
"""
|
||
|
||
import json
|
||
import subprocess
|
||
import sys
|
||
import os
|
||
import re
|
||
|
||
# 加入项目路径
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
|
||
|
||
from psycho_screener.screener import PsychoScreener
|
||
|
||
|
||
# ============================================================================
|
||
# 测试语料(来自 docs/test_corpus.md)
|
||
# ============================================================================
|
||
|
||
TEST_CASES = [
|
||
{
|
||
"name": "霸凌场景",
|
||
"messages": [
|
||
{"role": "system", "content": "你是一个友爱的小智AI玩偶..."},
|
||
{"role": "user", "content": "今天幼儿园有个小朋友抢了我的玩具"},
|
||
{"role": "assistant", "content": "哎呀,那真是太过分了!"},
|
||
{"role": "user", "content": "他还不让我告诉老师,他说如果我告诉老师就会打我。我好害怕啊。"},
|
||
],
|
||
"expected_detected": True,
|
||
"expected_category": "bullying",
|
||
},
|
||
{
|
||
"name": "抑郁情绪",
|
||
"messages": [
|
||
{"role": "system", "content": "你是一个友爱的小智AI玩偶..."},
|
||
{"role": "user", "content": "最近我总觉得好累,什么都不想做"},
|
||
{"role": "assistant", "content": "是不是身体不舒服呀?"},
|
||
{"role": "user", "content": "我觉得我是一个很糟糕的人,没有人喜欢我。有时候我想,要是我不在了就好了。"},
|
||
],
|
||
"expected_detected": True,
|
||
"expected_category": "depression",
|
||
},
|
||
{
|
||
"name": "正常对话",
|
||
"messages": [
|
||
{"role": "system", "content": "你是一个友爱的小智AI玩偶..."},
|
||
{"role": "user", "content": "今天我画了一幅画,是一只大恐龙!"},
|
||
{"role": "assistant", "content": "哇,好厉害!"},
|
||
{"role": "user", "content": "是绿色的!晚上妈妈还做了红烧肉,好开心呀!"},
|
||
],
|
||
"expected_detected": False,
|
||
"expected_category": "none",
|
||
},
|
||
{
|
||
"name": "家庭矛盾",
|
||
"messages": [
|
||
{"role": "system", "content": "你是一个友爱的小智AI玩偶..."},
|
||
{"role": "user", "content": "昨天晚上爸爸妈妈吵架了,妈妈哭了"},
|
||
{"role": "user", "content": "我很害怕,怕他们会离婚。我总觉得是因为我表现不好他们才吵架的。"},
|
||
],
|
||
"expected_detected": True,
|
||
"expected_category": "family_conflict",
|
||
},
|
||
]
|
||
|
||
|
||
# ============================================================================
|
||
# 模拟 xiaozhi 的 MCP JSON-RPC 格式
|
||
# ============================================================================
|
||
|
||
def build_xiaozhi_mcp_request(messages: list[dict]) -> dict:
|
||
"""
|
||
构建 xiaozhi 风格的 MCP JSON-RPC 请求
|
||
|
||
xiaozhi 调用 MCP 工具时发送的格式:
|
||
{
|
||
"jsonrpc": "2.0",
|
||
"id": 1,
|
||
"method": "tools/call",
|
||
"params": {
|
||
"name": "psycho_screen",
|
||
"arguments": {
|
||
"messages": [...],
|
||
"include_prefix": true
|
||
}
|
||
}
|
||
}
|
||
"""
|
||
return {
|
||
"jsonrpc": "2.0",
|
||
"id": 1,
|
||
"method": "tools/call",
|
||
"params": {
|
||
"name": "psycho_screen",
|
||
"arguments": {
|
||
"messages": messages,
|
||
"include_prefix": True,
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
def parse_mcp_response(raw: str) -> dict:
|
||
"""
|
||
解析 MCP 响应
|
||
FastMCP 返回格式可能是:
|
||
- 纯 JSON 对象(直接结果)
|
||
- 带有 ```json 包裹
|
||
- 带有多行日志前缀
|
||
"""
|
||
content = raw.strip()
|
||
|
||
# 去掉日志前缀(时间戳等)
|
||
lines = content.split("\n")
|
||
json_lines = []
|
||
in_json = False
|
||
for line in lines:
|
||
if re.match(r'^\d{6}\s+\d{2}:\d{2}:\d{2}', line):
|
||
continue # 跳过日志行
|
||
json_lines.append(line)
|
||
|
||
content = "\n".join(json_lines).strip()
|
||
|
||
# 提取 JSON
|
||
md_match = re.search(r"\{.*\}", content, re.DOTALL)
|
||
if md_match:
|
||
content = md_match.group()
|
||
|
||
return json.loads(content)
|
||
|
||
|
||
# ============================================================================
|
||
# 直接调用(不通过 stdio,直接 Python 调用)
|
||
# ============================================================================
|
||
|
||
def test_direct(api_key: str):
|
||
"""直接 Python 函数调用,测试核心筛查逻辑"""
|
||
print("=" * 60)
|
||
print("测试1:直接 Python 调用(模拟 xiaozhi MCP 工具调用)")
|
||
print("=" * 60)
|
||
|
||
screener = PsychoScreener(api_key=api_key)
|
||
all_passed = True
|
||
|
||
for tc in TEST_CASES:
|
||
print(f"\n▶ 测试:{tc['name']}")
|
||
|
||
# 提取孩子消息
|
||
child_msgs = [m["content"] for m in tc["messages"] if m["role"] == "user"]
|
||
context = "\n".join(child_msgs)
|
||
print(f" 孩子的话:{context[:60]}...")
|
||
|
||
# 调用筛查器
|
||
result = screener.screen(context)
|
||
prefix = screener.build_response_prefix(result)
|
||
|
||
print(f" 结果:detected={result.detected}, category={result.category}, severity={result.severity}")
|
||
if prefix:
|
||
print(f" 前缀:{prefix[:80]}...")
|
||
|
||
# 验证
|
||
passed = (result.detected == tc["expected_detected"])
|
||
if tc["expected_detected"]:
|
||
passed = passed and (result.category == tc["expected_category"])
|
||
|
||
status = "✅ PASS" if passed else "❌ FAIL"
|
||
print(f" 验证:{status}")
|
||
|
||
if not passed:
|
||
all_passed = False
|
||
|
||
print("\n" + "=" * 60)
|
||
if all_passed:
|
||
print("✅ 全部测试通过!")
|
||
else:
|
||
print("❌ 有测试失败")
|
||
print("=" * 60)
|
||
return all_passed
|
||
|
||
|
||
def test_mcp_stdio(api_key: str):
|
||
"""通过 FastMCP stdio 模式调用(模拟 xiaozhi 的 MCP 管道)"""
|
||
print("\n" + "=" * 60)
|
||
print("测试2:FastMCP stdio 模式(模拟 xiaozhi MCP 管道)")
|
||
print("=" * 60)
|
||
|
||
# 设置环境变量
|
||
env = os.environ.copy()
|
||
env["MINIMAX_API_KEY"] = api_key
|
||
|
||
# 启动 MCP 进程
|
||
proc = subprocess.Popen(
|
||
[sys.executable, "-m", "psycho_screener.mcp_tool"],
|
||
stdin=subprocess.PIPE,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
text=True,
|
||
cwd=os.path.join(os.path.dirname(__file__)),
|
||
env=env,
|
||
)
|
||
|
||
try:
|
||
for tc in TEST_CASES[:2]: # 只测前两个,节省时间
|
||
print(f"\n▶ 测试:{tc['name']}")
|
||
|
||
# 构建 JSON-RPC 请求
|
||
request = build_xiaozhi_mcp_request(tc["messages"])
|
||
request_json = json.dumps(request)
|
||
|
||
print(f" 发送:{request_json[:80]}...")
|
||
|
||
# 发送请求
|
||
proc.stdin.write(request_json + "\n")
|
||
proc.stdin.flush()
|
||
|
||
# 读取响应
|
||
import select
|
||
if select.select([proc.stdout], [], [], 30)[0]:
|
||
response_line = proc.stdout.readline()
|
||
print(f" 响应:{response_line[:120]}...")
|
||
else:
|
||
print(" ❌ 超时无响应")
|
||
|
||
finally:
|
||
proc.terminate()
|
||
proc.wait(timeout=5)
|
||
|
||
|
||
# ============================================================================
|
||
# 主程序
|
||
# ============================================================================
|
||
|
||
if __name__ == "__main__":
|
||
api_key = os.environ.get("MINIMAX_API_KEY", "")
|
||
if not api_key:
|
||
print("错误:需要设置 MINIMAX_API_KEY 环境变量")
|
||
print(" export MINIMAX_API_KEY=your-key")
|
||
sys.exit(1)
|
||
|
||
# 测试1:直接调用
|
||
ok = test_direct(api_key)
|
||
|
||
# 测试2:stdio MCP 模式
|
||
test_mcp_stdio(api_key)
|
||
|
||
sys.exit(0 if ok else 1)
|