此示例集合正在增长。将定期添加更多用例。
基本示例
模型选择
复制
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
"将段落总结为三个重点关注收入趋势的要点。输出:Markdown 列表。",
model_tier="small",
# model_override="gpt-5-nano-2025-08-07",
# provider_override="openai",
mode="simple",
)
print(client.wait(handle.task_id).result)
简单问答
复制
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 提出一个简单问题
handle = client.submit_task(query="法国的首都是什么?")
final = client.wait(handle.task_id)
print(f"答案:{final.result}")
数据分析
复制
from shannon import ShannonClient
client = ShannonClient()
# 贴近真实的数据分析
handle = client.submit_task(
query=(
"给定以下销售数据,请计算:\n"
"1. 各产品的总营收\n"
"2. 环比增长百分比\n"
"3. 返回总营收 Top 3\n\n"
"数据:\n"
"ProductA: Jan=$10000, Feb=$12000\n"
"ProductB: Jan=$8000, Feb=$9500\n"
"ProductC: Jan=$15000, Feb=$14000\n"
"ProductD: Jan=$5000, Feb=$6000\n\n"
"格式:JSON 数组 [{product, revenue, mom_growth_pct}]"
)
)
result = client.wait(handle.task_id)
print(result.result)
高级示例
多步工作流(带 session_id)
复制
"""多步工作流示例(Shannon SDK)"""
from shannon import ShannonClient
def main():
client = ShannonClient()
session_id = "quarterly-analysis-demo"
print("=" * 60)
print("多步工作流示例")
print("=" * 60)
# 第 1 步:初始数据查询
print("\n[步骤 1] 加载 Q4 数据...")
h1 = client.submit_task(
query="1000 + 500 等于多少?其中 1000 表示 Q4 营收,500 表示支出。",
session_id=session_id
)
result1 = client.wait(h1.task_id)
print(f"结果:{result1.result}")
# 第 2 步:基于上一结果的分析
print("\n[步骤 2] 分析趋势...")
h2 = client.submit_task(
query="基于你刚刚计算的 Q4 数字,利润率(百分比)是多少?",
session_id=session_id
)
result2 = client.wait(h2.task_id)
print(f"结果:{result2.result}")
# 第 3 步:汇总
print("\n[步骤 3] 生成管理摘要...")
h3 = client.submit_task(
query="用两句话总结 Q4 财务分析。",
session_id=session_id
)
result3 = client.wait(h3.task_id)
print(f"结果:{result3.result}")
print("\n" + "=" * 60)
print("✅ 多步工作流完成!")
print(f"会话 ID: {session_id}")
print("=" * 60)
if __name__ == "__main__":
main()
并行处理
复制
"""并行处理示例(Shannon SDK)"""
import asyncio
from shannon import AsyncShannonClient
async def main():
print("=" * 60)
print("并行处理示例")
print("=" * 60)
async with AsyncShannonClient() as client:
# 并行处理的主题
topics = ["AI", "量子计算", "生物技术"]
print(f"\n[步骤 1] 并行提交 {len(topics)} 个任务…")
# 非阻塞并发提交
tasks = [
client.submit_task(
query=f"用一句话概述 {topic} 及其主要应用。"
)
for topic in topics
]
# 等待所有提交完成
handles = await asyncio.gather(*tasks)
print(f"✅ 已提交 {len(handles)} 个任务")
print("\n[步骤 2] 并行等待所有结果…")
# 并行等待所有结果
results = await asyncio.gather(
*[client.wait(h.task_id) for h in handles]
)
print(f"✅ 已完成 {len(results)} 个任务")
print("\n[步骤 3] 结果:")
print("-" * 60)
# 展示结果
for topic, result in zip(topics, results):
print(f"\n📌 {topic}:")
print(f" {result.result}")
# 如有可用,安全访问元数据
metadata = []
if hasattr(result, 'metadata') and result.metadata:
model = result.metadata.get('model_used') or result.metadata.get('model')
if model:
metadata.append(f"Model: {model}")
tokens = result.metadata.get('total_tokens')
if tokens:
metadata.append(f"Tokens: {tokens}")
if metadata:
print(f" ({', '.join(metadata)})")
print("\n" + "=" * 60)
print("✅ 并行处理完成!")
print(f"共并行处理 {len(topics)} 个主题")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
带过滤的流式
复制
"""带过滤器的流式传输示例"""
from shannon import ShannonClient, EventType
def main():
print("=" * 60)
print("带过滤器的流式传输示例")
print("=" * 60)
client = ShannonClient()
print("\n[步骤 1] 提交任务…")
handle = client.submit_task(
query=(
"用 3 句话比较 AWS、Azure 与 GCP 的定价模式。"
"在生成时以流式方式输出。"
)
)
print(f"✅ 已提交任务:{handle.task_id}")
print("\n[步骤 2] 使用过滤器流式传输 [LLM_OUTPUT, WORKFLOW_COMPLETED]…")
print("-" * 60)
event_count = 0
llm_outputs = []
# 仅流式 LLM_OUTPUT 和 WORKFLOW_COMPLETED
for event in client.stream(
handle.workflow_id,
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
):
event_count += 1
print(f"[{event_count}] {event.type}")
if event.type == EventType.LLM_OUTPUT:
print(f" 内容: {event.message[:80]}…")
llm_outputs.append(event.message)
if event.type == EventType.WORKFLOW_COMPLETED:
print(f" 状态: {event.message}")
break
print("\n" + "-" * 60)
print(f"✅ 收到 {event_count} 条过滤后的事件")
print(f"✅ 捕获 {len(llm_outputs)} 条 LLM 输出")
print("\n[步骤 3] 完整结果:")
print("-" * 60)
final = client.wait(handle.task_id)
print(final.result)
print("\n" + "=" * 60)
print("✅ 带过滤器的流式传输完成!")
print("=" * 60)
if __name__ == "__main__":
main()
可用事件类型
LLM_OUTPUT— 最终 LLM 响应LLM_PARTIAL— 实时流式 tokenWORKFLOW_COMPLETED— 任务完成AGENT_THINKING— 代理推理/思考AGENT_STARTED— 代理开始工作AGENT_COMPLETED— 代理完成工作PROGRESS— 进度更新ERROR_OCCURRED— 错误
- 仅最终输出:
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED] - 实时流式输出:
types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED] - 监控代理生命周期:
types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED] - 全部(不加过滤):
types=None(或省略该参数)
任务控制示例
暂停和恢复长时间运行的任务
复制
"""任务控制(暂停/恢复)示例"""
from shannon import ShannonClient
import time
def main():
print("=" * 60)
print("任务控制(暂停/恢复)示例")
print("=" * 60)
client = ShannonClient(base_url="http://localhost:8080")
# 提交长时间运行的研究任务
print("\n[步骤 1] 提交长时间运行的任务...")
handle = client.submit_task(
query="研究量子计算和 AI 集成的最新发展。提供详细分析。"
)
print(f"✅ 任务已提交: {handle.task_id}")
# 让任务运行一段时间
print("\n[步骤 2] 让任务运行 5 秒...")
time.sleep(5)
# 暂停任务
print("\n[步骤 3] 暂停任务...")
success = client.pause_task(handle.task_id, reason="查看中间结果")
if success:
print("✅ 暂停信号已发送")
# 检查控制状态
print("\n[步骤 4] 检查控制状态...")
state = client.get_control_state(handle.task_id)
print(f"是否暂停: {state.is_paused}")
print(f"是否取消: {state.is_cancelled}")
if state.paused_at:
print(f"暂停时间: {state.paused_at}")
print(f"暂停原因: {state.pause_reason}")
print(f"暂停者: {state.paused_by}")
# 恢复前等待一段时间
print("\n[步骤 5] 恢复前等待 3 秒...")
time.sleep(3)
# 恢复任务
print("\n[步骤 6] 恢复任务...")
success = client.resume_task(handle.task_id, reason="审查后继续")
if success:
print("✅ 恢复信号已发送")
# 等待完成
print("\n[步骤 7] 等待任务完成...")
result = client.wait(handle.task_id)
print(f"\n✅ 任务已完成!")
print(f"状态: {result.status}")
print(f"\n结果预览: {result.result[:200]}...")
print("\n" + "=" * 60)
print("✅ 任务控制示例完成!")
print("=" * 60)
if __name__ == "__main__":
main()
控制信号的错误处理
复制
"""暂停/恢复操作的错误处理"""
from shannon import ShannonClient
from shannon.errors import APIError
def main():
client = ShannonClient()
task_id = "task-123"
# 尝试暂停
try:
client.pause_task(task_id, reason="需要审查")
print("✅ 任务暂停成功")
except APIError as e:
# 处理特定错误情况
if "cannot pause completed task" in str(e):
print("⚠️ 任务已完成,无法暂停")
elif "task is already paused" in str(e):
print("⚠️ 任务已经处于暂停状态")
elif "Task not found" in str(e):
print("❌ 未找到任务")
else:
print(f"❌ 错误: {e}")
# 尝试恢复
try:
client.resume_task(task_id, reason="继续执行")
print("✅ 任务恢复成功")
except APIError as e:
if "task is not paused" in str(e):
print("⚠️ 任务未暂停,无法恢复")
elif "cannot resume completed task" in str(e):
print("⚠️ 任务已完成,无法恢复")
else:
print(f"❌ 错误: {e}")
if __name__ == "__main__":
main()
真实用例
错误处理示例
参见专门页面获取完整的重试逻辑与模式:错误处理。 运行方法:复制
# 确保 Shannon 已运行
make dev
# 运行示例
python3 error_handling.py
- 网络不稳定
- 临时性服务问题
- 对可靠性有要求的生产系统
- 超时(任务本身过于复杂/耗时)
- API 错误(参数无效等)
- 认证失败