跳转到主要内容
此示例集合正在增长。将定期添加更多用例。

基本示例

模型选择

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
  "将段落总结为三个重点关注收入趋势的要点。输出:Markdown 列表。",
  model_tier="small",
  # model_override="gpt-5-nano-2025-08-07",
  # provider_override="openai",
  mode="simple",
)
print(client.wait(handle.task_id).result)

简单问答

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 提出一个简单问题
handle = client.submit_task(query="法国的首都是什么?")
final = client.wait(handle.task_id)
print(f"答案:{final.result}")

数据分析

from shannon import ShannonClient

client = ShannonClient()

# 贴近真实的数据分析
handle = client.submit_task(
    query=(
        "给定以下销售数据,请计算:\n"
        "1. 各产品的总营收\n"
        "2. 环比增长百分比\n"
        "3. 返回总营收 Top 3\n\n"
        "数据:\n"
        "ProductA: Jan=$10000, Feb=$12000\n"
        "ProductB: Jan=$8000, Feb=$9500\n"
        "ProductC: Jan=$15000, Feb=$14000\n"
        "ProductD: Jan=$5000, Feb=$6000\n\n"
        "格式:JSON 数组 [{product, revenue, mom_growth_pct}]"
    )
)

result = client.wait(handle.task_id)
print(result.result)

高级示例

多步工作流(带 session_id)

"""多步工作流示例(Shannon SDK)"""
from shannon import ShannonClient

def main():
    client = ShannonClient()
    session_id = "quarterly-analysis-demo"

    print("=" * 60)
    print("多步工作流示例")
    print("=" * 60)

    # 第 1 步:初始数据查询
    print("\n[步骤 1] 加载 Q4 数据...")
    h1 = client.submit_task(
        query="1000 + 500 等于多少?其中 1000 表示 Q4 营收,500 表示支出。",
        session_id=session_id
    )
    result1 = client.wait(h1.task_id)
    print(f"结果:{result1.result}")

    # 第 2 步:基于上一结果的分析
    print("\n[步骤 2] 分析趋势...")
    h2 = client.submit_task(
        query="基于你刚刚计算的 Q4 数字,利润率(百分比)是多少?",
        session_id=session_id
    )
    result2 = client.wait(h2.task_id)
    print(f"结果:{result2.result}")

    # 第 3 步:汇总
    print("\n[步骤 3] 生成管理摘要...")
    h3 = client.submit_task(
        query="用两句话总结 Q4 财务分析。",
        session_id=session_id
    )
    result3 = client.wait(h3.task_id)
    print(f"结果:{result3.result}")

    print("\n" + "=" * 60)
    print("✅ 多步工作流完成!")
    print(f"会话 ID: {session_id}")
    print("=" * 60)

if __name__ == "__main__":
    main()

并行处理

"""并行处理示例(Shannon SDK)"""
import asyncio
from shannon import AsyncShannonClient

async def main():
    print("=" * 60)
    print("并行处理示例")
    print("=" * 60)

    async with AsyncShannonClient() as client:
        # 并行处理的主题
        topics = ["AI", "量子计算", "生物技术"]

        print(f"\n[步骤 1] 并行提交 {len(topics)} 个任务…")

        # 非阻塞并发提交
        tasks = [
            client.submit_task(
                query=f"用一句话概述 {topic} 及其主要应用。"
            )
            for topic in topics
        ]

        # 等待所有提交完成
        handles = await asyncio.gather(*tasks)
        print(f"✅ 已提交 {len(handles)} 个任务")

        print("\n[步骤 2] 并行等待所有结果…")

        # 并行等待所有结果
        results = await asyncio.gather(
            *[client.wait(h.task_id) for h in handles]
        )
        print(f"✅ 已完成 {len(results)} 个任务")

        print("\n[步骤 3] 结果:")
        print("-" * 60)

        # 展示结果
        for topic, result in zip(topics, results):
            print(f"\n📌 {topic}:")
            print(f"   {result.result}")

            # 如有可用,安全访问元数据
            metadata = []
            if hasattr(result, 'metadata') and result.metadata:
                model = result.metadata.get('model_used') or result.metadata.get('model')
                if model:
                    metadata.append(f"Model: {model}")
                tokens = result.metadata.get('total_tokens')
                if tokens:
                    metadata.append(f"Tokens: {tokens}")

            if metadata:
                print(f"   ({', '.join(metadata)})")

        print("\n" + "=" * 60)
        print("✅ 并行处理完成!")
        print(f"共并行处理 {len(topics)} 个主题")
        print("=" * 60)

if __name__ == "__main__":
    asyncio.run(main())

带过滤的流式

"""带过滤器的流式传输示例"""
from shannon import ShannonClient, EventType

def main():
    print("=" * 60)
    print("带过滤器的流式传输示例")
    print("=" * 60)

    client = ShannonClient()

    print("\n[步骤 1] 提交任务…")
    handle = client.submit_task(
        query=(
            "用 3 句话比较 AWS、Azure 与 GCP 的定价模式。"
            "在生成时以流式方式输出。"
        )
    )
    print(f"✅ 已提交任务:{handle.task_id}")

    print("\n[步骤 2] 使用过滤器流式传输 [LLM_OUTPUT, WORKFLOW_COMPLETED]…")
    print("-" * 60)

    event_count = 0
    llm_outputs = []

    # 仅流式 LLM_OUTPUT 和 WORKFLOW_COMPLETED
    for event in client.stream(
        handle.workflow_id,
        types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
    ):
        event_count += 1
        print(f"[{event_count}] {event.type}")

        if event.type == EventType.LLM_OUTPUT:
            print(f"    内容: {event.message[:80]}…")
            llm_outputs.append(event.message)

        if event.type == EventType.WORKFLOW_COMPLETED:
            print(f"    状态: {event.message}")
            break

    print("\n" + "-" * 60)
    print(f"✅ 收到 {event_count} 条过滤后的事件")
    print(f"✅ 捕获 {len(llm_outputs)} 条 LLM 输出")

    print("\n[步骤 3] 完整结果:")
    print("-" * 60)
    final = client.wait(handle.task_id)
    print(final.result)

    print("\n" + "=" * 60)
    print("✅ 带过滤器的流式传输完成!")
    print("=" * 60)

if __name__ == "__main__":
    main()
完整事件类型列表参见事件类型目录
可用事件类型
  • LLM_OUTPUT — 最终 LLM 响应
  • LLM_PARTIAL — 实时流式 token
  • WORKFLOW_COMPLETED — 任务完成
  • AGENT_THINKING — 代理推理/思考
  • AGENT_STARTED — 代理开始工作
  • AGENT_COMPLETED — 代理完成工作
  • PROGRESS — 进度更新
  • ERROR_OCCURRED — 错误
常见过滤组合
  • 仅最终输出:types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
  • 实时流式输出:types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED]
  • 监控代理生命周期:types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED]
  • 全部(不加过滤):types=None(或省略该参数)

任务控制示例

暂停和恢复长时间运行的任务

"""任务控制(暂停/恢复)示例"""
from shannon import ShannonClient
import time

def main():
    print("=" * 60)
    print("任务控制(暂停/恢复)示例")
    print("=" * 60)

    client = ShannonClient(base_url="http://localhost:8080")

    # 提交长时间运行的研究任务
    print("\n[步骤 1] 提交长时间运行的任务...")
    handle = client.submit_task(
        query="研究量子计算和 AI 集成的最新发展。提供详细分析。"
    )
    print(f"✅ 任务已提交: {handle.task_id}")

    # 让任务运行一段时间
    print("\n[步骤 2] 让任务运行 5 秒...")
    time.sleep(5)

    # 暂停任务
    print("\n[步骤 3] 暂停任务...")
    success = client.pause_task(handle.task_id, reason="查看中间结果")
    if success:
        print("✅ 暂停信号已发送")

    # 检查控制状态
    print("\n[步骤 4] 检查控制状态...")
    state = client.get_control_state(handle.task_id)
    print(f"是否暂停: {state.is_paused}")
    print(f"是否取消: {state.is_cancelled}")
    if state.paused_at:
        print(f"暂停时间: {state.paused_at}")
        print(f"暂停原因: {state.pause_reason}")
        print(f"暂停者: {state.paused_by}")

    # 恢复前等待一段时间
    print("\n[步骤 5] 恢复前等待 3 秒...")
    time.sleep(3)

    # 恢复任务
    print("\n[步骤 6] 恢复任务...")
    success = client.resume_task(handle.task_id, reason="审查后继续")
    if success:
        print("✅ 恢复信号已发送")

    # 等待完成
    print("\n[步骤 7] 等待任务完成...")
    result = client.wait(handle.task_id)
    print(f"\n✅ 任务已完成!")
    print(f"状态: {result.status}")
    print(f"\n结果预览: {result.result[:200]}...")

    print("\n" + "=" * 60)
    print("✅ 任务控制示例完成!")
    print("=" * 60)

if __name__ == "__main__":
    main()

控制信号的错误处理

"""暂停/恢复操作的错误处理"""
from shannon import ShannonClient
from shannon.errors import APIError

def main():
    client = ShannonClient()

    task_id = "task-123"

    # 尝试暂停
    try:
        client.pause_task(task_id, reason="需要审查")
        print("✅ 任务暂停成功")
    except APIError as e:
        # 处理特定错误情况
        if "cannot pause completed task" in str(e):
            print("⚠️  任务已完成,无法暂停")
        elif "task is already paused" in str(e):
            print("⚠️  任务已经处于暂停状态")
        elif "Task not found" in str(e):
            print("❌ 未找到任务")
        else:
            print(f"❌ 错误: {e}")

    # 尝试恢复
    try:
        client.resume_task(task_id, reason="继续执行")
        print("✅ 任务恢复成功")
    except APIError as e:
        if "task is not paused" in str(e):
            print("⚠️  任务未暂停,无法恢复")
        elif "cannot resume completed task" in str(e):
            print("⚠️  任务已完成,无法恢复")
        else:
            print(f"❌ 错误: {e}")

if __name__ == "__main__":
    main()

真实用例

错误处理示例

参见专门页面获取完整的重试逻辑与模式:错误处理 运行方法:
# 确保 Shannon 已运行
make dev

# 运行示例
python3 error_handling.py
何时使用重试:
  • 网络不稳定
  • 临时性服务问题
  • 对可靠性有要求的生产系统
不建议重试:
  • 超时(任务本身过于复杂/耗时)
  • API 错误(参数无效等)
  • 认证失败

后续步骤