跳转到主要内容

Swarm 多智能体工作流

本教程演示如何使用 Shannon 的 SwarmWorkflow 部署由 LLM 驱动的 Lead Agent 协调的持久化协作智能体。智能体并行工作,支持智能体间消息传递、共享工作区和动态任务重新分配。

你将学到

  • 如何通过 API 和 Python SDK 提交 Swarm 任务
  • Lead Agent 如何通过事件协调智能体
  • 如何通过 SSE 流式传输监控智能体进度
  • 如何配置 Swarm 参数和预算控制
  • 实际用例与最佳实践

前置条件

  • 已运行的 Shannon 堆栈(Docker Compose)
  • Gateway 可通过 http://localhost:8080 访问
  • config/features.yaml 中已启用 Swarm(默认启用)
  • 鉴权默认:
    • Docker Compose:默认关闭鉴权(GATEWAY_SKIP_AUTH=1)。
    • 本地构建:默认开启鉴权。可设置 GATEWAY_SKIP_AUTH=1 关闭;或在请求中加入 API key 头 -H "X-API-Key: $API_KEY"

快速开始

1
提交 Swarm 任务
2
在 context 中设置 force_swarm: true 将任务路由到 SwarmWorkflow:
3
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Compare AI chip markets across US, Japan, and South Korea",
    "session_id": "swarm-demo-001",
    "context": {
      "force_swarm": true
    }
  }'
4
响应:
5
{
  "task_id": "task-abc123...",
  "status": "STATUS_CODE_OK",
  "message": "Task submitted successfully",
  "created_at": "2025-11-10T10:00:00Z"
}
6
流式传输进度事件
7
连接 SSE 流以实时观察智能体工作:
8
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task-abc123..."
9
你将看到如下事件:
10
data: {"type":"WORKFLOW_STARTED","agent_id":"swarm-supervisor","message":"Lead Agent initializing team"}
data: {"type":"LEAD_DECISION","agent_id":"swarm-lead","message":"Creating initial plan with 3 tasks"}
data: {"type":"TASKLIST_UPDATED","agent_id":"swarm-lead","message":"Task graph updated: 4 tasks (3 research + 1 synthesis)"}
data: {"type":"TEAM_STATUS","agent_id":"swarm-lead","message":"Spawned agent takao"}
data: {"type":"AGENT_STARTED","agent_id":"takao","message":"Agent takao started"}
data: {"type":"TEAM_STATUS","agent_id":"swarm-lead","message":"Spawned agent mitaka"}
data: {"type":"AGENT_STARTED","agent_id":"mitaka","message":"Agent mitaka started"}
data: {"type":"TEAM_STATUS","agent_id":"swarm-lead","message":"Spawned agent kichijoji"}
data: {"type":"AGENT_STARTED","agent_id":"kichijoji","message":"Agent kichijoji started"}
data: {"type":"PROGRESS","agent_id":"takao","message":"Agent takao progress: iteration 1/25, action: tool_call"}
data: {"type":"AGENT_COMPLETED","agent_id":"takao","message":"Agent takao completed"}
data: {"type":"LEAD_DECISION","agent_id":"swarm-lead","message":"Assigning synthesis task to takao"}
data: {"type":"AGENT_COMPLETED","agent_id":"mitaka","message":"Agent mitaka completed"}
data: {"type":"AGENT_COMPLETED","agent_id":"kichijoji","message":"Agent kichijoji completed"}
data: {"type":"LEAD_DECISION","agent_id":"swarm-lead","message":"All tasks complete, finalizing"}
data: {"type":"WORKFLOW_COMPLETED","agent_id":"swarm-supervisor","message":"All done"}
11
获取结果
12
curl "http://localhost:8080/api/v1/tasks/task-abc123..."
13
响应:
14
{
  "task_id": "task-abc123...",
  "status": "TASK_STATUS_COMPLETED",
  "result": "## AI Chip Market Comparison\n\n### United States\nThe US market is dominated by NVIDIA...\n\n### Japan\nJapan focuses on edge AI...\n\n### South Korea\nSouth Korea leverages Samsung...",
  "metadata": {
    "workflow_type": "swarm",
    "total_agents": 3,
    "total_tokens": 598235,
    "model_breakdown": [
      {
        "model": "claude-haiku-4-5-20251001",
        "provider": "anthropic",
        "executions": 38,
        "tokens": 297524,
        "cost_usd": 0.372
      },
      {
        "model": "shannon_web_search",
        "provider": "shannon-scraper",
        "executions": 12,
        "tokens": 90000,
        "cost_usd": 0.048
      }
    ]
  },
  "usage": {
    "input_tokens": 289164,
    "output_tokens": 309071,
    "total_tokens": 598235,
    "estimated_cost": 0.780
  }
}

一步提交 + 流式传输

对于前端应用,使用合并的提交+流式传输端点:
curl -s -X POST http://localhost:8080/api/v1/tasks/stream \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Analyze the competitive landscape of cloud AI platforms: AWS, Azure, and GCP",
    "context": { "force_swarm": true }
  }' | jq
响应:
{
  "workflow_id": "task-def456...",
  "task_id": "task-def456...",
  "stream_url": "/api/v1/stream/sse?workflow_id=task-def456..."
}
然后连接流式传输 URL 获取实时事件:
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task-def456..."

Python SDK

基本用法

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 提交 Swarm 任务
handle = client.submit_task(
    "Compare AI chip markets across US, Japan, and South Korea",
    force_swarm=True,
    session_id="swarm-demo-001",
)

# 等待完成并获取结果
result = client.wait(handle.task_id)
print(result.result)
client.close()

带流式传输

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 一步提交并获取流式传输 URL
handle, stream_url = client.submit_and_stream(
    "Analyze the competitive landscape of cloud AI platforms",
    force_swarm=True,
)

# 实时流式传输事件
for event in client.stream(handle.workflow_id):
    if event.type == "AGENT_STARTED":
        print(f"智能体已启动: {event.agent_id}")
    elif event.type == "LEAD_DECISION":
        print(f"Lead 决策: {event.message}")
    elif event.type == "PROGRESS":
        print(f"进度: {event.message}")
    elif event.type == "AGENT_COMPLETED":
        print(f"智能体已完成: {event.agent_id}")
    elif event.type == "WORKFLOW_COMPLETED":
        print("Swarm 工作流已完成")
        break

# 获取最终结果
result = client.get_status(handle.task_id)
print(result.result)
client.close()

自定义上下文

handle = client.submit_task(
    "Research renewable energy policies in the EU, US, and China",
    force_swarm=True,
    context={
        "model_tier": "medium",  # 智能体使用中等级别模型
    },
)

智能体协作方式

Lead Agent 协调

Lead Agent 作为事件驱动的协调者。它不执行任务本身,而是基于事件进行规划、分配和重新分配工作:
  • 当智能体变为 idle 时,Lead 检查是否有依赖已满足的待处理任务并分配下一个
  • 当智能体 完成 时,Lead 评估是否重新分配它、关闭它或修订计划
  • 在定期 检查点(每 120 秒)时,Lead 审查整体进度并可调整计划
  • 当没有空闲智能体且没有可操作的待处理任务时,Lead 跳过不必要的 LLM 调用

团队名册

每个智能体都会收到团队名册,显示所有智能体及其任务分配。这使智能体知道就特定信息联系谁:
## Your Team (shared session workspace)
- **takao (you)**: "Research US AI chip market"
- mitaka: "Research Japan AI chip market"
- kichijoji: "Research South Korea AI chip market"

发布发现

智能体通过共享工作区分享发现。这些内容会出现在每个智能体的提示上下文中:
## Shared Findings
- takao: NVIDIA dominates US with 80% market share...
- mitaka: Japan focuses on edge AI chips with Preferred Networks leading...

发送直接消息

智能体可以向特定队友发送直接消息:
## Inbox Messages
- From mitaka (info): {"message": "Check Samsung's foundry plans for AI chips"}

请求帮助

当智能体需要额外支持时,可以向 Lead Agent 请求帮助:
{"action": "request_help", "help_description": "Need help analyzing EU regulatory impact on AI chips", "help_skills": ["web_search"]}
Lead Agent 评估请求后,可能生成新智能体、重新分配现有空闲智能体,或将子任务添加到待处理任务队列。

配置

features.yaml

workflows:
  swarm:
    enabled: true
    max_agents: 10                    # 最大智能体总数(初始 + 动态)
    max_iterations_per_agent: 25      # 每个智能体的最大推理-行动循环次数
    agent_timeout_seconds: 1800       # 每个智能体超时(30 分钟)
    max_messages_per_agent: 20        # 每个智能体的 P2P 消息上限
    workspace_snippet_chars: 800      # 提示中每个工作区条目的最大字符数
    workspace_max_entries: 5          # 向每个智能体显示的最近条目数
    max_total_llm_calls: 200          # 全局 LLM 调用预算
    max_total_tokens: 1000000         # 全局 Token 预算 (1M)
    max_wall_clock_minutes: 30        # 最大挂钟时间

配置参数

参数默认值范围描述
enabledtruetrue/false启用或禁用 Swarm 工作流
max_agents101-50智能体总数上限(含动态生成)
max_iterations_per_agent251-100每个智能体的最大推理-行动循环次数
agent_timeout_seconds180060-7200每个智能体的超时时间
max_messages_per_agent201-100每个智能体可发送的 P2P 消息上限
workspace_snippet_chars800100-4000智能体提示中工作区条目的截断限制
workspace_max_entries51-20每个主题显示的最近工作区条目数
max_total_llm_calls20010-1000整个 Swarm 中所有智能体的最大 LLM 调用次数
max_total_tokens100000010000-10000000所有智能体消耗的最大 Token 数
max_wall_clock_minutes301-120整个 Swarm 执行的最大挂钟时间

实际用例

协作编码

智能体协作审查、实现和测试代码,支持沙箱化执行。

金融分析

多空分析师、情绪智能体和投资组合经理协作综合投资洞察。

数据处理

并行数据流水线,支持沙箱化 Python 执行、JSON 查询和统计分析。

竞争情报

同时监控竞争对手网站、定价和社交媒体,发现自动交叉共享。

示例:协作代码审查

curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Review the Python files in /workspace/src for security vulnerabilities, code quality issues, and missing test coverage. Write fixes and add tests.",
    "context": { "force_swarm": true }
  }'
Lead Agent 为每个关注点(安全审计、代码质量、测试覆盖)创建任务,分配 developer 角色的智能体,并创建依赖于所有审查完成的最终综合任务。

示例:多站点价格监控

curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Compare pricing and features of AWS, Azure, and GCP for a startup needing GPU instances, object storage, and managed Kubernetes",
    "context": {
      "force_swarm": true,
      "model_tier": "medium"
    }
  }'

理解响应元数据

Swarm 工作流返回包含模型执行明细和 Token 用量的元数据:
{
  "metadata": {
    "workflow_type": "swarm",
    "total_agents": 3,
    "total_tokens": 598235,
    "model_breakdown": [
      {
        "model": "claude-haiku-4-5-20251001",
        "provider": "anthropic",
        "executions": 38,
        "tokens": 297524,
        "cost_usd": 0.372
      },
      {
        "model": "shannon_web_search",
        "provider": "shannon-scraper",
        "executions": 12,
        "tokens": 90000,
        "cost_usd": 0.048
      }
    ]
  },
  "usage": {
    "input_tokens": 289164,
    "output_tokens": 309071,
    "total_tokens": 598235,
    "estimated_cost": 0.780
  }
}
字段描述
metadata.workflow_typeSwarm 工作流始终为 "swarm"
metadata.total_agents参与的智能体总数
metadata.total_tokens整个 Swarm 消耗的 Token 总数
metadata.model_breakdown[]按模型汇总的执行明细
metadata.model_breakdown[].model模型标识符
metadata.model_breakdown[].provider提供商名称
metadata.model_breakdown[].executions使用该模型的 LLM 调用次数
metadata.model_breakdown[].tokens该模型消耗的 Token 数
metadata.model_breakdown[].cost_usd预估成本(美元)
usage.input_tokens所有智能体的输入 Token 总数
usage.output_tokens所有智能体的输出 Token 总数
usage.total_tokensToken 总数(输入 + 输出)
usage.estimated_cost总预估成本(美元)

提示与最佳实践

  • 设置 context.force_swarm: true 路由到 SwarmWorkflow
  • 从默认配置开始,根据结果进行调整
  • 通过 SSE 事件监控 Lead Agent 决策和智能体行为
  • 使用会话(session_id)进行多轮 Swarm 对话
  • 关注 LEAD_DECISION 事件以理解协调逻辑

故障排除

常见问题
  • Swarm 未触发:确保 force_swarm: truecontext 对象中,且 features.yaml 中已启用 Swarm
  • 智能体超时:对复杂任务增加 agent_timeout_seconds(默认 1800 秒 / 30 分钟)
  • 智能体过多:简化查询以减少子任务数量,或降低 max_agents
  • Token 消耗过高:降低 max_iterations_per_agent、使用 model_tier: "small",或降低 max_total_tokens
  • 智能体陷入循环:收敛检测(连续 3 次非工具迭代)应自动捕获此情况
  • 预算超限:检查 max_total_llm_callsmax_total_tokens 设置;Lead 会在预算紧张时尝试优雅关闭
  • 重复搜索:知识去重应自动处理;如持续出现,检查智能体是否可访问共享工作区

回退行为

如果 Swarm 工作流失败(规划错误、所有智能体失败等),Shannon 会自动回退到标准 DAG/Supervisor 工作流路由。force_swarm 标志会从上下文中移除以防止递归失败。

下一步

Swarm 概念

深入了解 Swarm 架构

深度研究

带引用的多阶段研究

API 参考

完整 API 文档