メインコンテンツへスキップ

概要

Shannonは、Server-Sent Events (SSE)およびWebSocketプロトコルを通じてリアルタイムのイベントストリーミングを提供します。ストリーミングを使用して、タスクの実行を監視し、進捗を表示し、生成される結果を受け取ります。
認証: ストリーミングエンドポイントは、他のAPIと同じヘッダーを必要とします。 ブラウザはEventSourceでカスタムヘッダーを送信できません。
  • 開発: GATEWAY_SKIP_AUTH=1を設定します。
  • 本番: バックエンドを介してSSEをプロキシし、X-API-KeyまたはBearerヘッダーを注入します。 APIキーをURLクエリパラメータで渡さないでください。
ストリーミング制限:
  • タイムアウト: 5分間の非アクティブ後にストリームは自動的に閉じます
  • バッファサイズ: 接続ごとに最大1MBのバッファデータ
  • 使用メタデータ: すべてのLLMプロバイダー(OpenAI、Anthropic、Google、Groq、xAI)に対してトークン数とコストが利用可能になりました

エンドポイント

メソッドエンドポイントプロトコル説明
POST/api/v1/tasks/streamHTTP+SSEタスクを送信し、ストリームURLを取得(推奨)
GET/api/v1/stream/sseSSEServer-Sent Eventsエンドポイント
GET/api/v1/stream/wsWebSocketWebSocketストリーミングエンドポイント
GET/api/v1/tasks/{id}/eventsHTTP過去のイベントを取得(ページネーションあり)

統一された送信 + ストリーム(推奨)

POST /api/v1/tasks/stream

タスクを送信し、そのイベントのストリーミングを即座に開始する最も簡単な方法です。このエンドポイントは、タスクの送信とストリーミングの設定を1回の呼び出しで組み合わせています。
フロントエンドアプリケーションに最適: このエンドポイントは、タスクを送信した直後に進捗を表示したいリアルタイムUIに最適です。

認証

必要: はい
X-API-Key: sk_test_123456
または:
Authorization: Bearer YOUR_TOKEN

リクエストボディ

パラメータタイプ必須説明
querystringはい自然言語のタスク説明
session_idstringいいえマルチターン会話のためのセッション識別子
contextobjectいいえキーと値のペアとしての追加コンテキストデータ
model_tierstringいいえ希望するティア: small, medium, または large
model_overridestringいいえ特定のモデル名(標準; 例: gpt-5
provider_overridestringいいえプロバイダーを強制指定(例: openai, anthropic, google

レスポンス

ステータス: 201 Created ボディ:
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}

レスポンスフィールド

フィールドタイプ説明
workflow_idstringタスク/ワークフロー識別子
stream_urlstringSSEストリームエンドポイントへの相対URL

例: JavaScript/TypeScript

async function submitAndStream(query, onEvent, onComplete, onError) {
  try {
    // 1. タスクを送信し、ストリームURLを取得
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer YOUR_TOKEN'
      },
      body: JSON.stringify({
        query: query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    const { task_id, workflow_id, stream_url } = await response.json();
    console.log(`タスクが送信されました: ${workflow_id}`);

    // 2. SSEストリームに接続
    const eventSource = new EventSource(
      `http://localhost:8080${stream_url}`,
      { withCredentials: false }
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      onEvent(event);

      // 完了を確認
      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        onComplete(event);
      }
    };

    eventSource.onerror = (err) => {
      console.error('SSEエラー:', err);
      eventSource.close();
      onError(err);
    };

    return { workflow_id, eventSource };

  } catch (error) {
    onError(error);
    throw error;
  }
}

// 使用例
submitAndStream(
  "Q4の収益トレンドを分析する",
  (event) => console.log(`[${event.type}]`, event.message),
  (final) => console.log("完了:", final.result),
  (error) => console.error("エラー:", error)
);

例: React Hook

import { useState, useCallback, useRef } from 'react';

function useTaskStream(apiUrl = 'http://localhost:8080') {
  const [events, setEvents] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState(null);
  const [workflowId, setWorkflowId] = useState(null);
  const eventSourceRef = useRef(null);

  const submitTask = useCallback(async (query, sessionId = null) => {
    setEvents([]);
    setError(null);
    setIsStreaming(true);

    try {
      // タスクを送信
      const response = await fetch(`${apiUrl}/api/v1/tasks/stream`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${localStorage.getItem('token')}`
        },
        body: JSON.stringify({
          query,
          session_id: sessionId || `session-${Date.now()}`
        })
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const { task_id, workflow_id, stream_url } = await response.json();
      setWorkflowId(workflow_id);

      // ストリームに接続
      const eventSource = new EventSource(`${apiUrl}${stream_url}`);
      eventSourceRef.current = eventSource;

      eventSource.onmessage = (e) => {
        const event = JSON.parse(e.data);
        setEvents(prev => [...prev, event]);

        if (event.type === 'WORKFLOW_COMPLETED' || event.type === 'ERROR_OCCURRED') {
          eventSource.close();
          setIsStreaming(false);
        }
      };

      eventSource.onerror = (err) => {
        setError(err);
        setIsStreaming(false);
        eventSource.close();
      };

    } catch (err) {
      setError(err);
      setIsStreaming(false);
    }
  }, [apiUrl]);

  const stopStreaming = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      setIsStreaming(false);
    }
  }, []);

  return { submitTask, stopStreaming, events, isStreaming, error, workflowId };
}

// コンポーネントでの使用例
function TaskStreamDemo() {
  const { submitTask, stopStreaming, events, isStreaming, error, workflowId } = useTaskStream();

  return (
    <div>
      <button
        onClick={() => submitTask("15 + 25 は何ですか?")}
        disabled={isStreaming}
      >
        {isStreaming ? '処理中...' : 'タスクを送信'}
      </button>

      {workflowId && <p>ワークフローID: {workflowId}</p>}
      {error && <p style={{color: 'red'}}>エラー: {error.message}</p>}

      <div>
        {events.map((event, idx) => (
          <div key={idx}>
            <strong>{event.type}</strong>: {event.message || event.agent_id}
          </div>
        ))}
      </div>

      {isStreaming && <button onClick={stopStreaming}>停止</button>}
    </div>
  );
}

例: Vue 3 Composition API

<script setup>
import { ref } from 'vue';

const events = ref([]);
const isStreaming = ref(false);
const error = ref(null);
const workflowId = ref(null);
let eventSource = null;

async function submitTask(query) {
  events.value = [];
  error.value = null;
  isStreaming.value = true;

  try {
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${localStorage.getItem('token')}`
      },
      body: JSON.stringify({
        query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);

    const { task_id, workflow_id, stream_url } = await response.json();
    workflowId.value = workflow_id;

    eventSource = new EventSource(`http://localhost:8080${stream_url}`);

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      events.value.push(event);

      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        isStreaming.value = false;
      }
    };

    eventSource.onerror = (err) => {
      error.value = err;
      eventSource.close();
      isStreaming.value = false;
    };
  } catch (err) {
    error.value = err;
    isStreaming.value = false;
  }
}

function stopStreaming() {
  if (eventSource) {
    eventSource.close();
    isStreaming.value = false;
  }
}
</script>

<template>
  <div>
    <button @click="submitTask('データを分析する')" :disabled="isStreaming">
      {{ isStreaming ? '処理中...' : 'タスクを送信' }}
    </button>

    <div v-if="workflowId">ワークフロー: {{ workflowId }}</div>
    <div v-if="error" style="color: red">エラー: {{ error.message }}</div>

    <div v-for="(event, idx) in events" :key="idx">
      <strong>{{ event.type }}</strong>: {{ event.message || event.agent_id }}
    </div>

    <button v-if="isStreaming" @click="stopStreaming">停止</button>
  </div>
</template>

例: Python

import httpx
import json

def submit_and_stream(query: str, api_key: str):
    """タスクを送信し、イベントをストリーミングします。"""

    # 1. タスクを送信
    response = httpx.post(
        "http://localhost:8080/api/v1/tasks/stream",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "session_id": f"session-{int(time.time())}"
        }
    )

    data = response.json()
    workflow_id = data["workflow_id"]
    stream_url = data["stream_url"]

    print(f"タスクが送信されました: {workflow_id}")

    # 2. イベントをストリーミング
    with httpx.stream(
        "GET",
        f"http://localhost:8080{stream_url}",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=None
    ) as stream_response:
        for line in stream_response.iter_lines():
            if line.startswith("data:"):
                event = json.loads(line[5:])
                print(f"[{event['type']}] {event.get('message', '')}")

                if event['type'] in ['WORKFLOW_COMPLETED']:
                    return event

# 使用例
result = submit_and_stream("フランスの首都はどこですか?", "sk_test_123456")
print("最終結果:", result.get('result'))
このエンドポイントを使用する理由は? 統一されたエンドポイントにより、送信後すぐにストリーミングを開始でき、別々に送信して接続する際に発生する可能性のあるイベントの見逃しを防ぎます。

サーバー送信イベント (SSE)

GET /api/v1/stream/sse

サーバー送信イベントを使用したリアルタイムイベントストリーミング。

認証

必要: はい
X-API-Key: sk_test_123456

クエリパラメータ

パラメータ必須説明
workflow_idstringはいタスク/ワークフロー識別子
typesstringいいえフィルタリングするイベントタイプのカンマ区切りリスト
last_event_idstringいいえ特定のイベントIDから再開します。RedisストリームID(例: 1700000000000-0)または数値シーケンス(例: 42)を受け入れます。数値の場合、再生には seq > last_event_id のイベントが含まれます。

イベントフォーマット

各イベントはSSE仕様に従います:
id: <event_id>
event: <event_type>
data: <json_payload>

リクエストの例

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \
  -H "X-API-Key: sk_test_123456"

レスポンスの例

id: 1
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","timestamp":"2025-10-22T10:30:00Z","message":"ワークフローが開始されました"}

id: 2
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","agent_id":"agent_1","message":"クエリを分析中...","timestamp":"2025-10-22T10:30:01Z"}

id: 3
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","tool":"web_search","params":{"query":"Pythonプログラミング"},"timestamp":"2025-10-22T10:30:02Z"}

id: 4
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","tool":"web_search","result":"...","timestamp":"2025-10-22T10:30:05Z"}

id: 5
event: AGENT_COMPLETED
data: {"workflow_id":"task_abc123","agent_id":"agent_1","result":"Pythonは高水準プログラミング言語です...","timestamp":"2025-10-22T10:30:06Z"}

id: 6
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","status":"COMPLETED","result":"...","timestamp":"2025-10-22T10:30:07Z"}

WebSocket

GET /api/v1/stream/ws

WebSocketを介した双方向ストリーミング。

認証

ゲートウェイは、ヘッダーのみ(X-API-KeyまたはAuthorization)を使用してWebSocket接続を認証します。ブラウザはWebSocketハンドシェイク中にカスタムヘッダーを設定できません。ブラウザでの使用方法:
  • GATEWAY_SKIP_AUTH=1でローカル実行するか、
  • ヘッダーを挿入してからゲートウェイに転送するリバースプロキシを使用します。
サーバー環境向けのヘッダーを使用した例: Node (ws):
import WebSocket from 'ws';

const ws = new WebSocket('ws://localhost:8080/api/v1/stream/ws', {
  headers: { 'X-API-Key': 'sk_test_123456' },
});

ws.on('open', () => {
  ws.send(JSON.stringify({ type: 'subscribe', workflow_id: 'task_abc123' }));
});

ws.on('message', (msg) => {
  const data = JSON.parse(msg.toString());
  console.log('Event:', data.type, data.message);
});

ws.on('error', (err) => console.error('WebSocket error:', err));
Python (websockets):
import asyncio, json, websockets

async def main():
    async with websockets.connect(
        'ws://localhost:8080/api/v1/stream/ws',
        extra_headers={'X-API-Key': 'sk_test_123456'},
    ) as ws:
        await ws.send(json.dumps({'type': 'subscribe', 'workflow_id': 'task_abc123'}))
        async for message in ws:
            data = json.loads(message)
            print('Event:', data.get('type'), data.get('message'))

asyncio.run(main())
接続後にクエリ文字列または「auth」メッセージを介してAPIキーを渡すことは、ゲートウェイではサポートされていません。

メッセージタイプ

クライアント → サーバー:
// ワークフローにサブスクライブ
{
  "type": "subscribe",
  "workflow_id": "task_abc123",
  "types": ["AGENT_THINKING", "TOOL_INVOKED"]
}

// サブスクリプション解除
{
  "type": "unsubscribe",
  "workflow_id": "task_abc123"
}

// Ping(キープアライブ)
{
  "type": "ping"
}
サーバー → クライアント:
// イベント
{
  "type": "AGENT_THINKING",
  "workflow_id": "task_abc123",
  "message": "クエリを分析中...",
  "timestamp": "2025-10-22T10:30:00Z"
}

// Pong(キープアライブ応答)
{
  "type": "pong"
}

OpenAI互換ストリーミングフォーマット

ShannonのSSEストリーミングは、OpenAI互換フォーマットを使用して使用メタデータとイベント構造を提供し、OpenAIベースのツールとのシームレスな統合を可能にしつつ、Shannonの包括的なイベント分類を維持します。

使用メタデータフォーマット

すべてのLLMプロバイダーは、OpenAIの標準構造で使用データを返します:
{
  "type": "LLM_OUTPUT",
  "data": {
    "output": "応答テキスト...",
    "model": "gpt-5-nano-2025-08-07",
    "provider": "openai",
    "usage": {
      "total_tokens": 350,
      "input_tokens": 200,
      "output_tokens": 150
    },
    "cost_usd": 0.0105
  }
}
互換性マトリックス:
プロバイダー使用メタデータストリームオプション
OpenAI✅ 完全サポートstream_options: {"include_usage": true}
Anthropic✅ 完全サポート最終ストリーミングチャンク
Google✅ 完全サポート最終ストリーミングチャンク
Groq✅ 完全サポート最終ストリーミングチャンク
xAI⚠️ 限定的単項モードでのみ利用可能
OpenAI互換⚠️ 優雅な劣化エンドポイントのサポートに依存
注意: Shannonは、stream_optionsパラメータをサポートしていないプロバイダーを優雅に処理します—ストリーミングは機能しますが、ストリーミング中に使用メタデータが利用できない場合があります。

イベントタイプ

コアイベント

イベントタイプ説明発生タイミング
WORKFLOW_STARTEDワークフローの実行が開始されたタスクの開始
WORKFLOW_COMPLETEDワークフローが正常に終了したタスクの終了(成功)

エージェントイベント

イベントタイプ説明ペイロードフィールド
AGENT_THINKINGエージェントの推論/計画agent_id, message
AGENT_COMPLETEDエージェントの実行が完了したagent_id, result

ツールイベント

イベントタイプ説明ペイロードフィールド
TOOL_INVOKEDツールの実行が開始されたtool, params
TOOL_OBSERVATIONエージェントがツールの結果を観察tool, result

LLMイベント

イベントタイプ説明ペイロードフィールド
LLM_PROMPTLLMに送信されたプロンプトtext
LLM_PARTIALストリーミングLLM出力text
LLM_OUTPUT最終LLM出力text

進捗およびシステムイベント

イベントタイプ説明ペイロードフィールド
PROGRESS進捗の更新progress, message
DATA_PROCESSINGデータ処理中message
WAITINGリソースを待機中message
ERROR_OCCURREDエラーが発生したerror, severity
ERROR_RECOVERYエラー回復の試みmessage
WORKSPACE_UPDATEDメモリ/コンテキストが更新されたmessage
BUDGET_THRESHOLD予算警告閾値に達したusage_percent, threshold_percent, tokens_used, tokens_budget, level, budget_type

ストリームライフサイクルイベント

イベントタイプ説明
STREAM_ENDストリームの明示的な終了信号(このワークフローに対してはこれ以上のイベントは発生しません)
SSE経由では、STREAM_ENDライフサイクルイベントはdoneという名前のSSEイベントとしてdata: [DONE](プレーンテキスト、JSONではない)で配信されます。WebSocket経由では、通常のJSONイベントとして"type": "STREAM_END"として現れます。

チームと承認

イベントタイプ説明
TEAM_RECRUITED複数エージェントのチームが編成されました
TEAM_RETIREDチームが解散しました
TEAM_STATUSチームの調整更新
ROLE_ASSIGNEDエージェントの役割が割り当てられました
DELEGATIONタスクが委任されました
DEPENDENCY_SATISFIED依存関係が解決されました
BUDGET_THRESHOLDトークン予算の閾値に達しました
APPROVAL_REQUESTED人間の承認が必要です
APPROVAL_DECISION承認の決定が記録されました

コード例

Python with httpx (SSE)

import httpx
import json

def stream_task_events(task_id: str, api_key: str):
    """SSEを使用してタスクイベントをストリームします。"""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"

    with httpx.stream(
        "GET",
        url,
        headers={"X-API-Key": api_key},
        timeout=None  # ストリーミングのためタイムアウトなし
    ) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                data = json.loads(line[5:])  # "data:"プレフィックスを削除
                yield data

# 使用例
for event in stream_task_events("task_abc123", "sk_test_123456"):
    print(f"[{event.get('type')}] {event.get('message', '')}")

    if event.get('type') == 'WORKFLOW_COMPLETED':
        print("最終結果:", event.get('result'))
        break

Python - イベントフィルタリング付きストリーム

def stream_filtered_events(task_id: str, api_key: str, event_types: list):
    """特定のイベントタイプのみをストリームします。"""
    types_param = ",".join(event_types)
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"

    with httpx.stream("GET", url, headers={"X-API-Key": api_key}) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])

# エージェントの思考とツールイベントのみを受信
for event in stream_filtered_events(
    "task_abc123",
    "sk_test_123456",
    ["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
    print(f"{event['type']}: {event.get('message', event.get('tool'))}")

JavaScript/Node.js (SSE)

const EventSource = require('eventsource');

function streamTaskEvents(taskId, apiKey) {
  const url = `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`;

  const eventSource = new EventSource(url, {
    headers: {
      'X-API-Key': apiKey
    }
  });

  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log(`[${data.type}] ${data.message || ''}`);

    if (data.type === 'WORKFLOW_COMPLETED') {
      console.log('最終結果:', data.result);
      eventSource.close();
    }
  };

  eventSource.onerror = (error) => {
    console.error('SSEエラー:', error);
    eventSource.close();
  };

  return eventSource;
}

// 使用例
const stream = streamTaskEvents('task_abc123', 'sk_test_123456');

// 手動で閉じる
setTimeout(() => stream.close(), 60000);

JavaScript/Node.js - WebSocket (ws)

import WebSocket from 'ws';

function connectWebSocket(taskId, apiKey) {
  const ws = new WebSocket(
    `ws://localhost:8080/api/v1/stream/ws?workflow_id=${taskId}`,
    { headers: { 'X-API-Key': apiKey } }
  );

  ws.on('open', () => console.log('✓ 接続されました'));

  ws.on('message', (msg) => {
    const data = JSON.parse(msg.toString());
    switch (data.type) {
      case 'AGENT_THINKING':
        console.log(`💭 ${data.message}`);
        break;
      case 'TOOL_INVOKED':
        console.log(`🔧 ツール: ${data.tool}`);
        break;
      case 'TOOL_OBSERVATION':
        console.log(`✓ 結果: ${data.result}`);
        break;
      case 'WORKFLOW_COMPLETED':
        console.log(`✓ 完了: ${data.result}`);
        ws.close();
        break;
      default:
        console.log(`[${data.type}] ${data.message || ''}`);
    }
  });

  ws.on('error', (err) => console.error('❌ エラー:', err));
  ws.on('close', () => console.log('接続が閉じられました'));

  return ws;
}

const ws = connectWebSocket('task_abc123', 'sk_test_123456');

Go (SSE)

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
)

type Event struct {
    Type      string                 `json:"type"`
    WorkflowID string                `json:"workflow_id"`
    Message   string                 `json:"message,omitempty"`
    Data      map[string]interface{} `json:",inline"`
}

func streamEvents(taskID, apiKey string) error {
    url := fmt.Sprintf(
        "http://localhost:8080/api/v1/stream/sse?workflow_id=%s",
        taskID,
    )

    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("X-API-Key", apiKey)

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    scanner := bufio.NewScanner(resp.Body)

    for scanner.Scan() {
        line := scanner.Text()

        if strings.HasPrefix(line, "data:") {
            data := strings.TrimPrefix(line, "data:")

            var event Event
            json.Unmarshal([]byte(data), &event)

            fmt.Printf("[%s] %s\n", event.Type, event.Message)

            if event.Type == "WORKFLOW_COMPLETED" {
                fmt.Println("✓ タスクが完了しました")
                break
            }
        }
    }

    return scanner.Err()
}

func main() {
    err := streamEvents("task_abc123", "sk_test_123456")
    if err != nil {
        fmt.Println("エラー:", err)
    }
}

Bash/curl (SSE)

#!/bin/bash

API_KEY="sk_test_123456"
TASK_ID="$1"

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=$TASK_ID" \
  -H "X-API-Key: $API_KEY" \
  | while IFS= read -r line; do
    if [[ $line == data:* ]]; then
      # "data: {...}" から JSON を抽出
      JSON="${line#data:}"

      # 解析して表示
      TYPE=$(echo "$JSON" | jq -r '.type')
      MESSAGE=$(echo "$JSON" | jq -r '.message // ""')

      echo "[$(date +%T)] $TYPE: $MESSAGE"

      # 完了時に終了
      if [[ "$TYPE" == "WORKFLOW_COMPLETED" ]]; then
        echo ""
        echo "$JSON" | jq -r '.result'
        break
      fi
    fi
  done

使用例

1. リアルタイム進捗表示

def display_progress(task_id: str, api_key: str):
    """ユーザーにリアルタイムの進捗を表示します。"""
    print(f"タスク {task_id} を開始しています...")

    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')

        if event_type == 'AGENT_THINKING':
            print(f"💭 {event['message']}")
        elif event_type == 'TOOL_INVOKED':
            print(f"🔧 ツールを使用中: {event['tool']}")
        elif event_type == 'TOOL_OBSERVATION':
            print(f"✓ ツールが完了しました")
        elif event_type == 'LLM_PARTIAL':
            print(event['text'], end='', flush=True)
        elif event_type == 'WORKFLOW_COMPLETED':
            print(f"\n\n✓ 完了しました!")
            return event['result']

2. すべてのイベントをファイルにログ

import json
from datetime import datetime

def log_events_to_file(task_id: str, api_key: str, log_file: str):
    """すべてのイベントを JSON Lines ファイルにログします。"""
    with open(log_file, 'a') as f:
        for event in stream_task_events(task_id, api_key):
            # タイムスタンプを追加
            event['logged_at'] = datetime.now().isoformat()

            # JSON Lines として書き込み
            f.write(json.dumps(event) + '\n')
            f.flush()

            if event.get('type') == 'WORKFLOW_COMPLETED':
                break

log_events_to_file("task_abc123", "sk_test_123456", "task_events.jsonl")

3. ツール使用メトリクスの収集

def collect_tool_metrics(task_id: str, api_key: str):
    """ツール使用に関するメトリクスを収集します。"""
    metrics = {
        "tools_invoked": 0,
        "tools_succeeded": 0,
        "tools_failed": 0,
        "tool_list": []
    }

    for event in stream_task_events(task_id, api_key):
        if event.get('type') == 'TOOL_INVOKED':
            metrics['tools_invoked'] += 1
            metrics['tool_list'].append(event['tool'])
        elif event.get('type') == 'TOOL_OBSERVATION':
            metrics['tools_succeeded'] += 1
        elif event.get('type') == 'ERROR_OCCURRED':
            metrics['tools_failed'] += 1
        elif event.get('type') == 'WORKFLOW_COMPLETED':
            break

    return metrics

metrics = collect_tool_metrics("task_abc123", "sk_test_123456")
print(f"使用されたツール: {metrics['tool_list']}")
print(f"成功率: {metrics['tools_succeeded']}/{metrics['tools_invoked']}")

4. React UI 統合

import { useState, useEffect } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [status, setStatus] = useState('connecting');

  useEffect(() => {
    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`,
      { headers: { 'X-API-Key': apiKey } }
    );

    eventSource.onopen = () => setStatus('connected');

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, data]);

      if (data.type === 'WORKFLOW_COMPLETED') {
        setStatus('completed');
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      setStatus('error');
      eventSource.close();
    };

    return () => eventSource.close();
  }, [taskId, apiKey]);

  return (
    <div>
      <h3>ステータス: {status}</h3>
      <ul>
        {events.map((event, i) => (
          <li key={i}>
            <strong>{event.type}</strong>: {event.message || event.tool}
          </li>
        ))}
      </ul>
    </div>
  );
}

ベストプラクティス

1. 接続エラーの処理

import time

def stream_with_retry(task_id: str, api_key: str, max_retries: int = 3):
    """接続失敗時に自動的に再試行するストリーム。"""
    for attempt in range(max_retries):
        try:
            for event in stream_task_events(task_id, api_key):
                yield event

                if event.get('type') == 'WORKFLOW_COMPLETED':
                    return
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"接続に失敗しました。{2 ** attempt}秒後に再試行します...")
                time.sleep(2 ** attempt)
            else:
                raise

2. タイムアウトの実装

import signal

def stream_with_timeout(task_id: str, api_key: str, timeout: int = 300):
    """タイムアウト付きのストリーム。"""
    def timeout_handler(signum, frame):
        raise TimeoutError("ストリームタイムアウト")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)

    try:
        for event in stream_task_events(task_id, api_key):
            yield event
            if event.get('type') == 'WORKFLOW_COMPLETED':
                break
    finally:
        signal.alarm(0)  # アラームをキャンセル

3. クライアントサイドでのイベントフィルタリング

def stream_specific_events(task_id: str, api_key: str, event_types: set):
    """クライアントサイドでイベントをフィルタリングします。"""
    for event in stream_task_events(task_id, api_key):
        if event.get('type') in event_types:
            yield event

        if event.get('type') == 'WORKFLOW_COMPLETED':
            yield event
            break

# エージェントとツールのイベントのみを受信
for event in stream_specific_events(
    "task_abc123",
    "sk_test_123456",
    {"AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"}
):
    print(event)

4. 最後のイベントから再開

def stream_with_resume(task_id: str, api_key: str, last_event_id: str = None):
    """特定のイベントからストリーミングを再開します。"""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"

    if last_event_id:
        url += f"&last_event_id={last_event_id}"

    # イベントをストリーミング...
注意: last_event_id は、Redis ストリーム ID(例: 1700000000000-0)または数値シーケンス(例: 42)のいずれかを受け入れます。数値の場合、リプレイには seq > last_event_id のイベントが含まれます。

比較: SSE vs WebSocket vs ポーリング

機能SSEWebSocketポーリング
方向サーバー → クライアント双方向クライアント → サーバー
プロトコルHTTPWebSocketHTTP
自動再接続はい(ブラウザ)いいえ(手動)該当なし
オーバーヘッド非常に低
シンプルさ
ユースケースリアルタイム更新インタラクティブアプリシンプルなステータス
Shannon サポート✅ 推奨✅ 利用可能⚠️ 理想的ではない

それぞれの使用時期

  • SSE: ほとんどのユースケース、リアルタイムモニタリング、進捗表示
  • WebSocket: インタラクティブアプリケーション、双方向通信が必要
  • ポーリング (GET /api/v1/tasks/): レガシーシステム、ストリーミングサポートなし

関連エンドポイント

ノート

イベント保持:
  • Redis: すべてのイベントは24時間保存(リアルタイムストリーミング)
  • PostgreSQL: 重要なイベントは90日間保存(履歴クエリ)
  • 接続が切れた場合は last_event_id を使用してストリーミングを再開
接続制限:
  • APIキーごとに最大100の同時ストリーミング接続
  • 5分間の非アクティブタイムアウト(自動接続切断)
  • 接続ごとのバッファサイズ制限は1MB
  • 単一のWebSocket上で複数のワークフローを多重化することを検討