メインコンテンツへスキップ

Swarmマルチエージェントワークフロー

このチュートリアルでは、ShannonのSwarmWorkflowを使用して、LLMを搭載したLead Agentが調整する永続的で協調するAgentをデプロイする方法を紹介します。Agentは並列で作業し、Agent間メッセージング、共有ワークスペース、動的タスク再割り当てをサポートします。

学べること

  • APIとPython SDKでSwarmタスクを送信する方法
  • Lead Agentがイベントを通じてAgentを調整する仕組み
  • SSEストリーミングでAgentの進捗を監視する方法
  • Swarmパラメータと予算制御の設定方法
  • 実際のユースケースとベストプラクティス

前提条件

  • Shannonスタックが稼働中(Docker Compose)
  • http://localhost:8080でゲートウェイにアクセス可能
  • config/features.yamlでSwarmが有効(デフォルトで有効)
  • 認証のデフォルト:
    • Docker Compose: 認証はデフォルトで無効(GATEWAY_SKIP_AUTH=1)。
    • ローカルビルド: 認証はデフォルトで有効。認証を無効にするにはGATEWAY_SKIP_AUTH=1を設定するか、APIキーヘッダー-H "X-API-Key: $API_KEY"を含めます。

クイックスタート

1
Swarmタスクの送信
2
コンテキストにforce_swarm: trueを設定してSwarmWorkflowにルーティングします:
3
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Compare AI chip markets across US, Japan, and South Korea",
    "session_id": "swarm-demo-001",
    "context": {
      "force_swarm": true
    }
  }'
4
レスポンス:
5
{
  "task_id": "task-abc123...",
  "status": "STATUS_CODE_OK",
  "message": "Task submitted successfully",
  "created_at": "2025-11-10T10:00:00Z"
}
6
進捗イベントのストリーミング
7
SSEストリームに接続してAgentの作業をリアルタイムで監視します:
8
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task-abc123..."
9
以下のようなイベントが表示されます:
10
data: {"type":"WORKFLOW_STARTED","agent_id":"swarm-supervisor","message":"Lead Agent initializing team"}
data: {"type":"LEAD_DECISION","agent_id":"swarm-lead","message":"Creating initial plan with 3 tasks"}
data: {"type":"TASKLIST_UPDATED","agent_id":"swarm-lead","message":"Task graph updated: 4 tasks (3 research + 1 synthesis)"}
data: {"type":"TEAM_STATUS","agent_id":"swarm-lead","message":"Spawned agent takao"}
data: {"type":"AGENT_STARTED","agent_id":"takao","message":"Agent takao started"}
data: {"type":"TEAM_STATUS","agent_id":"swarm-lead","message":"Spawned agent mitaka"}
data: {"type":"AGENT_STARTED","agent_id":"mitaka","message":"Agent mitaka started"}
data: {"type":"TEAM_STATUS","agent_id":"swarm-lead","message":"Spawned agent kichijoji"}
data: {"type":"AGENT_STARTED","agent_id":"kichijoji","message":"Agent kichijoji started"}
data: {"type":"PROGRESS","agent_id":"takao","message":"Agent takao progress: iteration 1/25, action: tool_call"}
data: {"type":"AGENT_COMPLETED","agent_id":"takao","message":"Agent takao completed"}
data: {"type":"LEAD_DECISION","agent_id":"swarm-lead","message":"Assigning synthesis task to takao"}
data: {"type":"AGENT_COMPLETED","agent_id":"mitaka","message":"Agent mitaka completed"}
data: {"type":"AGENT_COMPLETED","agent_id":"kichijoji","message":"Agent kichijoji completed"}
data: {"type":"LEAD_DECISION","agent_id":"swarm-lead","message":"All tasks complete, finalizing"}
data: {"type":"WORKFLOW_COMPLETED","agent_id":"swarm-supervisor","message":"All done"}
11
結果の取得
12
curl "http://localhost:8080/api/v1/tasks/task-abc123..."
13
レスポンス:
14
{
  "task_id": "task-abc123...",
  "status": "TASK_STATUS_COMPLETED",
  "result": "## AI Chip Market Comparison\n\n### United States\nThe US market is dominated by NVIDIA...\n\n### Japan\nJapan focuses on edge AI...\n\n### South Korea\nSouth Korea leverages Samsung...",
  "metadata": {
    "workflow_type": "swarm",
    "total_agents": 3,
    "total_tokens": 598235,
    "model_breakdown": [
      {
        "model": "claude-haiku-4-5-20251001",
        "provider": "anthropic",
        "executions": 38,
        "tokens": 297524,
        "cost_usd": 0.372
      },
      {
        "model": "shannon_web_search",
        "provider": "shannon-scraper",
        "executions": 12,
        "tokens": 90000,
        "cost_usd": 0.048
      }
    ]
  },
  "usage": {
    "input_tokens": 289164,
    "output_tokens": 309071,
    "total_tokens": 598235,
    "estimated_cost": 0.780
  }
}

送信 + ストリーミングを一回で

フロントエンドアプリケーションでは、送信とストリーミングを統合するエンドポイントを使用します:
curl -s -X POST http://localhost:8080/api/v1/tasks/stream \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Analyze the competitive landscape of cloud AI platforms: AWS, Azure, and GCP",
    "context": { "force_swarm": true }
  }' | jq
レスポンス:
{
  "workflow_id": "task-def456...",
  "task_id": "task-def456...",
  "stream_url": "/api/v1/stream/sse?workflow_id=task-def456..."
}
ストリームURLに接続してリアルタイムイベントを取得します:
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task-def456..."

Python SDK

基本的な使用方法

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Swarmタスクを送信
handle = client.submit_task(
    "Compare AI chip markets across US, Japan, and South Korea",
    force_swarm=True,
    session_id="swarm-demo-001",
)

# 完了を待って結果を取得
result = client.wait(handle.task_id)
print(result.result)
client.close()

ストリーミング付き

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 送信とストリームURLの取得を一度に
handle, stream_url = client.submit_and_stream(
    "Analyze the competitive landscape of cloud AI platforms",
    force_swarm=True,
)

# リアルタイムでイベントをストリーミング
for event in client.stream(handle.workflow_id):
    if event.type == "AGENT_STARTED":
        print(f"Agent開始: {event.agent_id}")
    elif event.type == "LEAD_DECISION":
        print(f"Lead決定: {event.message}")
    elif event.type == "PROGRESS":
        print(f"進捗: {event.message}")
    elif event.type == "AGENT_COMPLETED":
        print(f"Agent完了: {event.agent_id}")
    elif event.type == "WORKFLOW_COMPLETED":
        print("Swarm Workflow完了")
        break

# 最終結果を取得
result = client.get_status(handle.task_id)
print(result.result)
client.close()

カスタムコンテキスト

handle = client.submit_task(
    "Research renewable energy policies in the EU, US, and China",
    force_swarm=True,
    context={
        "model_tier": "medium",  # Agentにmediumティアのモデルを使用
    },
)

Agentの協調方法

Lead Agentによる調整

Lead Agentはイベント駆動のコーディネーターとして機能します。タスクを直接実行するのではなく、イベントに基づいて計画、割り当て、作業の再割り当てを行います:
  • Agentがidleになると、Leadは依存関係が満たされた保留中のタスクを確認し、次のタスクを割り当て
  • Agentが完了すると、Leadは再割り当て、シャットダウン、または計画の修正を評価
  • 定期的なチェックポイント(120秒ごと)で、Leadは全体の進捗をレビューし、計画を調整可能
  • アイドルAgentがなく、アクション可能な保留タスクもない場合、Leadは不要なLLM呼び出しをスキップ

チームロスター

各AgentはすべてのAgentとその担当を示すチームロスターを受け取ります。これにより、特定の情報について誰に連絡すべきかを把握できます:
## Your Team (shared session workspace)
- **takao (you)**: "Research US AI chip market"
- mitaka: "Research Japan AI chip market"
- kichijoji: "Research South Korea AI chip market"

発見の公開

Agentは共有ワークスペースを通じて発見を共有します。これらはすべてのAgentのプロンプトコンテキストに表示されます:
## Shared Findings
- takao: NVIDIA dominates US with 80% market share...
- mitaka: Japan focuses on edge AI chips with Preferred Networks leading...

ダイレクトメッセージの送信

Agentは特定のチームメイトにダイレクトメッセージを送信できます:
## Inbox Messages
- From mitaka (info): {"message": "Check Samsung's foundry plans for AI chips"}

ヘルプの要請

Agentが追加サポートを必要とする場合、Lead Agentにヘルプを要請します:
{"action": "request_help", "help_description": "Need help analyzing EU regulatory impact on AI chips", "help_skills": ["web_search"]}
Lead Agentはリクエストを評価し、新しいAgentを生成するか、既存のアイドルAgentを再割り当てするか、サブタスクを保留タスクキューに追加します。

設定

features.yaml

workflows:
  swarm:
    enabled: true
    max_agents: 10                    # Agent総数の上限(初期 + 動的)
    max_iterations_per_agent: 25      # Agentごとの推論-行動ループの最大回数
    agent_timeout_seconds: 1800       # Agentごとのタイムアウト(30分)
    max_messages_per_agent: 20        # AgentごとのP2Pメッセージ上限
    workspace_snippet_chars: 800      # プロンプト内のワークスペースエントリの最大文字数
    workspace_max_entries: 5          # Agentに表示される最近のエントリ数
    max_total_llm_calls: 200          # グローバルLLM呼び出し予算
    max_total_tokens: 1000000         # グローバルトークン予算 (1M)
    max_wall_clock_minutes: 30        # 最大ウォールクロック時間

設定パラメータ

パラメータデフォルト範囲説明
enabledtruetrue/falseSwarm Workflowの有効/無効
max_agents101-50動的に生成されたAgentを含むAgent総数の上限
max_iterations_per_agent251-100Agentごとの推論-行動サイクルの最大回数
agent_timeout_seconds180060-7200Agentごとのウォールクロックタイムアウト
max_messages_per_agent201-100Agentが送信できるP2Pメッセージの上限
workspace_snippet_chars800100-4000Agentプロンプト内のワークスペースエントリの切り詰め制限
workspace_max_entries51-20トピックごとに表示される最近のワークスペースエントリ数
max_total_llm_calls20010-1000Swarm内のすべてのAgentにわたる最大LLM呼び出し回数
max_total_tokens100000010000-10000000すべてのAgentが消費する最大トークン数
max_wall_clock_minutes301-120Swarm実行全体の最大ウォールクロック時間

実際のユースケース

コラボレーティブコーディング

Agentが協調してコードレビュー、実装、テストを実行。サンドボックス実行対応。

金融分析

ブル/ベアアナリスト、センチメントAgent、ポートフォリオマネージャーが投資インサイトを統合。

データ処理

サンドボックスPython実行、JSONクエリ、統計分析による並列データパイプライン。

競合インテリジェンス

競合のWebサイト、価格、SNSを同時監視し、発見を自動的にクロス共有。

例: コラボレーティブコードレビュー

curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Review the Python files in /workspace/src for security vulnerabilities, code quality issues, and missing test coverage. Write fixes and add tests.",
    "context": { "force_swarm": true }
  }'
Lead Agentは各関心事(セキュリティ監査、コード品質、テストカバレッジ)のタスクを作成し、developerロールのAgentを割り当て、すべてのレビュー完了に依存する最終統合タスクを作成します。

例: マルチサイト価格モニタリング

curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Compare pricing and features of AWS, Azure, and GCP for a startup needing GPU instances, object storage, and managed Kubernetes",
    "context": {
      "force_swarm": true,
      "model_tier": "medium"
    }
  }'

レスポンスメタデータの理解

Swarm Workflowはモデルごとの実行内訳とトークン使用量を含むメタデータを返します:
{
  "metadata": {
    "workflow_type": "swarm",
    "total_agents": 3,
    "total_tokens": 598235,
    "model_breakdown": [
      {
        "model": "claude-haiku-4-5-20251001",
        "provider": "anthropic",
        "executions": 38,
        "tokens": 297524,
        "cost_usd": 0.372
      },
      {
        "model": "shannon_web_search",
        "provider": "shannon-scraper",
        "executions": 12,
        "tokens": 90000,
        "cost_usd": 0.048
      }
    ]
  },
  "usage": {
    "input_tokens": 289164,
    "output_tokens": 309071,
    "total_tokens": 598235,
    "estimated_cost": 0.780
  }
}
フィールド説明
metadata.workflow_typeSwarm Workflowでは常に"swarm"
metadata.total_agents参加したAgent総数
metadata.total_tokensSwarm全体で消費された総トークン数
metadata.model_breakdown[]モデルごとの実行サマリー
metadata.model_breakdown[].modelモデル識別子
metadata.model_breakdown[].providerプロバイダー名
metadata.model_breakdown[].executionsこのモデルのLLM呼び出し回数
metadata.model_breakdown[].tokensこのモデルが消費したトークン数
metadata.model_breakdown[].cost_usd推定コスト(USD)
usage.input_tokens全Agentの入力トークン総数
usage.output_tokens全Agentの出力トークン総数
usage.total_tokensトークン総数(入力 + 出力)
usage.estimated_cost合計推定コスト(USD)

ヒントとベストプラクティス

  • context.force_swarm: trueを設定してSwarmWorkflowにルーティング
  • デフォルト設定から始め、結果に基づいて調整
  • SSEイベントでLead Agentの決定とAgentの動作を監視
  • セッション(session_id)を使用してマルチターンのSwarm会話を実現
  • LEAD_DECISIONイベントに注目して調整ロジックを理解

トラブルシューティング

一般的な問題:
  • Swarmがトリガーされない: force_swarm: truecontextオブジェクト内にあること、features.yamlでSwarmが有効であることを確認
  • Agentがタイムアウト: 複雑なタスクではagent_timeout_secondsを増やす(デフォルトは1800秒 / 30分)
  • Agentが多すぎる: クエリを簡略化してサブタスク数を減らすか、max_agentsを下げる
  • トークン使用量が高い: max_iterations_per_agentを下げるか、model_tier: "small"を使用するか、max_total_tokensを減らす
  • Agentがループに陥る: 収束検出(連続3回のツール未使用イテレーション)が自動的に検出する
  • 予算超過: max_total_llm_callsmax_total_tokensの設定を確認。Leadは予算が厳しくなるとグレースフルシャットダウンを試みる
  • 冗長な検索: 知識重複排除がこれを処理するはず。継続する場合、Agentが共有ワークスペースにアクセスできるか確認

フォールバック動作

Swarm Workflowが失敗した場合(計画エラー、すべてのAgentが失敗など)、Shannonは標準のDAG/Supervisor Workflowルーティングに自動的にフォールバックします。再帰的な失敗を防ぐため、force_swarmフラグはコンテキストから除去されます。

次のステップ

Swarmコンセプト

Swarmアーキテクチャの詳細

ディープリサーチ

引用付きのマルチステージリサーチ

APIリファレンス

完全なAPIドキュメント