此示例集合正在增长。将定期添加更多用例。
基本示例
模型选择
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
"将段落总结为三个重点关注收入趋势的要点。输出:Markdown 列表。",
model_tier="small",
# model_override="gpt-5-nano-2025-08-07",
# provider_override="openai",
mode="simple",
)
print(client.wait(handle.task_id).result)
简单问答
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 提出一个简单问题
handle = client.submit_task(query="法国的首都是什么?")
final = client.wait(handle.task_id)
print(f"答案:{final.result}")
数据分析
from shannon import ShannonClient
client = ShannonClient()
# 贴近真实的数据分析
handle = client.submit_task(
query=(
"给定以下销售数据,请计算:\n"
"1. 各产品的总营收\n"
"2. 环比增长百分比\n"
"3. 返回总营收 Top 3\n\n"
"数据:\n"
"ProductA: Jan=$10000, Feb=$12000\n"
"ProductB: Jan=$8000, Feb=$9500\n"
"ProductC: Jan=$15000, Feb=$14000\n"
"ProductD: Jan=$5000, Feb=$6000\n\n"
"格式:JSON 数组 [{product, revenue, mom_growth_pct}]"
)
)
result = client.wait(handle.task_id)
print(result.result)
高级示例
多步工作流(带 session_id)
"""多步工作流示例(Shannon SDK)"""
from shannon import ShannonClient
def main():
client = ShannonClient()
session_id = "quarterly-analysis-demo"
print("=" * 60)
print("多步工作流示例")
print("=" * 60)
# 第 1 步:初始数据查询
print("\n[步骤 1] 加载 Q4 数据...")
h1 = client.submit_task(
query="1000 + 500 等于多少?其中 1000 表示 Q4 营收,500 表示支出。",
session_id=session_id
)
result1 = client.wait(h1.task_id)
print(f"结果:{result1.result}")
# 第 2 步:基于上一结果的分析
print("\n[步骤 2] 分析趋势...")
h2 = client.submit_task(
query="基于你刚刚计算的 Q4 数字,利润率(百分比)是多少?",
session_id=session_id
)
result2 = client.wait(h2.task_id)
print(f"结果:{result2.result}")
# 第 3 步:汇总
print("\n[步骤 3] 生成管理摘要...")
h3 = client.submit_task(
query="用两句话总结 Q4 财务分析。",
session_id=session_id
)
result3 = client.wait(h3.task_id)
print(f"结果:{result3.result}")
print("\n" + "=" * 60)
print("✅ 多步工作流完成!")
print(f"会话 ID: {session_id}")
print("=" * 60)
if __name__ == "__main__":
main()
并行处理
"""并行处理示例(Shannon SDK)"""
import asyncio
from shannon import AsyncShannonClient
async def main():
print("=" * 60)
print("并行处理示例")
print("=" * 60)
async with AsyncShannonClient() as client:
# 并行处理的主题
topics = ["AI", "量子计算", "生物技术"]
print(f"\n[步骤 1] 并行提交 {len(topics)} 个任务…")
# 非阻塞并发提交
tasks = [
client.submit_task(
query=f"用一句话概述 {topic} 及其主要应用。"
)
for topic in topics
]
# 等待所有提交完成
handles = await asyncio.gather(*tasks)
print(f"✅ 已提交 {len(handles)} 个任务")
print("\n[步骤 2] 并行等待所有结果…")
# 并行等待所有结果
results = await asyncio.gather(
*[client.wait(h.task_id) for h in handles]
)
print(f"✅ 已完成 {len(results)} 个任务")
print("\n[步骤 3] 结果:")
print("-" * 60)
# 展示结果
for topic, result in zip(topics, results):
print(f"\n📌 {topic}:")
print(f" {result.result}")
# 如有可用,安全访问元数据
metadata = []
if hasattr(result, 'metadata') and result.metadata:
model = result.metadata.get('model_used') or result.metadata.get('model')
if model:
metadata.append(f"Model: {model}")
tokens = result.metadata.get('total_tokens')
if tokens:
metadata.append(f"Tokens: {tokens}")
if metadata:
print(f" ({', '.join(metadata)})")
print("\n" + "=" * 60)
print("✅ 并行处理完成!")
print(f"共并行处理 {len(topics)} 个主题")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
带过滤的流式
"""带过滤器的流式传输示例"""
from shannon import ShannonClient, EventType
def main():
print("=" * 60)
print("带过滤器的流式传输示例")
print("=" * 60)
client = ShannonClient()
print("\n[步骤 1] 提交任务…")
handle = client.submit_task(
query=(
"用 3 句话比较 AWS、Azure 与 GCP 的定价模式。"
"在生成时以流式方式输出。"
)
)
print(f"✅ 已提交任务:{handle.task_id}")
print("\n[步骤 2] 使用过滤器流式传输 [LLM_OUTPUT, WORKFLOW_COMPLETED]…")
print("-" * 60)
event_count = 0
llm_outputs = []
# 仅流式 LLM_OUTPUT 和 WORKFLOW_COMPLETED
for event in client.stream(
handle.workflow_id,
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
):
event_count += 1
print(f"[{event_count}] {event.type}")
if event.type == EventType.LLM_OUTPUT:
print(f" 内容: {event.message[:80]}…")
llm_outputs.append(event.message)
if event.type == EventType.WORKFLOW_COMPLETED:
print(f" 状态: {event.message}")
break
print("\n" + "-" * 60)
print(f"✅ 收到 {event_count} 条过滤后的事件")
print(f"✅ 捕获 {len(llm_outputs)} 条 LLM 输出")
print("\n[步骤 3] 完整结果:")
print("-" * 60)
final = client.wait(handle.task_id)
print(final.result)
print("\n" + "=" * 60)
print("✅ 带过滤器的流式传输完成!")
print("=" * 60)
if __name__ == "__main__":
main()
可用事件类型
LLM_OUTPUT— 最终 LLM 响应LLM_PARTIAL— 实时流式 tokenWORKFLOW_COMPLETED— 任务完成AGENT_THINKING— 代理推理/思考AGENT_STARTED— 代理开始工作AGENT_COMPLETED— 代理完成工作PROGRESS— 进度更新ERROR_OCCURRED— 错误
- 仅最终输出:
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED] - 实时流式输出:
types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED] - 监控代理生命周期:
types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED] - 全部(不加过滤):
types=None(或省略该参数)
任务控制示例
暂停和恢复长时间运行的任务
"""任务控制(暂停/恢复)示例"""
from shannon import ShannonClient
import time
def main():
print("=" * 60)
print("任务控制(暂停/恢复)示例")
print("=" * 60)
client = ShannonClient(base_url="http://localhost:8080")
# 提交长时间运行的研究任务
print("\n[步骤 1] 提交长时间运行的任务...")
handle = client.submit_task(
query="研究量子计算和 AI 集成的最新发展。提供详细分析。"
)
print(f"✅ 任务已提交: {handle.task_id}")
# 让任务运行一段时间
print("\n[步骤 2] 让任务运行 5 秒...")
time.sleep(5)
# 暂停任务
print("\n[步骤 3] 暂停任务...")
success = client.pause_task(handle.task_id, reason="查看中间结果")
if success:
print("✅ 暂停信号已发送")
# 检查控制状态
print("\n[步骤 4] 检查控制状态...")
state = client.get_control_state(handle.task_id)
print(f"是否暂停: {state.is_paused}")
print(f"是否取消: {state.is_cancelled}")
if state.paused_at:
print(f"暂停时间: {state.paused_at}")
print(f"暂停原因: {state.pause_reason}")
print(f"暂停者: {state.paused_by}")
# 恢复前等待一段时间
print("\n[步骤 5] 恢复前等待 3 秒...")
time.sleep(3)
# 恢复任务
print("\n[步骤 6] 恢复任务...")
success = client.resume_task(handle.task_id, reason="审查后继续")
if success:
print("✅ 恢复信号已发送")
# 等待完成
print("\n[步骤 7] 等待任务完成...")
result = client.wait(handle.task_id)
print(f"\n✅ 任务已完成!")
print(f"状态: {result.status}")
print(f"\n结果预览: {result.result[:200]}...")
print("\n" + "=" * 60)
print("✅ 任务控制示例完成!")
print("=" * 60)
if __name__ == "__main__":
main()
控制信号的错误处理
"""暂停/恢复操作的错误处理"""
from shannon import ShannonClient
from shannon.errors import ShannonError
def main():
client = ShannonClient()
task_id = "task-123"
# 尝试暂停
try:
client.pause_task(task_id, reason="需要审查")
print("✅ 任务暂停成功")
except ShannonError as e:
# 处理特定错误情况
if "cannot pause completed task" in str(e):
print("⚠️ 任务已完成,无法暂停")
elif "task is already paused" in str(e):
print("⚠️ 任务已经处于暂停状态")
elif "Task not found" in str(e):
print("❌ 未找到任务")
else:
print(f"❌ 错误: {e}")
# 尝试恢复
try:
client.resume_task(task_id, reason="继续执行")
print("✅ 任务恢复成功")
except ShannonError as e:
if "task is not paused" in str(e):
print("⚠️ 任务未暂停,无法恢复")
elif "cannot resume completed task" in str(e):
print("⚠️ 任务已完成,无法恢复")
else:
print(f"❌ 错误: {e}")
if __name__ == "__main__":
main()
集群模式
强制多智能体执行
"""集群模式示例(Shannon SDK)"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 使用集群模式提交,协调多个智能体
handle = client.submit_task(
query="调研并对比 AWS、Azure 和 GCP 在 ML 工作负载上的表现。涵盖定价、GPU 可用性和托管服务。",
force_swarm=True,
)
result = client.wait(handle.task_id)
print(result.result)
# 查看模型和用量信息
if result.usage:
print(f"总 Token 数: {result.usage.get('total_tokens')}")
print(f"费用: ${result.usage.get('cost_usd', 0):.6f}")
人机协同审查示例
交互式审查工作流
"""人机协同审查示例"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 假设一个需要审查的工作流已在运行
workflow_id = "workflow-123"
# 检查当前审查状态
state = client.get_review_state(workflow_id)
print(f"审查状态: {state.status}")
print(f"轮次: {state.round}, 版本: {state.version}")
if state.current_plan:
print(f"当前计划:\n{state.current_plan}")
# 显示对话历史
for r in state.rounds:
print(f" [{r.role}] {r.message}")
# 如果仍在审查中,提供反馈
if state.status == "reviewing":
updated = client.submit_review_feedback(
workflow_id,
"请添加输入验证和单元测试。",
version=state.version,
)
print(f"反馈已提交。当前轮次: {updated.round}")
# 批准更新后的计划
result = client.approve_review(workflow_id, version=updated.version)
print(f"已批准: {result['status']}")
带乐观并发控制的审查
"""审查过程中处理版本冲突"""
from shannon import ShannonClient, ValidationError
client = ShannonClient()
workflow_id = "workflow-456"
try:
state = client.get_review_state(workflow_id)
updated = client.submit_review_feedback(
workflow_id,
"整体不错,需要一些细微的样式调整。",
version=state.version,
)
client.approve_review(workflow_id, version=updated.version)
except ValidationError as e:
if "409" in str(e.code):
print("版本冲突:另一用户已修改审查。请刷新后重试。")
else:
raise
技能示例
浏览和查看技能
"""技能浏览示例"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 列出所有可用技能
skills = client.list_skills()
print(f"找到 {len(skills)} 个技能:\n")
for s in skills:
flag = " [危险]" if s.dangerous else ""
status = "已启用" if s.enabled else "已禁用"
print(f" {s.name} v{s.version} ({s.category}) - {status}{flag}")
print(f" {s.description}")
# 按类别过滤
print("\n--- 编程类技能 ---")
coding = client.list_skills(category="coding")
for s in coding:
print(f" {s.name}: {s.description}")
# 获取详细信息
detail = client.get_skill("web_search")
print(f"\n技能详情: {detail.name} v{detail.version}")
print(f" 作者: {detail.author}")
print(f" 所需工具: {detail.requires_tools}")
print(f" 所需角色: {detail.requires_role}")
print(f" 最大预算: {detail.budget_max}")
# 获取版本历史
versions = client.get_skill_versions("web_search")
print(f"\nweb_search 的版本:")
for v in versions:
print(f" v{v.version} - {v.description}")
定时任务管理示例
创建和管理定时任务
"""定时任务管理示例"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 创建一个工作日每天 9 点 UTC 执行的研究任务
result = client.create_schedule(
name="每日 AI 资讯",
cron_expression="0 9 * * 1-5",
task_query="总结最新的 AI 研究资讯",
task_context={"force_research": "true", "research_strategy": "quick"},
timezone="UTC",
max_budget_per_run_usd=0.50,
)
schedule_id = result["schedule_id"]
print(f"定时任务已创建: {schedule_id}")
# 列出所有定时任务
schedules, total = client.list_schedules()
print(f"\n共 {total} 个定时任务:")
for s in schedules:
print(f" {s.name}: {s.cron_expression} ({s.status})")
print(f" 执行次数: 共 {s.total_runs} 次, 成功 {s.successful_runs} 次, 失败 {s.failed_runs} 次")
# 获取定时任务详情
sched = client.get_schedule(schedule_id)
print(f"\n定时任务: {sched.name}")
print(f" 下次执行: {sched.next_run_at}")
print(f" 上次执行: {sched.last_run_at}")
# 查看执行历史
runs, _ = client.get_schedule_runs(schedule_id)
for run in runs:
print(f" {run.triggered_at}: {run.status} - ${run.total_cost_usd:.4f}")
# 暂停定时任务(例如维护期间)
client.pause_schedule(schedule_id, reason="维护窗口")
# 恢复定时任务
client.resume_schedule(schedule_id)
# 更新定时任务配置
client.update_schedule(
schedule_id,
cron_expression="0 8 * * 1-5", # 改为 8 点
max_budget_per_run_usd=0.75,
)
# 不再需要时删除
# client.delete_schedule(schedule_id)
直接工具执行
列出、查看和执行工具
"""直接工具执行示例"""
import json
from shannon import ShannonClient
def main():
client = ShannonClient(base_url="http://localhost:8080")
try:
# 列出可用工具
tools = client.list_tools()
print(f"找到 {len(tools)} 个工具")
for tool in tools[:10]:
print(f" - {tool.name}: {tool.description}")
if not tools:
return
# 获取特定工具的详细信息
target = tools[0].name
detail = client.get_tool(target)
print(f"\n工具: {detail.name}")
print(f"类别: {detail.category}")
print(f"描述: {detail.description}")
print(f"参数: {json.dumps(detail.parameters, indent=2)}")
# 执行工具
result = client.execute_tool(target, arguments={"expression": "6 * 7"})
print(f"\n成功: {result.success}")
if result.text:
print(f"文本: {result.text}")
if result.output is not None:
print(f"输出: {result.output}")
if result.error:
print(f"错误: {result.error}")
finally:
client.close()
if __name__ == "__main__":
main()
确定性智能体执行
执行智能体并发送集群后续消息
"""确定性智能体执行示例"""
import json
from shannon import ShannonClient
def main():
client = ShannonClient(base_url="http://localhost:8080")
try:
# 列出可用智能体
agents = client.list_agents()
print(f"智能体数量: {len(agents)}")
for agent in agents[:10]:
print(f" - {agent.id}: {agent.name} ({agent.tool})")
if not agents:
return
# 获取智能体详情
agent = client.get_agent(agents[0].id)
print(f"\n智能体: {agent.id}")
print(f"名称: {agent.name}")
print(f"描述: {agent.description}")
print(f"输入 schema: {json.dumps(agent.input_schema, indent=2)}")
# 执行智能体
execution = client.execute_agent(
agent.id,
{"query": "分析近期趋势"},
)
print(f"\n任务 ID: {execution.task_id}")
print(f"工作流 ID: {execution.workflow_id}")
print(f"状态: {execution.status}")
# 等待结果
final = execution.wait(timeout=120)
print(f"\n结果: {final.result}")
# 可选:向集群工作流发送后续消息
# result = client.send_swarm_message(execution.workflow_id, "聚焦成本分析")
# print(f"集群后续消息已接受: {result.success}")
finally:
client.close()
if __name__ == "__main__":
main()
OpenAI 兼容聊天补全
非流式与流式
"""OpenAI 兼容聊天补全示例"""
from shannon import ShannonClient, OpenAIChatMessage
def main():
client = ShannonClient(base_url="http://localhost:8080")
try:
# 列出可用模型
models = client.list_openai_models()
print(f"OpenAI 兼容模型: {len(models)}")
for entry in models[:10]:
print(f" - {entry.id}")
messages = [OpenAIChatMessage(role="user", content="用两句话总结 Shannon 的优势。")]
# 非流式补全
completion = client.create_chat_completion(
messages,
model="shannon-chat",
)
print(f"\n响应: {completion.choices[0].message.content}")
if completion.usage:
print(f"Token 数: {completion.usage.total_tokens}")
# 流式补全
print("\n流式响应:")
for chunk in client.stream_chat_completion(
[OpenAIChatMessage(role="user", content="用 3 句话讲一个故事。")],
model="shannon-chat",
include_usage=True,
):
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print()
finally:
client.close()
if __name__ == "__main__":
main()
会话文件与记忆
列出和下载工作区文件与记忆文件
"""会话文件与记忆示例"""
from shannon import ShannonClient
def main():
client = ShannonClient(base_url="http://localhost:8080")
session_id = "my-session"
try:
# 列出记忆文件
memory_files = client.list_memory_files()
print(f"记忆文件: {len(memory_files)}")
for entry in memory_files[:10]:
print(f" - {entry.path} ({entry.size_bytes} bytes)")
# 下载记忆文件
if memory_files:
downloaded = client.download_memory_file(memory_files[0].path)
print(f"\n记忆文件内容预览:")
print(downloaded.content[:400])
# 列出会话工作区文件
session_files = client.list_session_files(session_id)
print(f"\n会话 {session_id} 的文件: {len(session_files)}")
for entry in session_files[:10]:
print(f" - {entry.path} ({entry.size_bytes} bytes)")
# 下载会话文件
if session_files:
downloaded = client.download_session_file(session_id, session_files[0].path)
print(f"\n会话文件内容预览:")
print(downloaded.content[:400])
finally:
client.close()
if __name__ == "__main__":
main()
错误处理示例
参见专门页面获取完整的重试逻辑与模式:错误处理。后续步骤
异步用法
Async/await 模式
错误处理
正确处理错误