Shannonは、Server-Sent Events (SSE)およびWebSocketプロトコルを通じてリアルタイムのイベントストリーミングを提供します。ストリーミングを使用して、タスクの実行を監視し、進捗を表示し、生成される結果を受け取ります。
認証: ストリーミングエンドポイントは、他のAPIと同じヘッダーを必要とします。
ブラウザはEventSourceでカスタムヘッダーを送信できません。
- 開発:
GATEWAY_SKIP_AUTH=1を設定します。
- 本番: バックエンドを介してSSEをプロキシし、
X-API-KeyまたはBearerヘッダーを注入します。
APIキーをURLクエリパラメータで渡さないでください。
ストリーミング制限:
- タイムアウト: 5分間の非アクティブ後にストリームは自動的に閉じます
- バッファサイズ: 接続ごとに最大1MBのバッファデータ
- 使用メタデータ: すべてのLLMプロバイダー(OpenAI、Anthropic、Google、Groq、xAI)に対してトークン数とコストが利用可能になりました
エンドポイント
| メソッド | エンドポイント | プロトコル | 説明 |
|---|
POST | /api/v1/tasks/stream | HTTP+SSE | タスクを送信し、ストリームURLを取得(推奨) |
GET | /api/v1/stream/sse | SSE | Server-Sent Eventsエンドポイント |
GET | /api/v1/stream/ws | WebSocket | WebSocketストリーミングエンドポイント |
GET | /api/v1/tasks/{id}/events | HTTP | 過去のイベントを取得(ページネーションあり) |
統一された送信 + ストリーム(推奨)
POST /api/v1/tasks/stream
タスクを送信し、そのイベントのストリーミングを即座に開始する最も簡単な方法です。このエンドポイントは、タスクの送信とストリーミングの設定を1回の呼び出しで組み合わせています。
フロントエンドアプリケーションに最適: このエンドポイントは、タスクを送信した直後に進捗を表示したいリアルタイムUIに最適です。
必要: はい
X-API-Key: sk_test_123456
または:
Authorization: Bearer YOUR_TOKEN
リクエストボディ
| パラメータ | タイプ | 必須 | 説明 |
|---|
query | string | はい | 自然言語のタスク説明 |
session_id | string | いいえ | マルチターン会話のためのセッション識別子 |
context | object | いいえ | キーと値のペアとしての追加コンテキストデータ |
model_tier | string | いいえ | 希望するティア: small, medium, または large |
model_override | string | いいえ | 特定のモデル名(標準; 例: gpt-5) |
provider_override | string | いいえ | プロバイダーを強制指定(例: openai, anthropic, google) |
レスポンス
ステータス: 201 Created
ボディ:
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}
レスポンスフィールド
| フィールド | タイプ | 説明 |
|---|
workflow_id | string | タスク/ワークフロー識別子 |
stream_url | string | SSEストリームエンドポイントへの相対URL |
例: JavaScript/TypeScript
async function submitAndStream(query, onEvent, onComplete, onError) {
try {
// 1. タスクを送信し、ストリームURLを取得
const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer YOUR_TOKEN'
},
body: JSON.stringify({
query: query,
session_id: `session-${Date.now()}`
})
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const { task_id, workflow_id, stream_url } = await response.json();
console.log(`タスクが送信されました: ${workflow_id}`);
// 2. SSEストリームに接続
const eventSource = new EventSource(
`http://localhost:8080${stream_url}`,
{ withCredentials: false }
);
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
onEvent(event);
// 完了を確認
if (event.type === 'WORKFLOW_COMPLETED') {
eventSource.close();
onComplete(event);
}
};
eventSource.onerror = (err) => {
console.error('SSEエラー:', err);
eventSource.close();
onError(err);
};
return { workflow_id, eventSource };
} catch (error) {
onError(error);
throw error;
}
}
// 使用例
submitAndStream(
"Q4の収益トレンドを分析する",
(event) => console.log(`[${event.type}]`, event.message),
(final) => console.log("完了:", final.result),
(error) => console.error("エラー:", error)
);
例: React Hook
import { useState, useCallback, useRef } from 'react';
function useTaskStream(apiUrl = 'http://localhost:8080') {
const [events, setEvents] = useState([]);
const [isStreaming, setIsStreaming] = useState(false);
const [error, setError] = useState(null);
const [workflowId, setWorkflowId] = useState(null);
const eventSourceRef = useRef(null);
const submitTask = useCallback(async (query, sessionId = null) => {
setEvents([]);
setError(null);
setIsStreaming(true);
try {
// タスクを送信
const response = await fetch(`${apiUrl}/api/v1/tasks/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${localStorage.getItem('token')}`
},
body: JSON.stringify({
query,
session_id: sessionId || `session-${Date.now()}`
})
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const { task_id, workflow_id, stream_url } = await response.json();
setWorkflowId(workflow_id);
// ストリームに接続
const eventSource = new EventSource(`${apiUrl}${stream_url}`);
eventSourceRef.current = eventSource;
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
setEvents(prev => [...prev, event]);
if (event.type === 'WORKFLOW_COMPLETED' || event.type === 'ERROR_OCCURRED') {
eventSource.close();
setIsStreaming(false);
}
};
eventSource.onerror = (err) => {
setError(err);
setIsStreaming(false);
eventSource.close();
};
} catch (err) {
setError(err);
setIsStreaming(false);
}
}, [apiUrl]);
const stopStreaming = useCallback(() => {
if (eventSourceRef.current) {
eventSourceRef.current.close();
setIsStreaming(false);
}
}, []);
return { submitTask, stopStreaming, events, isStreaming, error, workflowId };
}
// コンポーネントでの使用例
function TaskStreamDemo() {
const { submitTask, stopStreaming, events, isStreaming, error, workflowId } = useTaskStream();
return (
<div>
<button
onClick={() => submitTask("15 + 25 は何ですか?")}
disabled={isStreaming}
>
{isStreaming ? '処理中...' : 'タスクを送信'}
</button>
{workflowId && <p>ワークフローID: {workflowId}</p>}
{error && <p style={{color: 'red'}}>エラー: {error.message}</p>}
<div>
{events.map((event, idx) => (
<div key={idx}>
<strong>{event.type}</strong>: {event.message || event.agent_id}
</div>
))}
</div>
{isStreaming && <button onClick={stopStreaming}>停止</button>}
</div>
);
}
例: Vue 3 Composition API
<script setup>
import { ref } from 'vue';
const events = ref([]);
const isStreaming = ref(false);
const error = ref(null);
const workflowId = ref(null);
let eventSource = null;
async function submitTask(query) {
events.value = [];
error.value = null;
isStreaming.value = true;
try {
const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${localStorage.getItem('token')}`
},
body: JSON.stringify({
query,
session_id: `session-${Date.now()}`
})
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const { task_id, workflow_id, stream_url } = await response.json();
workflowId.value = workflow_id;
eventSource = new EventSource(`http://localhost:8080${stream_url}`);
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
events.value.push(event);
if (event.type === 'WORKFLOW_COMPLETED') {
eventSource.close();
isStreaming.value = false;
}
};
eventSource.onerror = (err) => {
error.value = err;
eventSource.close();
isStreaming.value = false;
};
} catch (err) {
error.value = err;
isStreaming.value = false;
}
}
function stopStreaming() {
if (eventSource) {
eventSource.close();
isStreaming.value = false;
}
}
</script>
<template>
<div>
<button @click="submitTask('データを分析する')" :disabled="isStreaming">
{{ isStreaming ? '処理中...' : 'タスクを送信' }}
</button>
<div v-if="workflowId">ワークフロー: {{ workflowId }}</div>
<div v-if="error" style="color: red">エラー: {{ error.message }}</div>
<div v-for="(event, idx) in events" :key="idx">
<strong>{{ event.type }}</strong>: {{ event.message || event.agent_id }}
</div>
<button v-if="isStreaming" @click="stopStreaming">停止</button>
</div>
</template>
例: Python
import httpx
import json
def submit_and_stream(query: str, api_key: str):
"""タスクを送信し、イベントをストリーミングします。"""
# 1. タスクを送信
response = httpx.post(
"http://localhost:8080/api/v1/tasks/stream",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json={
"query": query,
"session_id": f"session-{int(time.time())}"
}
)
data = response.json()
workflow_id = data["workflow_id"]
stream_url = data["stream_url"]
print(f"タスクが送信されました: {workflow_id}")
# 2. イベントをストリーミング
with httpx.stream(
"GET",
f"http://localhost:8080{stream_url}",
headers={"Authorization": f"Bearer {api_key}"},
timeout=None
) as stream_response:
for line in stream_response.iter_lines():
if line.startswith("data:"):
event = json.loads(line[5:])
print(f"[{event['type']}] {event.get('message', '')}")
if event['type'] in ['WORKFLOW_COMPLETED']:
return event
# 使用例
result = submit_and_stream("フランスの首都はどこですか?", "sk_test_123456")
print("最終結果:", result.get('result'))
このエンドポイントを使用する理由は? 統一されたエンドポイントにより、送信後すぐにストリーミングを開始でき、別々に送信して接続する際に発生する可能性のあるイベントの見逃しを防ぎます。
サーバー送信イベント (SSE)
GET /api/v1/stream/sse
サーバー送信イベントを使用したリアルタイムイベントストリーミング。
必要: はい
X-API-Key: sk_test_123456
クエリパラメータ
| パラメータ | 型 | 必須 | 説明 |
|---|
workflow_id | string | はい | タスク/ワークフロー識別子 |
types | string | いいえ | フィルタリングするイベントタイプのカンマ区切りリスト |
last_event_id | string | いいえ | 特定のイベントIDから再開します。RedisストリームID(例: 1700000000000-0)または数値シーケンス(例: 42)を受け入れます。数値の場合、再生には seq > last_event_id のイベントが含まれます。 |
イベントフォーマット
各イベントはSSE仕様に従います:
id: <event_id>
event: <event_type>
data: <json_payload>
リクエストの例
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \
-H "X-API-Key: sk_test_123456"
レスポンスの例
id: 1
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","timestamp":"2025-10-22T10:30:00Z","message":"ワークフローが開始されました"}
id: 2
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","agent_id":"agent_1","message":"クエリを分析中...","timestamp":"2025-10-22T10:30:01Z"}
id: 3
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","tool":"web_search","params":{"query":"Pythonプログラミング"},"timestamp":"2025-10-22T10:30:02Z"}
id: 4
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","tool":"web_search","result":"...","timestamp":"2025-10-22T10:30:05Z"}
id: 5
event: AGENT_COMPLETED
data: {"workflow_id":"task_abc123","agent_id":"agent_1","result":"Pythonは高水準プログラミング言語です...","timestamp":"2025-10-22T10:30:06Z"}
id: 6
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","status":"COMPLETED","result":"...","timestamp":"2025-10-22T10:30:07Z"}
WebSocket
GET /api/v1/stream/ws
WebSocketを介した双方向ストリーミング。
ゲートウェイは、ヘッダーのみ(X-API-KeyまたはAuthorization)を使用してWebSocket接続を認証します。ブラウザはWebSocketハンドシェイク中にカスタムヘッダーを設定できません。ブラウザでの使用方法:
GATEWAY_SKIP_AUTH=1でローカル実行するか、
- ヘッダーを挿入してからゲートウェイに転送するリバースプロキシを使用します。
サーバー環境向けのヘッダーを使用した例:
Node (ws):
import WebSocket from 'ws';
const ws = new WebSocket('ws://localhost:8080/api/v1/stream/ws', {
headers: { 'X-API-Key': 'sk_test_123456' },
});
ws.on('open', () => {
ws.send(JSON.stringify({ type: 'subscribe', workflow_id: 'task_abc123' }));
});
ws.on('message', (msg) => {
const data = JSON.parse(msg.toString());
console.log('Event:', data.type, data.message);
});
ws.on('error', (err) => console.error('WebSocket error:', err));
Python (websockets):
import asyncio, json, websockets
async def main():
async with websockets.connect(
'ws://localhost:8080/api/v1/stream/ws',
extra_headers={'X-API-Key': 'sk_test_123456'},
) as ws:
await ws.send(json.dumps({'type': 'subscribe', 'workflow_id': 'task_abc123'}))
async for message in ws:
data = json.loads(message)
print('Event:', data.get('type'), data.get('message'))
asyncio.run(main())
接続後にクエリ文字列または「auth」メッセージを介してAPIキーを渡すことは、ゲートウェイではサポートされていません。
メッセージタイプ
クライアント → サーバー:
// ワークフローにサブスクライブ
{
"type": "subscribe",
"workflow_id": "task_abc123",
"types": ["AGENT_THINKING", "TOOL_INVOKED"]
}
// サブスクリプション解除
{
"type": "unsubscribe",
"workflow_id": "task_abc123"
}
// Ping(キープアライブ)
{
"type": "ping"
}
サーバー → クライアント:
// イベント
{
"type": "AGENT_THINKING",
"workflow_id": "task_abc123",
"message": "クエリを分析中...",
"timestamp": "2025-10-22T10:30:00Z"
}
// Pong(キープアライブ応答)
{
"type": "pong"
}
OpenAI互換ストリーミングフォーマット
ShannonのSSEストリーミングは、OpenAI互換フォーマットを使用して使用メタデータとイベント構造を提供し、OpenAIベースのツールとのシームレスな統合を可能にしつつ、Shannonの包括的なイベント分類を維持します。
使用メタデータフォーマット
すべてのLLMプロバイダーは、OpenAIの標準構造で使用データを返します:
{
"type": "LLM_OUTPUT",
"data": {
"output": "応答テキスト...",
"model": "gpt-5-nano-2025-08-07",
"provider": "openai",
"usage": {
"total_tokens": 350,
"input_tokens": 200,
"output_tokens": 150
},
"cost_usd": 0.0105
}
}
互換性マトリックス:
| プロバイダー | 使用メタデータ | ストリームオプション |
|---|
| OpenAI | ✅ 完全サポート | stream_options: {"include_usage": true} |
| Anthropic | ✅ 完全サポート | 最終ストリーミングチャンク |
| Google | ✅ 完全サポート | 最終ストリーミングチャンク |
| Groq | ✅ 完全サポート | 最終ストリーミングチャンク |
| xAI | ⚠️ 限定的 | 単項モードでのみ利用可能 |
| OpenAI互換 | ⚠️ 優雅な劣化 | エンドポイントのサポートに依存 |
注意: Shannonは、stream_optionsパラメータをサポートしていないプロバイダーを優雅に処理します—ストリーミングは機能しますが、ストリーミング中に使用メタデータが利用できない場合があります。
イベントタイプ
コアイベント
| イベントタイプ | 説明 | 発生タイミング |
|---|
WORKFLOW_STARTED | ワークフローの実行が開始された | タスクの開始 |
WORKFLOW_COMPLETED | ワークフローが正常に終了した | タスクの終了(成功) |
エージェントイベント
| イベントタイプ | 説明 | ペイロードフィールド |
|---|
AGENT_THINKING | エージェントの推論/計画 | agent_id, message |
AGENT_COMPLETED | エージェントの実行が完了した | agent_id, result |
ツールイベント
| イベントタイプ | 説明 | ペイロードフィールド |
|---|
TOOL_INVOKED | ツールの実行が開始された | tool, params |
TOOL_OBSERVATION | エージェントがツールの結果を観察 | tool, result |
LLMイベント
| イベントタイプ | 説明 | ペイロードフィールド |
|---|
LLM_PROMPT | LLMに送信されたプロンプト | text |
LLM_PARTIAL | ストリーミングLLM出力 | text |
LLM_OUTPUT | 最終LLM出力 | text |
進捗およびシステムイベント
| イベントタイプ | 説明 | ペイロードフィールド |
|---|
PROGRESS | 進捗の更新 | progress, message |
DATA_PROCESSING | データ処理中 | message |
WAITING | リソースを待機中 | message |
ERROR_OCCURRED | エラーが発生した | error, severity |
ERROR_RECOVERY | エラー回復の試み | message |
WORKSPACE_UPDATED | メモリ/コンテキストが更新された | message |
BUDGET_THRESHOLD | 予算警告閾値に達した | usage_percent, threshold_percent, tokens_used, tokens_budget, level, budget_type |
ストリームライフサイクルイベント
| イベントタイプ | 説明 |
|---|
STREAM_END | ストリームの明示的な終了信号(このワークフローに対してはこれ以上のイベントは発生しません) |
SSE経由では、STREAM_ENDライフサイクルイベントはdoneという名前のSSEイベントとしてdata: [DONE](プレーンテキスト、JSONではない)で配信されます。WebSocket経由では、通常のJSONイベントとして"type": "STREAM_END"として現れます。
チームと承認
| イベントタイプ | 説明 |
|---|
TEAM_RECRUITED | 複数エージェントのチームが編成されました |
TEAM_RETIRED | チームが解散しました |
TEAM_STATUS | チームの調整更新 |
ROLE_ASSIGNED | エージェントの役割が割り当てられました |
DELEGATION | タスクが委任されました |
DEPENDENCY_SATISFIED | 依存関係が解決されました |
BUDGET_THRESHOLD | トークン予算の閾値に達しました |
APPROVAL_REQUESTED | 人間の承認が必要です |
APPROVAL_DECISION | 承認の決定が記録されました |
コード例
Python with httpx (SSE)
import httpx
import json
def stream_task_events(task_id: str, api_key: str):
"""SSEを使用してタスクイベントをストリームします。"""
url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
with httpx.stream(
"GET",
url,
headers={"X-API-Key": api_key},
timeout=None # ストリーミングのためタイムアウトなし
) as response:
for line in response.iter_lines():
if line.startswith("data:"):
data = json.loads(line[5:]) # "data:"プレフィックスを削除
yield data
# 使用例
for event in stream_task_events("task_abc123", "sk_test_123456"):
print(f"[{event.get('type')}] {event.get('message', '')}")
if event.get('type') == 'WORKFLOW_COMPLETED':
print("最終結果:", event.get('result'))
break
Python - イベントフィルタリング付きストリーム
def stream_filtered_events(task_id: str, api_key: str, event_types: list):
"""特定のイベントタイプのみをストリームします。"""
types_param = ",".join(event_types)
url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"
with httpx.stream("GET", url, headers={"X-API-Key": api_key}) as response:
for line in response.iter_lines():
if line.startswith("data:"):
yield json.loads(line[5:])
# エージェントの思考とツールイベントのみを受信
for event in stream_filtered_events(
"task_abc123",
"sk_test_123456",
["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
print(f"{event['type']}: {event.get('message', event.get('tool'))}")
JavaScript/Node.js (SSE)
const EventSource = require('eventsource');
function streamTaskEvents(taskId, apiKey) {
const url = `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`;
const eventSource = new EventSource(url, {
headers: {
'X-API-Key': apiKey
}
});
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(`[${data.type}] ${data.message || ''}`);
if (data.type === 'WORKFLOW_COMPLETED') {
console.log('最終結果:', data.result);
eventSource.close();
}
};
eventSource.onerror = (error) => {
console.error('SSEエラー:', error);
eventSource.close();
};
return eventSource;
}
// 使用例
const stream = streamTaskEvents('task_abc123', 'sk_test_123456');
// 手動で閉じる
setTimeout(() => stream.close(), 60000);
JavaScript/Node.js - WebSocket (ws)
import WebSocket from 'ws';
function connectWebSocket(taskId, apiKey) {
const ws = new WebSocket(
`ws://localhost:8080/api/v1/stream/ws?workflow_id=${taskId}`,
{ headers: { 'X-API-Key': apiKey } }
);
ws.on('open', () => console.log('✓ 接続されました'));
ws.on('message', (msg) => {
const data = JSON.parse(msg.toString());
switch (data.type) {
case 'AGENT_THINKING':
console.log(`💭 ${data.message}`);
break;
case 'TOOL_INVOKED':
console.log(`🔧 ツール: ${data.tool}`);
break;
case 'TOOL_OBSERVATION':
console.log(`✓ 結果: ${data.result}`);
break;
case 'WORKFLOW_COMPLETED':
console.log(`✓ 完了: ${data.result}`);
ws.close();
break;
default:
console.log(`[${data.type}] ${data.message || ''}`);
}
});
ws.on('error', (err) => console.error('❌ エラー:', err));
ws.on('close', () => console.log('接続が閉じられました'));
return ws;
}
const ws = connectWebSocket('task_abc123', 'sk_test_123456');
Go (SSE)
package main
import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"strings"
)
type Event struct {
Type string `json:"type"`
WorkflowID string `json:"workflow_id"`
Message string `json:"message,omitempty"`
Data map[string]interface{} `json:",inline"`
}
func streamEvents(taskID, apiKey string) error {
url := fmt.Sprintf(
"http://localhost:8080/api/v1/stream/sse?workflow_id=%s",
taskID,
)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("X-API-Key", apiKey)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data:") {
data := strings.TrimPrefix(line, "data:")
var event Event
json.Unmarshal([]byte(data), &event)
fmt.Printf("[%s] %s\n", event.Type, event.Message)
if event.Type == "WORKFLOW_COMPLETED" {
fmt.Println("✓ タスクが完了しました")
break
}
}
}
return scanner.Err()
}
func main() {
err := streamEvents("task_abc123", "sk_test_123456")
if err != nil {
fmt.Println("エラー:", err)
}
}
Bash/curl (SSE)
#!/bin/bash
API_KEY="sk_test_123456"
TASK_ID="$1"
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=$TASK_ID" \
-H "X-API-Key: $API_KEY" \
| while IFS= read -r line; do
if [[ $line == data:* ]]; then
# "data: {...}" から JSON を抽出
JSON="${line#data:}"
# 解析して表示
TYPE=$(echo "$JSON" | jq -r '.type')
MESSAGE=$(echo "$JSON" | jq -r '.message // ""')
echo "[$(date +%T)] $TYPE: $MESSAGE"
# 完了時に終了
if [[ "$TYPE" == "WORKFLOW_COMPLETED" ]]; then
echo ""
echo "$JSON" | jq -r '.result'
break
fi
fi
done
使用例
1. リアルタイム進捗表示
def display_progress(task_id: str, api_key: str):
"""ユーザーにリアルタイムの進捗を表示します。"""
print(f"タスク {task_id} を開始しています...")
for event in stream_task_events(task_id, api_key):
event_type = event.get('type')
if event_type == 'AGENT_THINKING':
print(f"💭 {event['message']}")
elif event_type == 'TOOL_INVOKED':
print(f"🔧 ツールを使用中: {event['tool']}")
elif event_type == 'TOOL_OBSERVATION':
print(f"✓ ツールが完了しました")
elif event_type == 'LLM_PARTIAL':
print(event['text'], end='', flush=True)
elif event_type == 'WORKFLOW_COMPLETED':
print(f"\n\n✓ 完了しました!")
return event['result']
2. すべてのイベントをファイルにログ
import json
from datetime import datetime
def log_events_to_file(task_id: str, api_key: str, log_file: str):
"""すべてのイベントを JSON Lines ファイルにログします。"""
with open(log_file, 'a') as f:
for event in stream_task_events(task_id, api_key):
# タイムスタンプを追加
event['logged_at'] = datetime.now().isoformat()
# JSON Lines として書き込み
f.write(json.dumps(event) + '\n')
f.flush()
if event.get('type') == 'WORKFLOW_COMPLETED':
break
log_events_to_file("task_abc123", "sk_test_123456", "task_events.jsonl")
3. ツール使用メトリクスの収集
def collect_tool_metrics(task_id: str, api_key: str):
"""ツール使用に関するメトリクスを収集します。"""
metrics = {
"tools_invoked": 0,
"tools_succeeded": 0,
"tools_failed": 0,
"tool_list": []
}
for event in stream_task_events(task_id, api_key):
if event.get('type') == 'TOOL_INVOKED':
metrics['tools_invoked'] += 1
metrics['tool_list'].append(event['tool'])
elif event.get('type') == 'TOOL_OBSERVATION':
metrics['tools_succeeded'] += 1
elif event.get('type') == 'ERROR_OCCURRED':
metrics['tools_failed'] += 1
elif event.get('type') == 'WORKFLOW_COMPLETED':
break
return metrics
metrics = collect_tool_metrics("task_abc123", "sk_test_123456")
print(f"使用されたツール: {metrics['tool_list']}")
print(f"成功率: {metrics['tools_succeeded']}/{metrics['tools_invoked']}")
4. React UI 統合
import { useState, useEffect } from 'react';
function TaskMonitor({ taskId, apiKey }) {
const [events, setEvents] = useState([]);
const [status, setStatus] = useState('connecting');
useEffect(() => {
const eventSource = new EventSource(
`http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`,
{ headers: { 'X-API-Key': apiKey } }
);
eventSource.onopen = () => setStatus('connected');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
setEvents(prev => [...prev, data]);
if (data.type === 'WORKFLOW_COMPLETED') {
setStatus('completed');
eventSource.close();
}
};
eventSource.onerror = () => {
setStatus('error');
eventSource.close();
};
return () => eventSource.close();
}, [taskId, apiKey]);
return (
<div>
<h3>ステータス: {status}</h3>
<ul>
{events.map((event, i) => (
<li key={i}>
<strong>{event.type}</strong>: {event.message || event.tool}
</li>
))}
</ul>
</div>
);
}
ベストプラクティス
1. 接続エラーの処理
import time
def stream_with_retry(task_id: str, api_key: str, max_retries: int = 3):
"""接続失敗時に自動的に再試行するストリーム。"""
for attempt in range(max_retries):
try:
for event in stream_task_events(task_id, api_key):
yield event
if event.get('type') == 'WORKFLOW_COMPLETED':
return
except Exception as e:
if attempt < max_retries - 1:
print(f"接続に失敗しました。{2 ** attempt}秒後に再試行します...")
time.sleep(2 ** attempt)
else:
raise
2. タイムアウトの実装
import signal
def stream_with_timeout(task_id: str, api_key: str, timeout: int = 300):
"""タイムアウト付きのストリーム。"""
def timeout_handler(signum, frame):
raise TimeoutError("ストリームタイムアウト")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try:
for event in stream_task_events(task_id, api_key):
yield event
if event.get('type') == 'WORKFLOW_COMPLETED':
break
finally:
signal.alarm(0) # アラームをキャンセル
3. クライアントサイドでのイベントフィルタリング
def stream_specific_events(task_id: str, api_key: str, event_types: set):
"""クライアントサイドでイベントをフィルタリングします。"""
for event in stream_task_events(task_id, api_key):
if event.get('type') in event_types:
yield event
if event.get('type') == 'WORKFLOW_COMPLETED':
yield event
break
# エージェントとツールのイベントのみを受信
for event in stream_specific_events(
"task_abc123",
"sk_test_123456",
{"AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"}
):
print(event)
4. 最後のイベントから再開
def stream_with_resume(task_id: str, api_key: str, last_event_id: str = None):
"""特定のイベントからストリーミングを再開します。"""
url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
if last_event_id:
url += f"&last_event_id={last_event_id}"
# イベントをストリーミング...
注意: last_event_id は、Redis ストリーム ID(例: 1700000000000-0)または数値シーケンス(例: 42)のいずれかを受け入れます。数値の場合、リプレイには seq > last_event_id のイベントが含まれます。
比較: SSE vs WebSocket vs ポーリング
| 機能 | SSE | WebSocket | ポーリング |
|---|
| 方向 | サーバー → クライアント | 双方向 | クライアント → サーバー |
| プロトコル | HTTP | WebSocket | HTTP |
| 自動再接続 | はい(ブラウザ) | いいえ(手動) | 該当なし |
| オーバーヘッド | 低 | 非常に低 | 高 |
| シンプルさ | 高 | 中 | 高 |
| ユースケース | リアルタイム更新 | インタラクティブアプリ | シンプルなステータス |
| Shannon サポート | ✅ 推奨 | ✅ 利用可能 | ⚠️ 理想的ではない |
それぞれの使用時期
- SSE: ほとんどのユースケース、リアルタイムモニタリング、進捗表示
- WebSocket: インタラクティブアプリケーション、双方向通信が必要
- ポーリング (GET /api/v1/tasks/): レガシーシステム、ストリーミングサポートなし
関連エンドポイント
ノート
イベント保持:
- Redis: すべてのイベントは24時間保存(リアルタイムストリーミング)
- PostgreSQL: 重要なイベントは90日間保存(履歴クエリ)
- 接続が切れた場合は
last_event_id を使用してストリーミングを再開
接続制限:
- APIキーごとに最大100の同時ストリーミング接続
- 5分間の非アクティブタイムアウト(自動接続切断)
- 接続ごとのバッファサイズ制限は1MB
- 単一のWebSocket上で複数のワークフローを多重化することを検討