メインコンテンツへスキップ

概要

Shannonはリアルタイムイベントストリーミングを提供し、タスク実行をリアルタイムで監視できます。これは以下に不可欠です:
  • ユーザーへのライブフィードバック提供
  • Workflow実行のデバッグ
  • インタラクティブUIの構築
  • 長時間実行タスクの監視

ストリーミング技術

Shannonは2つのストリーミングプロトコルをサポート:
プロトコルユースケース機能
SSE (Server-Sent Events)一方向 server→clientシンプル、HTTPベース、自動再接続
WebSocket双方向全二重、低レイテンシ
ほとんどのユースケースでは、シンプルさと組み込みの再接続処理のためSSEが推奨されます。

Server-Sent Events (SSE)

cURLの使用

curl -N http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
出力:
event: WORKFLOW_STARTED
data: {"workflow_id":"task-123","agent_id":"orchestrator","message":"Task processing started"}

event: DATA_PROCESSING
data: {"workflow_id":"task-123","message":"Preparing context"}

event: PROGRESS
data: {"workflow_id":"task-123","agent_id":"planner","message":"Created a plan with 3 steps"}

event: AGENT_STARTED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Processing query"}

event: LLM_PROMPT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Searching for information..."}

event: TOOL_INVOKED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Calling web_search with query='topic A'"}

event: TOOL_OBSERVATION
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Found 5 results"}

event: LLM_OUTPUT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Research complete: Found relevant information"}

event: AGENT_COMPLETED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Task done"}

event: WORKFLOW_COMPLETED
data: {"workflow_id":"task-123","message":"Final synthesized result ready"}

Python SDKの使用

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# タスクを送信
handle = client.submit_task(
    query="Research AI trends and create summary"
)

# イベントをストリーミング
for event in client.stream(handle.workflow_id):
    print(f"[{event.type}] {event.message}")

    if event.type == "WORKFLOW_COMPLETED":
        print(f"Task completed: {event.message}")
        break

イベントタイプ

Shannonは異なるカテゴリにわたって26のイベントタイプを発行します。最も一般的に使用されるものは以下です:

コアWorkflowイベント

イベントタイプ説明メッセージ例
WORKFLOW_STARTEDタスク処理開始"Task processing started"
WORKFLOW_COMPLETEDWorkflow正常終了"All done"
AGENT_STARTEDAgent処理開始"Processing query"
AGENT_COMPLETEDAgent終了"Task done"
STATUS_UPDATEステータス更新/進捗"Planning next step"
ERROR_OCCURRED実行中のエラー"Failed to connect: timeout"

LLM & ツールイベント

イベントタイプ説明メッセージ例
LLM_PROMPTLLMに送信されたプロンプト"What is 5 + 5?"
LLM_OUTPUT完全なLLMレスポンス"5 + 5 equals 10"
LLM_PARTIALストリーミングチャンク(フィルタされることが多い)"5 + 5"
TOOL_INVOKEDツール実行開始"Calling database_query"
TOOL_OBSERVATIONツール結果/出力"Query returned 42 rows"

進捗 & ステータスイベント

イベントタイプ説明メッセージ例
PROGRESSステップ完了更新"Created a plan with 3 steps"
DATA_PROCESSINGデータ処理/分析中"Preparing context"
DELEGATION別のAgentへのタスク委任"Handing off to simple task"

マルチAgentイベント(機能ゲートが必要)

イベントタイプ説明メッセージ例
MESSAGE_SENTAgentがメッセージを送信(p2p_v1が必要)"Please analyze section 3"
MESSAGE_RECEIVEDAgentがメッセージを受信(p2p_v1が必要)"Received task"
TEAM_RECRUITEDAgentが募集(dynamic_team_v1が必要)"Summarize section 3"
TEAM_RETIREDAgentが引退(dynamic_team_v1が必要)"Task completed"
BUDGET_THRESHOLDトークン予算しきい値到達"Budget threshold: 80%"
全27イベントタイプの完全なリストについては、Event Types Referenceを参照してください。

WebSocketストリーミング

WebSocket経由で接続

import asyncio
import websockets
import json

async def stream_task():
    uri = f"ws://localhost:8080/api/v1/stream/ws?workflow_id={workflow_id}"

    # ヘッダー経由でAPIキーを渡す(Gatewayはヘッダーベース認証が必要)
    async with websockets.connect(
        uri, extra_headers={'X-API-Key': 'sk_test_123456'}
    ) as websocket:
        while True:
            message = await websocket.recv()
            event = json.loads(message)
            print(f"Event: {event['type']}")

            if event['type'] == 'WORKFLOW_COMPLETED':
                break

asyncio.run(stream_task())
WebSocketストリーミングは現在サーバーからクライアントへの一方向のみです。タスクをキャンセルするにはREST API /api/v1/tasks/{id}/cancelを使用してください。

イベントのフィルタリング

ノイズを減らすためにタイプでイベントをフィルタリング:
# 重要なイベントのみを表示
for event in client.stream(workflow_id):
    if event.type in ['PROGRESS', 'AGENT_COMPLETED', 'WORKFLOW_COMPLETED']:
        print(f"{event.type}: {event.message}")

進捗追跡

イベントからタスクの進捗を計算:
def track_progress(workflow_id):
    agents_started = 0
    agents_completed = 0

    for event in client.stream(workflow_id):
        if event.type == 'PROGRESS':
            # プランナーからの進捗更新を追跡
            print(f"Progress: {event.message}")

        elif event.type == 'AGENT_STARTED':
            agents_started += 1
            print(f"Agent started: {event.agent_id}")

        elif event.type == 'AGENT_COMPLETED':
            agents_completed += 1
            if agents_started > 0:
                progress = (agents_completed / agents_started) * 100
                print(f"Progress: {progress:.1f}% ({agents_completed}/{agents_started})")

        elif event.type == 'WORKFLOW_COMPLETED':
            print("✅ Task complete!")
            break

track_progress(handle.workflow_id)
出力:
Progress: Created a plan with 3 steps
Agent started: research-agent
Agent started: analysis-agent
Agent started: writer-agent
Progress: 33.3% (1/3)
Progress: 66.7% (2/3)
Progress: 100.0% (3/3)
✅ Task complete!

リアルタイムUI例

Reactコンポーネント

import { useEffect, useState } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [progress, setProgress] = useState(0);

  useEffect(() => {
    const params = new URLSearchParams({ workflow_id: taskId });

    // 注意: ブラウザのEventSourceはカスタムヘッダーをサポートしていません。
    // 本番環境: 認証ヘッダーを注入するバックエンドからSSEを開始するか、
    // 開発環境でGATEWAY_SKIP_AUTH=1を使用してください。

    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?${params}`
    );

    const handleEvent = (e) => {
      const event = JSON.parse(e.data);
      setEvents(prev => [...prev, event]);

      // Workflowライフサイクルに基づいて進捗を更新
      if (event.type === 'WORKFLOW_STARTED') {
        setProgress(10);
      } else if (event.type === 'PROGRESS') {
        setProgress(prev => Math.min(prev + 20, 80));
      } else if (event.type === 'WORKFLOW_COMPLETED') {
        setProgress(100);
      }
    };

    // 特定のイベントタイプをリッスン(Shannonは名前付きSSEイベントを使用)
    const eventTypes = [
      'WORKFLOW_STARTED',
      'WORKFLOW_COMPLETED',
      'AGENT_STARTED',
      'AGENT_COMPLETED',
      'STATUS_UPDATE',
      'PROGRESS',
      'ERROR_OCCURRED',
      'LLM_OUTPUT',
      'TOOL_INVOKED',
      'TOOL_OBSERVATION'
    ];

    eventTypes.forEach(type => {
      eventSource.addEventListener(type, handleEvent);
    });

    // 名前なしイベントのフォールバック
    eventSource.onmessage = handleEvent;

    eventSource.onerror = () => {
      console.error('SSE connection error');
    };

    return () => {
      eventTypes.forEach(type => {
        eventSource.removeEventListener(type, handleEvent);
      });
      eventSource.close();
    };
  }, [taskId, apiKey]);

  return (
    <div>
      <progress value={progress} max={100} />
      <ul>
        {events.map((e, i) => (
          <li key={i}>
            <strong>[{e.type}]</strong>{' '}
            {e.agent_id && <span>({e.agent_id})</span>}{' '}
            {e.message}
          </li>
        ))}
      </ul>
    </div>
  );
}
重要な実装詳細:
  • Shannonは名前付きSSEイベントを発行します(例: event: AGENT_STARTED)。各イベントタイプに対してaddEventListener()を使用する必要があります
  • ブラウザのEventSource APIはカスタムヘッダーをサポートしていません。クエリパラメータ経由でAPIキーを渡さないでください。ヘッダーを注入するバックエンドプロキシを使用するか、開発環境で認証を無効にしてください
  • メモリリークを防ぐため、クリーンアップ関数で常にイベントリスナーをクリーンアップしてください

エラーハンドリング

接続障害を適切に処理:
from shannon import ShannonClient
from shannon.errors import ShannonError, ConnectionError
import time

def robust_streaming(workflow_id, max_retries=3):
    client = ShannonClient(base_url="http://localhost:8080")

    for attempt in range(max_retries):
        try:
            for event in client.stream(workflow_id):
                print(f"Event: {event.type}")

                if event.type == 'WORKFLOW_COMPLETED':
                    # 最終ステータスを取得して結果を取得
                    status = client.get_status(workflow_id.replace('wf-', 'task-'))
                    return status.result

        except (ShannonError, ConnectionError) as e:
            print(f"Stream error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # 指数バックオフ
            else:
                raise

ストリーミングでの認証

認証が有効な場合、APIキーを提供:

APIキー付きSSE

curl -N \
  -H "X-API-Key: sk_test_123456" \
  http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}

APIキー付きPython SDK

client = ShannonClient(
    base_url="http://localhost:8080",
    api_key="sk_test_123456"
)

# APIキーは自動的にすべてのリクエストに含まれます
for event in client.stream(workflow_id):
    print(event)

パフォーマンスの考慮事項

イベントバッファリング

Shannonはクライアントを圧倒しないようにイベントをバッファリングします:
# config/shannon.yaml
streaming:
  buffer_size: 100  # 最大キューイベント
  flush_interval_ms: 100  # 100msごとにバッチを送信

キープアライブ

SSEはタイムアウトを防ぐために15秒ごとにキープアライブコメントを送信:
: keepalive
: keepalive
event: LLM_PROMPT
data: {"message":"..."}

リソースクリーンアップ

完了時には常にストリームを閉じる:
# イベントを反復処理(接続を自動的に処理)
for event in client.stream(workflow_id):
    if event.type == 'WORKFLOW_COMPLETED':
        break
# 明示的なクローズは不要

ベストプラクティス

1. シンプルな監視にはSSEを使用

# ✅ 良い: シンプルなSSEストリーミング
for event in client.stream(workflow_id):
    print(event.type)

2. 切断を処理

# ✅ 良い: リトライロジック
for attempt in range(3):
    try:
        for event in client.stream(workflow_id):
            process_event(event)
        break
    except (ShannonError, ConnectionError):
        if attempt == 2:
            raise
        time.sleep(2)

3. 不要なイベントをフィルタリング

# ✅ 良い: 重要なイベントのみ
critical_events = ['PROGRESS', 'WORKFLOW_COMPLETED', 'ERROR_OCCURRED']
for event in client.stream(workflow_id):
    if event.type in critical_events:
        handle_event(event)

4. タイムアウトを設定

# ✅ 良い: タイムアウト保護
import signal

def timeout_handler(signum, frame):
    raise TimeoutError("Stream timeout")

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300)  # 5分タイムアウト

try:
    for event in client.stream(workflow_id):
        print(event)
finally:
    signal.alarm(0)  # アラームをキャンセル

次のステップ