跳转到主要内容

Swarm 多智能体工作流

本教程演示如何使用 Shannon 的 SwarmWorkflow 部署持久化协作智能体,它们并行工作,支持智能体间消息传递和共享工作区。

你将学到

  • 如何通过 API 和 Python SDK 提交 Swarm 任务
  • 智能体如何分解、执行和综合结果
  • 如何通过 SSE 流式传输监控智能体进度
  • 如何配置 Swarm 参数
  • 实际用例与最佳实践

前置条件

  • 已运行的 Shannon 堆栈(Docker Compose)
  • Gateway 可通过 http://localhost:8080 访问
  • config/features.yaml 中已启用 Swarm(默认启用)
  • 鉴权默认:
    • Docker Compose:默认关闭鉴权(GATEWAY_SKIP_AUTH=1)。
    • 本地构建:默认开启鉴权。可设置 GATEWAY_SKIP_AUTH=1 关闭;或在请求中加入 API key 头 -H "X-API-Key: $API_KEY"

快速开始

1
提交 Swarm 任务
2
在 context 中设置 force_swarm: true 将任务路由到 SwarmWorkflow:
3
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Compare AI chip markets across US, Japan, and South Korea",
    "session_id": "swarm-demo-001",
    "context": {
      "force_swarm": true
    }
  }'
4
响应:
5
{
  "task_id": "task-abc123...",
  "status": "STATUS_CODE_OK",
  "message": "Task submitted successfully",
  "created_at": "2025-11-10T10:00:00Z"
}
6
流式传输进度事件
7
连接 SSE 流以实时观察智能体工作:
8
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task-abc123..."
9
你将看到如下事件:
10
data: {"type":"WORKFLOW_STARTED","agent_id":"swarm-supervisor","message":"Assigning a team of agents"}
data: {"type":"PROGRESS","agent_id":"swarm-supervisor","message":"Planning approach"}
data: {"type":"PROGRESS","agent_id":"swarm-supervisor","message":"Assigning 3 agents"}
data: {"type":"AGENT_STARTED","agent_id":"takao","message":"Agent takao started"}
data: {"type":"AGENT_STARTED","agent_id":"mitaka","message":"Agent mitaka started"}
data: {"type":"AGENT_STARTED","agent_id":"kichijoji","message":"Agent kichijoji started"}
data: {"type":"PROGRESS","agent_id":"takao","message":"Agent takao progress: iteration 1/25, action: tool_call"}
data: {"type":"AGENT_COMPLETED","agent_id":"takao","message":"Agent takao completed"}
data: {"type":"AGENT_COMPLETED","agent_id":"mitaka","message":"Agent mitaka completed"}
data: {"type":"AGENT_COMPLETED","agent_id":"kichijoji","message":"Agent kichijoji completed"}
data: {"type":"PROGRESS","agent_id":"swarm-supervisor","message":"Combining findings from 3 agents"}
data: {"type":"WORKFLOW_COMPLETED","agent_id":"swarm-supervisor","message":"All done"}
11
获取结果
12
curl "http://localhost:8080/api/v1/tasks/task-abc123..."
13
响应:
14
{
  "task_id": "task-abc123...",
  "status": "TASK_STATUS_COMPLETED",
  "result": "## AI Chip Market Comparison\n\n### United States\nThe US market is dominated by NVIDIA...\n\n### Japan\nJapan focuses on edge AI...\n\n### South Korea\nSouth Korea leverages Samsung...",
  "metadata": {
    "workflow_type": "swarm",
    "total_agents": 3,
    "agents": [
      {"agent_id": "takao", "iterations": 8, "tokens": 12400, "success": true, "model": "gpt-5-mini-2025-08-07"},
      {"agent_id": "mitaka", "iterations": 6, "tokens": 9800, "success": true, "model": "gpt-5-mini-2025-08-07"},
      {"agent_id": "kichijoji", "iterations": 7, "tokens": 11200, "success": true, "model": "gpt-5-mini-2025-08-07"}
    ]
  }
}

一步提交 + 流式传输

对于前端应用,使用合并的提交+流式传输端点:
curl -s -X POST http://localhost:8080/api/v1/tasks/stream \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Analyze the competitive landscape of cloud AI platforms: AWS, Azure, and GCP",
    "context": { "force_swarm": true }
  }' | jq
响应:
{
  "workflow_id": "task-def456...",
  "task_id": "task-def456...",
  "stream_url": "/api/v1/stream/sse?workflow_id=task-def456..."
}
然后连接流式传输 URL 获取实时事件:
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task-def456..."

Python SDK

基本用法

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 提交 Swarm 任务
handle = client.submit_task(
    "Compare AI chip markets across US, Japan, and South Korea",
    force_swarm=True,
    session_id="swarm-demo-001",
)

# 等待完成并获取结果
result = client.wait(handle.task_id)
print(result.result)
client.close()

带流式传输

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 一步提交并获取流式传输 URL
handle, stream_url = client.submit_and_stream(
    "Analyze the competitive landscape of cloud AI platforms",
    force_swarm=True,
)

# 实时流式传输事件
for event in client.stream(handle.workflow_id):
    if event.type == "AGENT_STARTED":
        print(f"智能体已启动: {event.agent_id}")
    elif event.type == "PROGRESS":
        print(f"进度: {event.message}")
    elif event.type == "AGENT_COMPLETED":
        print(f"智能体已完成: {event.agent_id}")
    elif event.type == "WORKFLOW_COMPLETED":
        print("Swarm 工作流已完成")
        break

# 获取最终结果
result = client.get_status(handle.task_id)
print(result.result)
client.close()

自定义上下文

handle = client.submit_task(
    "Research renewable energy policies in the EU, US, and China",
    force_swarm=True,
    context={
        "model_tier": "medium",  # 智能体使用中等级别模型
    },
)

智能体协作方式

团队名册

每个智能体都会收到团队名册,显示所有智能体及其任务分配。这使智能体知道就特定信息联系谁:
## Your Team (shared session workspace)
- **takao (you)**: "Research US AI chip market"
- mitaka: "Research Japan AI chip market"
- kichijoji: "Research South Korea AI chip market"

发布发现

智能体通过共享工作区分享发现。这些内容会出现在每个智能体的提示上下文中:
## Shared Findings
- takao: NVIDIA dominates US with 80% market share...
- mitaka: Japan focuses on edge AI chips with Preferred Networks leading...

发送直接消息

智能体可以向特定队友发送直接消息:
## Inbox Messages
- From mitaka (info): {"message": "Check Samsung's foundry plans for AI chips"}

请求帮助

当智能体需要额外支持时,可以向监督者请求帮助:
{"action": "request_help", "help_description": "Need help analyzing EU regulatory impact on AI chips", "help_skills": ["web_search"]}
监督者会生成新智能体处理请求,并通知原始智能体。

配置

features.yaml

workflows:
  swarm:
    enabled: true
    max_agents: 10                    # 最大智能体总数(初始 + 动态)
    max_iterations_per_agent: 25      # 每个智能体的最大推理-行动循环次数
    agent_timeout_seconds: 600        # 每个智能体超时(10 分钟)
    max_messages_per_agent: 20        # 每个智能体的 P2P 消息上限
    workspace_snippet_chars: 800      # 提示中每个工作区条目的最大字符数
    workspace_max_entries: 5          # 向每个智能体显示的最近条目数

配置参数

参数默认值范围描述
enabledtruetrue/false启用或禁用 Swarm 工作流
max_agents101-50智能体总数上限(含动态生成)
max_iterations_per_agent251-100每个智能体的最大推理-行动循环次数
agent_timeout_seconds60060-3600每个智能体的超时时间
max_messages_per_agent201-100每个智能体可发送的 P2P 消息上限
workspace_snippet_chars800100-4000智能体提示中工作区条目的截断限制
workspace_max_entries51-20每个主题显示的最近工作区条目数

实际用例

市场研究

分配智能体并行研究不同市场或竞争对手,然后综合为统一报告。

技术对比

每个智能体评估不同的框架、库或架构,在发现过程中实时分享。

多区域分析

智能体同时研究不同地理区域,发现交叉传播。

文献综述

智能体探索研究课题的不同方面,通过工作区分享来源和关键发现。

示例:技术框架对比

curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Compare LangChain, CrewAI, and AutoGen for building multi-agent systems. Evaluate architecture, community, performance, and production readiness.",
    "context": { "force_swarm": true }
  }'
此任务会被分解为多个智能体,每个研究一个框架,通过工作区分享发现,使每个智能体可以引用其他人的发现。

示例:综合行业分析

curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Analyze the global electric vehicle battery supply chain: raw materials sourcing, manufacturing capacity, technology trends, and geopolitical risks",
    "context": {
      "force_swarm": true,
      "model_tier": "medium"
    }
  }'

理解响应元数据

Swarm 工作流返回包含每个智能体详情的元数据:
{
  "metadata": {
    "workflow_type": "swarm",
    "total_agents": 4,
    "agents": [
      {
        "agent_id": "takao",
        "iterations": 8,
        "tokens": 14200,
        "success": true,
        "model": "gpt-5-mini-2025-08-07"
      },
      {
        "agent_id": "mitaka",
        "iterations": 6,
        "tokens": 9800,
        "success": true,
        "model": "gpt-5-mini-2025-08-07"
      }
    ]
  }
}
字段描述
workflow_typeSwarm 工作流始终为 "swarm"
total_agents参与的智能体总数
agents[].agent_id唯一智能体名称
agents[].iterations完成的推理-行动循环次数
agents[].tokens该智能体消耗的总 Token 数
agents[].success智能体是否成功完成
agents[].model智能体使用的 LLM 模型

提示与最佳实践

  • 设置 context.force_swarm: true 路由到 SwarmWorkflow
  • 从默认配置开始,根据结果进行调整
  • 通过 SSE 事件监控智能体行为
  • 使用会话(session_id)进行多轮 Swarm 对话

故障排除

常见问题
  • Swarm 未触发:确保 force_swarm: truecontext 对象中,且 features.yaml 中已启用 Swarm
  • 智能体超时:对复杂任务增加 agent_timeout_seconds
  • 智能体过多:简化查询以减少子任务数量,或降低 max_agents
  • Token 消耗过高:降低 max_iterations_per_agent 或使用 model_tier: "small"
  • 智能体陷入循环:收敛检测(连续 3 次非工具迭代)应自动捕获此情况

回退行为

如果 Swarm 工作流失败(分解错误、所有智能体失败等),Shannon 会自动回退到标准 DAG/Supervisor 工作流路由。force_swarm 标志会从上下文中移除以防止递归失败。

下一步