メインコンテンツへスキップ

Swarmマルチエージェントワークフロー

このチュートリアルでは、ShannonのSwarmWorkflowを使用して、Agent間メッセージングと共有ワークスペースを備えた永続的で協調するAgentをデプロイする方法を紹介します。

学べること

  • APIとPython SDKでSwarmタスクを送信する方法
  • Agentがどのように分解、実行、結果統合を行うか
  • SSEストリーミングでAgentの進捗を監視する方法
  • Swarmパラメータの設定方法
  • 実際のユースケースとベストプラクティス

前提条件

  • Shannonスタックが稼働中(Docker Compose)
  • http://localhost:8080でゲートウェイにアクセス可能
  • config/features.yamlでSwarmが有効(デフォルトで有効)
  • 認証のデフォルト:
    • Docker Compose: 認証はデフォルトで無効(GATEWAY_SKIP_AUTH=1)。
    • ローカルビルド: 認証はデフォルトで有効。認証を無効にするにはGATEWAY_SKIP_AUTH=1を設定するか、APIキーヘッダー-H "X-API-Key: $API_KEY"を含めます。

クイックスタート

1
Swarmタスクの送信
2
コンテキストにforce_swarm: trueを設定してSwarmWorkflowにルーティングします:
3
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Compare AI chip markets across US, Japan, and South Korea",
    "session_id": "swarm-demo-001",
    "context": {
      "force_swarm": true
    }
  }'
4
レスポンス:
5
{
  "task_id": "task-abc123...",
  "status": "STATUS_CODE_OK",
  "message": "Task submitted successfully",
  "created_at": "2025-11-10T10:00:00Z"
}
6
進捗イベントのストリーミング
7
SSEストリームに接続してAgentの作業をリアルタイムで監視します:
8
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task-abc123..."
9
以下のようなイベントが表示されます:
10
data: {"type":"WORKFLOW_STARTED","agent_id":"swarm-supervisor","message":"Assigning a team of agents"}
data: {"type":"PROGRESS","agent_id":"swarm-supervisor","message":"Planning approach"}
data: {"type":"PROGRESS","agent_id":"swarm-supervisor","message":"Assigning 3 agents"}
data: {"type":"AGENT_STARTED","agent_id":"takao","message":"Agent takao started"}
data: {"type":"AGENT_STARTED","agent_id":"mitaka","message":"Agent mitaka started"}
data: {"type":"AGENT_STARTED","agent_id":"kichijoji","message":"Agent kichijoji started"}
data: {"type":"PROGRESS","agent_id":"takao","message":"Agent takao progress: iteration 1/25, action: tool_call"}
data: {"type":"AGENT_COMPLETED","agent_id":"takao","message":"Agent takao completed"}
data: {"type":"AGENT_COMPLETED","agent_id":"mitaka","message":"Agent mitaka completed"}
data: {"type":"AGENT_COMPLETED","agent_id":"kichijoji","message":"Agent kichijoji completed"}
data: {"type":"PROGRESS","agent_id":"swarm-supervisor","message":"Combining findings from 3 agents"}
data: {"type":"WORKFLOW_COMPLETED","agent_id":"swarm-supervisor","message":"All done"}
11
結果の取得
12
curl "http://localhost:8080/api/v1/tasks/task-abc123..."
13
レスポンス:
14
{
  "task_id": "task-abc123...",
  "status": "TASK_STATUS_COMPLETED",
  "result": "## AI Chip Market Comparison\n\n### United States\nThe US market is dominated by NVIDIA...\n\n### Japan\nJapan focuses on edge AI...\n\n### South Korea\nSouth Korea leverages Samsung...",
  "metadata": {
    "workflow_type": "swarm",
    "total_agents": 3,
    "agents": [
      {"agent_id": "takao", "iterations": 8, "tokens": 12400, "success": true, "model": "gpt-5-mini-2025-08-07"},
      {"agent_id": "mitaka", "iterations": 6, "tokens": 9800, "success": true, "model": "gpt-5-mini-2025-08-07"},
      {"agent_id": "kichijoji", "iterations": 7, "tokens": 11200, "success": true, "model": "gpt-5-mini-2025-08-07"}
    ]
  }
}

送信 + ストリーミングを一回で

フロントエンドアプリケーションでは、送信とストリーミングを統合するエンドポイントを使用します:
curl -s -X POST http://localhost:8080/api/v1/tasks/stream \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Analyze the competitive landscape of cloud AI platforms: AWS, Azure, and GCP",
    "context": { "force_swarm": true }
  }' | jq
レスポンス:
{
  "workflow_id": "task-def456...",
  "task_id": "task-def456...",
  "stream_url": "/api/v1/stream/sse?workflow_id=task-def456..."
}
ストリームURLに接続してリアルタイムイベントを取得します:
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task-def456..."

Python SDK

基本的な使用方法

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Swarmタスクを送信
handle = client.submit_task(
    "Compare AI chip markets across US, Japan, and South Korea",
    force_swarm=True,
    session_id="swarm-demo-001",
)

# 完了を待って結果を取得
result = client.wait(handle.task_id)
print(result.result)
client.close()

ストリーミング付き

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 送信とストリームURLの取得を一度に
handle, stream_url = client.submit_and_stream(
    "Analyze the competitive landscape of cloud AI platforms",
    force_swarm=True,
)

# リアルタイムでイベントをストリーミング
for event in client.stream(handle.workflow_id):
    if event.type == "AGENT_STARTED":
        print(f"Agent開始: {event.agent_id}")
    elif event.type == "PROGRESS":
        print(f"進捗: {event.message}")
    elif event.type == "AGENT_COMPLETED":
        print(f"Agent完了: {event.agent_id}")
    elif event.type == "WORKFLOW_COMPLETED":
        print("Swarm Workflow完了")
        break

# 最終結果を取得
result = client.get_status(handle.task_id)
print(result.result)
client.close()

カスタムコンテキスト

handle = client.submit_task(
    "Research renewable energy policies in the EU, US, and China",
    force_swarm=True,
    context={
        "model_tier": "medium",  # Agentにmediumティアのモデルを使用
    },
)

Agentの協調方法

チームロスター

各AgentはすべてのAgentとその担当を示すチームロスターを受け取ります。これにより、特定の情報について誰に連絡すべきかを把握できます:
## Your Team (shared session workspace)
- **takao (you)**: "Research US AI chip market"
- mitaka: "Research Japan AI chip market"
- kichijoji: "Research South Korea AI chip market"

発見の公開

Agentは共有ワークスペースを通じて発見を共有します。これらはすべてのAgentのプロンプトコンテキストに表示されます:
## Shared Findings
- takao: NVIDIA dominates US with 80% market share...
- mitaka: Japan focuses on edge AI chips with Preferred Networks leading...

ダイレクトメッセージの送信

Agentは特定のチームメイトにダイレクトメッセージを送信できます:
## Inbox Messages
- From mitaka (info): {"message": "Check Samsung's foundry plans for AI chips"}

ヘルプの要請

Agentが追加サポートを必要とする場合、スーパーバイザーにヘルプを要請します:
{"action": "request_help", "help_description": "Need help analyzing EU regulatory impact on AI chips", "help_skills": ["web_search"]}
スーパーバイザーが新しいAgentを生成してリクエストを処理し、元のAgentに通知します。

設定

features.yaml

workflows:
  swarm:
    enabled: true
    max_agents: 10                    # Agent総数の上限(初期 + 動的)
    max_iterations_per_agent: 25      # Agentごとの推論-行動ループの最大回数
    agent_timeout_seconds: 600        # Agentごとのタイムアウト(10分)
    max_messages_per_agent: 20        # AgentごとのP2Pメッセージ上限
    workspace_snippet_chars: 800      # プロンプト内のワークスペースエントリの最大文字数
    workspace_max_entries: 5          # Agentに表示される最近のエントリ数

設定パラメータ

パラメータデフォルト範囲説明
enabledtruetrue/falseSwarm Workflowの有効/無効
max_agents101-50動的に生成されたヘルパーを含むAgent総数の上限
max_iterations_per_agent251-100Agentごとの推論-行動サイクルの最大回数
agent_timeout_seconds60060-3600Agentごとのウォールクロックタイムアウト
max_messages_per_agent201-100Agentが送信できるP2Pメッセージの上限
workspace_snippet_chars800100-4000Agentプロンプト内のワークスペースエントリの切り詰め制限
workspace_max_entries51-20トピックごとに表示される最近のワークスペースエントリ数

実際のユースケース

市場調査

Agentを異なる市場や競合の調査に並列で割り当て、統一レポートに統合します。

技術比較

各Agentが異なるフレームワーク、ライブラリ、アーキテクチャを評価し、発見をリアルタイムで共有します。

マルチリージョン分析

Agentが異なる地域を同時に調査し、発見をクロスリージョンで共有します。

文献レビュー

Agentが研究テーマの異なる側面を探索し、ワークスペースでソースと主要な発見を共有します。

例: 技術フレームワーク比較

curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Compare LangChain, CrewAI, and AutoGen for building multi-agent systems. Evaluate architecture, community, performance, and production readiness.",
    "context": { "force_swarm": true }
  }'
このタスクは各フレームワークを調査するAgentに分解され、ワークスペースで発見を共有するため、各Agentが他のAgentの発見を参照できます。

例: 包括的な業界分析

curl -X POST http://localhost:8080/api/v1/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Analyze the global electric vehicle battery supply chain: raw materials sourcing, manufacturing capacity, technology trends, and geopolitical risks",
    "context": {
      "force_swarm": true,
      "model_tier": "medium"
    }
  }'

レスポンスメタデータの理解

Swarm WorkflowはAgentごとの詳細を含むメタデータを返します:
{
  "metadata": {
    "workflow_type": "swarm",
    "total_agents": 4,
    "agents": [
      {
        "agent_id": "takao",
        "iterations": 8,
        "tokens": 14200,
        "success": true,
        "model": "gpt-5-mini-2025-08-07"
      },
      {
        "agent_id": "mitaka",
        "iterations": 6,
        "tokens": 9800,
        "success": true,
        "model": "gpt-5-mini-2025-08-07"
      }
    ]
  }
}
フィールド説明
workflow_typeSwarm Workflowでは常に"swarm"
total_agents参加したAgent数
agents[].agent_idAgentの一意の名前
agents[].iterations完了した推論-行動サイクル数
agents[].tokensこのAgentが消費した総トークン数
agents[].successAgentが正常に完了したかどうか
agents[].modelこのAgentが使用したLLMモデル

ヒントとベストプラクティス

  • context.force_swarm: trueを設定してSwarmWorkflowにルーティング
  • デフォルト設定から始め、結果に基づいて調整
  • SSEイベントでAgentの動作を監視
  • セッション(session_id)を使用してマルチターンのSwarm会話を実現

トラブルシューティング

一般的な問題:
  • Swarmがトリガーされない: force_swarm: truecontextオブジェクト内にあること、features.yamlでSwarmが有効であることを確認
  • Agentがタイムアウト: 複雑なタスクではagent_timeout_secondsを増やす
  • Agentが多すぎる: クエリを簡略化してサブタスク数を減らすか、max_agentsを下げる
  • トークン使用量が高い: max_iterations_per_agentを下げるかmodel_tier: "small"を使用
  • Agentがループに陥る: 収束検出(連続3回のツール未使用イテレーション)が自動的に検出する

フォールバック動作

Swarm Workflowが失敗した場合(分解エラー、すべてのAgentが失敗など)、Shannonは標準のDAG/Supervisor Workflowルーティングに自動的にフォールバックします。再帰的な失敗を防ぐため、force_swarmフラグはコンテキストから除去されます。

次のステップ