Shannonは、Server-Sent Events (SSE)およびWebSocketプロトコルを通じてリアルタイムのイベントストリーミングを提供します。ストリーミングを使用して、タスクの実行を監視し、進捗を表示し、生成される結果を受け取ります。
認証 : ストリーミングエンドポイントは、他のAPIと同じヘッダーを必要とします。
ブラウザはEventSourceでカスタムヘッダーを送信できません。
開発: GATEWAY_SKIP_AUTH=1を設定します。
本番: バックエンドを介してSSEをプロキシし、X-API-KeyまたはBearerヘッダーを注入します。
SSEエンドポイントでは、フォールバックとして api_key クエリパラメータがサポートされています(例: ?api_key=sk_...)。その他のエンドポイントではヘッダーを使用してください。
ストリーミング制限 :
タイムアウト : 5分間の非アクティブ後にストリームは自動的に閉じます
バッファサイズ : 接続ごとに最大1MBのバッファデータ
使用メタデータ : すべてのLLMプロバイダー(OpenAI、Anthropic、Google、Groq、xAI)に対してトークン数とコストが利用可能になりました
エンドポイント
メソッド エンドポイント プロトコル 説明 POST/api/v1/tasks/streamHTTP+SSE タスクを送信し、ストリームURLを取得(推奨) GET/api/v1/stream/sseSSE Server-Sent Eventsエンドポイント GET/api/v1/stream/wsWebSocket WebSocketストリーミングエンドポイント GET/api/v1/tasks/{id}/eventsHTTP 過去のイベントを取得(ページネーションあり)
統一された送信 + ストリーム(推奨)
POST /api/v1/tasks/stream
タスクを送信し、そのイベントのストリーミングを即座に開始する最も簡単な方法です。このエンドポイントは、タスクの送信とストリーミングの設定を1回の呼び出しで組み合わせています。
フロントエンドアプリケーションに最適 : このエンドポイントは、タスクを送信した直後に進捗を表示したいリアルタイムUIに最適です。
必要 : はい
X-API-Key: sk_test_123456
または:
Authorization: Bearer YOUR_TOKEN
リクエストボディ
パラメータ タイプ 必須 説明 querystring はい 自然言語のタスク説明 session_idstring いいえ マルチターン会話のためのセッション識別子 contextobject いいえ キーと値のペアとしての追加コンテキストデータ model_tierstring いいえ 希望するティア: small, medium, または large model_overridestring いいえ 特定のモデル名(標準; 例: gpt-5) provider_overridestring いいえ プロバイダーを強制指定(例: openai, anthropic, google)
レスポンス
ステータス : 201 Created
ボディ :
{
"task_id" : "task_01HQZX3Y9K8M2P4N5S7T9W2V" ,
"workflow_id" : "task_01HQZX3Y9K8M2P4N5S7T9W2V" ,
"stream_url" : "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}
レスポンスフィールド
フィールド タイプ 説明 task_idstring 一意のタスク識別子 workflow_idstring タスク/ワークフロー識別子 stream_urlstring SSEストリームエンドポイントの相対URL
例: JavaScript/TypeScript
async function submitAndStream ( query , onEvent , onComplete , onError ) {
try {
// 1. タスクを送信し、ストリームURLを取得
const response = await fetch ( 'http://localhost:8080/api/v1/tasks/stream' , {
method: 'POST' ,
headers: {
'Content-Type' : 'application/json' ,
'Authorization' : 'Bearer YOUR_TOKEN'
},
body: JSON . stringify ({
query: query ,
session_id: `session- ${ Date . now () } `
})
});
if (! response . ok ) {
throw new Error ( `HTTP ${ response . status } : ${ response . statusText } ` );
}
const { task_id , workflow_id , stream_url } = await response . json ();
console . log ( `タスクが送信されました: ${ workflow_id } ` );
// 2. SSEストリームに接続
// 注意: ブラウザのEventSourceはカスタムヘッダーをサポートしていません。
// api_keyクエリパラメータを使用するか、バックエンドでSSEをプロキシしてください。
const eventSource = new EventSource (
`http://localhost:8080 ${ stream_url } &api_key= ${ localStorage . getItem ( 'token' ) } `
);
eventSource . onmessage = ( e ) => {
const event = JSON . parse ( e . data );
onEvent ( event );
// 完了を確認
if ( event . type === 'WORKFLOW_COMPLETED' ) {
eventSource . close ();
onComplete ( event );
}
};
eventSource . onerror = ( err ) => {
console . error ( 'SSEエラー:' , err );
eventSource . close ();
onError ( err );
};
return { workflow_id , eventSource };
} catch ( error ) {
onError ( error );
throw error ;
}
}
// 使用例
submitAndStream (
"Q4の収益トレンドを分析する" ,
( event ) => console . log ( `[ ${ event . type } ]` , event . message ),
( final ) => console . log ( "完了:" , final . result ),
( error ) => console . error ( "エラー:" , error )
);
例: React Hook
import { useState , useCallback , useRef } from 'react' ;
function useTaskStream ( apiUrl = 'http://localhost:8080' ) {
const [ events , setEvents ] = useState ([]);
const [ isStreaming , setIsStreaming ] = useState ( false );
const [ error , setError ] = useState ( null );
const [ workflowId , setWorkflowId ] = useState ( null );
const eventSourceRef = useRef ( null );
const submitTask = useCallback ( async ( query , sessionId = null ) => {
setEvents ([]);
setError ( null );
setIsStreaming ( true );
try {
// タスクを送信
const response = await fetch ( ` ${ apiUrl } /api/v1/tasks/stream` , {
method: 'POST' ,
headers: {
'Content-Type' : 'application/json' ,
'Authorization' : `Bearer ${ localStorage . getItem ( 'token' ) } `
},
body: JSON . stringify ({
query ,
session_id: sessionId || `session- ${ Date . now () } `
})
});
if (! response . ok ) throw new Error ( `HTTP ${ response . status } ` );
const { task_id , workflow_id , stream_url } = await response . json ();
setWorkflowId ( workflow_id );
// ストリームに接続
// 注意: ブラウザのEventSourceはカスタムヘッダーをサポートしていません。
// api_keyクエリパラメータを使用するか、バックエンドでSSEをプロキシしてください。
const token = localStorage . getItem ( 'token' );
const sseUrl = token
? ` ${ apiUrl }${ stream_url } &api_key= ${ token } `
: ` ${ apiUrl }${ stream_url } ` ;
const eventSource = new EventSource ( sseUrl );
eventSourceRef . current = eventSource ;
eventSource . onmessage = ( e ) => {
const event = JSON . parse ( e . data );
setEvents ( prev => [... prev , event ]);
if ( event . type === 'WORKFLOW_COMPLETED' || event . type === 'ERROR_OCCURRED' ) {
eventSource . close ();
setIsStreaming ( false );
}
};
eventSource . onerror = ( err ) => {
setError ( err );
setIsStreaming ( false );
eventSource . close ();
};
} catch ( err ) {
setError ( err );
setIsStreaming ( false );
}
}, [ apiUrl ]);
const stopStreaming = useCallback (() => {
if ( eventSourceRef . current ) {
eventSourceRef . current . close ();
setIsStreaming ( false );
}
}, []);
return { submitTask , stopStreaming , events , isStreaming , error , workflowId };
}
// コンポーネントでの使用例
function TaskStreamDemo () {
const { submitTask , stopStreaming , events , isStreaming , error , workflowId } = useTaskStream ();
return (
< div >
< button
onClick = { () => submitTask ( "15 + 25 は何ですか?" ) }
disabled = { isStreaming }
>
{ isStreaming ? '処理中...' : 'タスクを送信' }
</ button >
{ workflowId && < p > ワークフローID: { workflowId } </ p > }
{ error && < p style = { { color: 'red' } } > エラー: { error . message } </ p > }
< div >
{ events . map (( event , idx ) => (
< div key = { idx } >
< strong > { event . type } </ strong > : { event . message || event . agent_id }
</ div >
)) }
</ div >
{ isStreaming && < button onClick = { stopStreaming } > 停止 </ button > }
</ div >
);
}
例: Vue 3 Composition API
< script setup >
import { ref } from 'vue' ;
const events = ref ([]);
const isStreaming = ref ( false );
const error = ref ( null );
const workflowId = ref ( null );
let eventSource = null ;
async function submitTask ( query ) {
events . value = [];
error . value = null ;
isStreaming . value = true ;
try {
const response = await fetch ( 'http://localhost:8080/api/v1/tasks/stream' , {
method: 'POST' ,
headers: {
'Content-Type' : 'application/json' ,
'Authorization' : `Bearer ${ localStorage . getItem ( 'token' ) } `
},
body: JSON . stringify ({
query ,
session_id: `session- ${ Date . now () } `
})
});
if (! response . ok ) throw new Error ( `HTTP ${ response . status } ` );
const { task_id , workflow_id , stream_url } = await response . json ();
workflowId . value = workflow_id ;
// Browser EventSource does NOT support custom headers — use api_key query param
eventSource = new EventSource ( `http://localhost:8080 ${ stream_url } &api_key= ${ localStorage . getItem ( 'token' ) } ` );
eventSource . onmessage = ( e ) => {
const event = JSON . parse ( e . data );
events . value . push ( event );
if ( event . type === 'WORKFLOW_COMPLETED' ) {
eventSource . close ();
isStreaming . value = false ;
}
};
eventSource . onerror = ( err ) => {
error . value = err ;
eventSource . close ();
isStreaming . value = false ;
};
} catch ( err ) {
error . value = err ;
isStreaming . value = false ;
}
}
function stopStreaming () {
if ( eventSource ) {
eventSource . close ();
isStreaming . value = false ;
}
}
</ script >
< template >
< div >
< button @ click = " submitTask ( 'データを分析する' ) " : disabled = " isStreaming " >
{{ isStreaming ? '処理中...' : 'タスクを送信' }}
</ button >
< div v-if = " workflowId " > ワークフロー: {{ workflowId }} </ div >
< div v-if = " error " style = " color : red " > エラー: {{ error . message }} </ div >
< div v-for = " ( event , idx ) in events " : key = " idx " >
< strong > {{ event . type }} </ strong > : {{ event . message || event . agent_id }}
</ div >
< button v-if = " isStreaming " @ click = " stopStreaming " > 停止 </ button >
</ div >
</ template >
例: Python
import httpx
import json
def submit_and_stream ( query : str , api_key : str ):
"""タスクを送信し、イベントをストリーミングします。"""
# 1. タスクを送信
response = httpx.post(
"http://localhost:8080/api/v1/tasks/stream" ,
headers ={
"Authorization" : f "Bearer { api_key } " ,
"Content-Type" : "application/json"
},
json ={
"query" : query,
"session_id" : f "session- { int (time.time()) } "
}
)
data = response.json()
workflow_id = data[ "workflow_id" ]
stream_url = data[ "stream_url" ]
print ( f "タスクが送信されました: { workflow_id } " )
# 2. イベントをストリーミング
with httpx.stream(
"GET" ,
f "http://localhost:8080 { stream_url } " ,
headers ={ "Authorization" : f "Bearer { api_key } " },
timeout = None
) as stream_response:
for line in stream_response.iter_lines():
if line.startswith( "data:" ):
event = json.loads(line[ 5 :])
print ( f "[ { event[ 'type' ] } ] { event.get( 'message' , '' ) } " )
if event[ 'type' ] in [ 'WORKFLOW_COMPLETED' ]:
return event
# 使用例
result = submit_and_stream( "フランスの首都はどこですか?" , "sk_test_123456" )
print ( "最終結果:" , result.get( 'result' ))
このエンドポイントを使用する理由は? 統一されたエンドポイントにより、送信後すぐにストリーミングを開始でき、別々に送信して接続する際に発生する可能性のあるイベントの見逃しを防ぎます。
サーバー送信イベント (SSE)
GET /api/v1/stream/sse
サーバー送信イベントを使用したリアルタイムイベントストリーミング。
必要 : はい
X-API-Key: sk_test_123456
クエリパラメータ
パラメータ 型 必須 説明 workflow_idstring はい タスク/ワークフロー識別子 typesstring いいえ フィルタリングするイベントタイプのカンマ区切りリスト last_event_idstring いいえ 特定のイベントIDから再開します。RedisストリームID(例: 1700000000000-0)または数値シーケンス(例: 42)を受け入れます。数値の場合、再生には seq > last_event_id のイベントが含まれます。
イベントフォーマット
各イベントはSSE仕様に従います:
id: <event_id>
event: <event_type>
data: <json_payload>
リクエストの例
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \
-H "X-API-Key: sk_test_123456"
レスポンスの例
id: 1719000000000-0
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","type":"WORKFLOW_STARTED","agent_id":"orchestrator","message":"Starting up","timestamp":"2025-10-22T10:30:00Z","seq":1,"stream_id":"1719000000000-0"}
id: 1719000001000-0
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","type":"AGENT_THINKING","agent_id":"Ryogoku","message":"Analyzing query...","timestamp":"2025-10-22T10:30:01Z","seq":2,"stream_id":"1719000001000-0"}
id: 1719000002000-0
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","type":"TOOL_INVOKED","agent_id":"Ryogoku","message":"Calling web_search","payload":{"tool":"web_search","params":{"query":"Python programming"}},"timestamp":"2025-10-22T10:30:02Z","seq":3,"stream_id":"1719000002000-0"}
id: 1719000005000-0
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","type":"TOOL_OBSERVATION","agent_id":"Ryogoku","message":"web_search completed","payload":{"success":true,"duration_ms":2800},"timestamp":"2025-10-22T10:30:05Z","seq":4,"stream_id":"1719000005000-0"}
id: 1719000006000-0
event: thread.message.completed
data: {"workflow_id":"task_abc123","agent_id":"simple-agent","response":"Python is a high-level programming language...","metadata":{"model_used":"claude-haiku-4-5-20251001","tokens_used":850,"cost_usd":0.003},"seq":5,"stream_id":"1719000006000-0"}
id: 1719000007000-0
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","type":"WORKFLOW_COMPLETED","message":"Workflow completed","timestamp":"2025-10-22T10:30:07Z","seq":6,"stream_id":"1719000007000-0"}
イベントIDの形式 : idフィールドはRedisストリームID(例: 1719000000000-0)を使用しており、単純な整数ではありません 。last_event_idで再接続する際は、これらの正確なIDを使用する必要があります。詳しくは下記の再接続 を参照してください。
WebSocket
GET /api/v1/stream/ws
WebSocketを介した双方向ストリーミング。
ゲートウェイは、ヘッダーのみ(X-API-KeyまたはAuthorization)を使用してWebSocket接続を認証します。ブラウザはWebSocketハンドシェイク中にカスタムヘッダーを設定できません。ブラウザでの使用方法:
GATEWAY_SKIP_AUTH=1でローカル実行するか、
ヘッダーを挿入してからゲートウェイに転送するリバースプロキシを使用します。
サーバー環境向けのヘッダーを使用した例:
Node (ws):
import WebSocket from 'ws' ;
const ws = new WebSocket ( 'ws://localhost:8080/api/v1/stream/ws' , {
headers: { 'X-API-Key' : 'sk_test_123456' },
});
ws . on ( 'open' , () => {
ws . send ( JSON . stringify ({ type: 'subscribe' , workflow_id: 'task_abc123' }));
});
ws . on ( 'message' , ( msg ) => {
const data = JSON . parse ( msg . toString ());
console . log ( 'Event:' , data . type , data . message );
});
ws . on ( 'error' , ( err ) => console . error ( 'WebSocket error:' , err ));
Python (websockets):
import asyncio, json, websockets
async def main ():
async with websockets.connect(
'ws://localhost:8080/api/v1/stream/ws' ,
extra_headers ={ 'X-API-Key' : 'sk_test_123456' },
) as ws:
await ws.send(json.dumps({ 'type' : 'subscribe' , 'workflow_id' : 'task_abc123' }))
async for message in ws:
data = json.loads(message)
print ( 'Event:' , data.get( 'type' ), data.get( 'message' ))
asyncio.run(main())
接続後にクエリ文字列または「auth」メッセージを介してAPIキーを渡すことは、ゲートウェイではサポートされていません。
メッセージタイプ
クライアント → サーバー :
// ワークフローにサブスクライブ
{
"type" : "subscribe" ,
"workflow_id" : "task_abc123" ,
"types" : [ "AGENT_THINKING" , "TOOL_INVOKED" ]
}
// サブスクリプション解除
{
"type" : "unsubscribe" ,
"workflow_id" : "task_abc123"
}
// Ping(キープアライブ)
{
"type" : "ping"
}
サーバー → クライアント :
// イベント
{
"type" : "AGENT_THINKING" ,
"workflow_id" : "task_abc123" ,
"message" : "クエリを分析中..." ,
"timestamp" : "2025-10-22T10:30:00Z"
}
// Pong(キープアライブ応答)
{
"type" : "pong"
}
OpenAI互換ストリーミング
Shannonは/v1/chat/completionsでOpenAI互換のストリーミングエンドポイントも提供しており、ShannonイベントをOpenAI標準のchat.completion.chunk形式に変換します。これにより、OpenAI SDKを直接Shannonで使用できます。
OpenAI互換APIの完全なドキュメント(リクエスト/レスポンススキーマ、利用可能なモデル、Shannon固有の拡張(shannon_events)、SDK使用例を含む)については、OpenAI互換APIリファレンス を参照してください。
イベントタイプ
コアイベント
イベントタイプ 説明 発生タイミング WORKFLOW_STARTEDワークフローの実行が開始された タスクの開始 WORKFLOW_COMPLETEDワークフローが正常に終了した タスクの終了(成功)
エージェントイベント
イベントタイプ 説明 ペイロードフィールド AGENT_THINKINGエージェントの推論/計画 agent_id, messageAGENT_COMPLETEDエージェントの実行が完了した agent_id, result
ツールイベント
イベントタイプ 説明 ペイロードフィールド TOOL_INVOKEDツールの実行が開始された tool, paramsTOOL_OBSERVATIONエージェントがツールの結果を観察 tool, result
LLMイベント
イベントタイプ 説明 ペイロードフィールド LLM_PROMPTLLMに送信されたプロンプト message, payload.model, payload.providerthread.message.deltaストリーミングテキストチャンク delta, agent_idthread.message.completed使用メタデータ付きの最終レスポンス response, metadata.model_used, metadata.tokens_used, metadata.cost_usdLLM_PARTIAL生のLLMストリーミングトークン textLLM_OUTPUT生のLLM最終出力 text
ほとんどの統合では 、LLM_PARTIAL/LLM_OUTPUTではなく、thread.message.delta(ストリーミングテキスト)とthread.message.completed(使用メタデータ付きの最終結果)をリッスンしてください。
進捗およびシステムイベント
イベントタイプ 説明 ペイロードフィールド PROGRESS進捗の更新 progress, messageDATA_PROCESSINGデータ処理中 messageWAITINGリソースを待機中 messageERROR_OCCURREDエラーが発生した error, severityERROR_RECOVERYエラー回復の試み messageWORKSPACE_UPDATEDメモリ/コンテキストが更新された messageBUDGET_THRESHOLD予算警告閾値に達した usage_percent, threshold_percent, tokens_used, tokens_budget, level, budget_type
ストリームライフサイクルイベント
イベントタイプ 説明 STREAM_ENDストリームの明示的な終了信号(このワークフローに対してはこれ以上のイベントは発生しません)
SSE経由では、STREAM_ENDライフサイクルイベントはdoneという名前のSSEイベントとしてdata: [DONE](プレーンテキスト、JSONではない)で配信されます。WebSocket経由では、通常のJSONイベントとして"type": "STREAM_END"として現れます。
チームと承認
イベントタイプ 説明 TEAM_RECRUITED複数エージェントのチームが編成されました TEAM_RETIREDチームが解散しました TEAM_STATUSチームの調整更新 ROLE_ASSIGNEDエージェントの役割が割り当てられました DELEGATIONタスクが委任されました DEPENDENCY_SATISFIED依存関係が解決されました APPROVAL_REQUESTED人間の承認が必要です APPROVAL_DECISION承認の決定が記録されました
コード例
Python with httpx (SSE)
import httpx
import json
def stream_task_events ( task_id : str , api_key : str ):
"""SSEを使用してタスクイベントをストリームします。"""
url = f "http://localhost:8080/api/v1/stream/sse?workflow_id= { task_id } "
with httpx.stream(
"GET" ,
url,
headers ={ "X-API-Key" : api_key},
timeout = None # ストリーミングのためタイムアウトなし
) as response:
for line in response.iter_lines():
if line.startswith( "data:" ):
data = json.loads(line[ 5 :]) # "data:"プレフィックスを削除
yield data
# 使用例
for event in stream_task_events( "task_abc123" , "sk_test_123456" ):
print ( f "[ { event.get( 'type' ) } ] { event.get( 'message' , '' ) } " )
if event.get( 'type' ) == 'WORKFLOW_COMPLETED' :
print ( "最終結果:" , event.get( 'result' ))
break
Python - イベントフィルタリング付きストリーム
def stream_filtered_events ( task_id : str , api_key : str , event_types : list ):
"""特定のイベントタイプのみをストリームします。"""
types_param = "," .join(event_types)
url = f "http://localhost:8080/api/v1/stream/sse?workflow_id= { task_id } &types= { types_param } "
with httpx.stream( "GET" , url, headers ={ "X-API-Key" : api_key}) as response:
for line in response.iter_lines():
if line.startswith( "data:" ):
yield json.loads(line[ 5 :])
# エージェントの思考とツールイベントのみを受信
for event in stream_filtered_events(
"task_abc123" ,
"sk_test_123456" ,
[ "AGENT_THINKING" , "TOOL_INVOKED" , "TOOL_OBSERVATION" ]
):
print ( f " { event[ 'type' ] } : { event.get( 'message' , event.get( 'tool' )) } " )
JavaScript/Node.js (SSE)
const EventSource = require ( 'eventsource' );
function streamTaskEvents ( taskId , apiKey ) {
const url = `http://localhost:8080/api/v1/stream/sse?workflow_id= ${ taskId } ` ;
const eventSource = new EventSource ( url , {
headers: {
'X-API-Key' : apiKey
}
});
eventSource . onmessage = ( event ) => {
const data = JSON . parse ( event . data );
console . log ( `[ ${ data . type } ] ${ data . message || '' } ` );
if ( data . type === 'WORKFLOW_COMPLETED' ) {
console . log ( '最終結果:' , data . result );
eventSource . close ();
}
};
eventSource . onerror = ( error ) => {
console . error ( 'SSEエラー:' , error );
eventSource . close ();
};
return eventSource ;
}
// 使用例
const stream = streamTaskEvents ( 'task_abc123' , 'sk_test_123456' );
// 手動で閉じる
setTimeout (() => stream . close (), 60000 );
JavaScript/Node.js - WebSocket (ws)
import WebSocket from 'ws' ;
function connectWebSocket ( taskId , apiKey ) {
const ws = new WebSocket (
`ws://localhost:8080/api/v1/stream/ws?workflow_id= ${ taskId } ` ,
{ headers: { 'X-API-Key' : apiKey } }
);
ws . on ( 'open' , () => console . log ( '✓ 接続されました' ));
ws . on ( 'message' , ( msg ) => {
const data = JSON . parse ( msg . toString ());
switch ( data . type ) {
case 'AGENT_THINKING' :
console . log ( `💭 ${ data . message } ` );
break ;
case 'TOOL_INVOKED' :
console . log ( `🔧 ツール: ${ data . tool } ` );
break ;
case 'TOOL_OBSERVATION' :
console . log ( `✓ 結果: ${ data . result } ` );
break ;
case 'WORKFLOW_COMPLETED' :
console . log ( `✓ 完了: ${ data . result } ` );
ws . close ();
break ;
default :
console . log ( `[ ${ data . type } ] ${ data . message || '' } ` );
}
});
ws . on ( 'error' , ( err ) => console . error ( '❌ エラー:' , err ));
ws . on ( 'close' , () => console . log ( '接続が閉じられました' ));
return ws ;
}
const ws = connectWebSocket ( 'task_abc123' , 'sk_test_123456' );
Go (SSE)
package main
import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"strings"
)
type Event struct {
Type string `json:"type"`
WorkflowID string `json:"workflow_id"`
Message string `json:"message,omitempty"`
Data map [ string ] interface {} `json:",inline"`
}
func streamEvents ( taskID , apiKey string ) error {
url := fmt . Sprintf (
"http://localhost:8080/api/v1/stream/sse?workflow_id= %s " ,
taskID ,
)
req , _ := http . NewRequest ( "GET" , url , nil )
req . Header . Set ( "X-API-Key" , apiKey )
client := & http . Client {}
resp , err := client . Do ( req )
if err != nil {
return err
}
defer resp . Body . Close ()
scanner := bufio . NewScanner ( resp . Body )
for scanner . Scan () {
line := scanner . Text ()
if strings . HasPrefix ( line , "data:" ) {
data := strings . TrimPrefix ( line , "data:" )
var event Event
json . Unmarshal ([] byte ( data ), & event )
fmt . Printf ( "[ %s ] %s \n " , event . Type , event . Message )
if event . Type == "WORKFLOW_COMPLETED" {
fmt . Println ( "✓ タスクが完了しました" )
break
}
}
}
return scanner . Err ()
}
func main () {
err := streamEvents ( "task_abc123" , "sk_test_123456" )
if err != nil {
fmt . Println ( "エラー:" , err )
}
}
Bash/curl (SSE)
#!/bin/bash
API_KEY = "sk_test_123456"
TASK_ID = " $1 "
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id= $TASK_ID " \
-H "X-API-Key: $API_KEY " \
| while IFS = read -r line ; do
if [[ $line == data:* ]]; then
# "data: {...}" から JSON を抽出
JSON = "${ line # data : }"
# 解析して表示
TYPE =$( echo " $JSON " | jq -r '.type' )
MESSAGE =$( echo " $JSON " | jq -r '.message // ""' )
echo "[$( date +%T)] $TYPE : $MESSAGE "
# 完了時に終了
if [[ " $TYPE " == "WORKFLOW_COMPLETED" ]]; then
echo ""
echo " $JSON " | jq -r '.result'
break
fi
fi
done
使用例
1. リアルタイム進捗表示
def display_progress ( task_id : str , api_key : str ):
"""ユーザーにリアルタイムの進捗を表示します。"""
print ( f "タスク { task_id } を開始しています..." )
for event in stream_task_events(task_id, api_key):
event_type = event.get( 'type' )
if event_type == 'AGENT_THINKING' :
print ( f "💭 { event[ 'message' ] } " )
elif event_type == 'TOOL_INVOKED' :
print ( f "🔧 ツールを使用中: { event[ 'tool' ] } " )
elif event_type == 'TOOL_OBSERVATION' :
print ( f "✓ ツールが完了しました" )
elif event_type == 'LLM_PARTIAL' :
print (event[ 'text' ], end = '' , flush = True )
elif event_type == 'WORKFLOW_COMPLETED' :
print ( f " \n\n ✓ 完了しました!" )
return event[ 'result' ]
2. すべてのイベントをファイルにログ
import json
from datetime import datetime
def log_events_to_file ( task_id : str , api_key : str , log_file : str ):
"""すべてのイベントを JSON Lines ファイルにログします。"""
with open (log_file, 'a' ) as f:
for event in stream_task_events(task_id, api_key):
# タイムスタンプを追加
event[ 'logged_at' ] = datetime.now().isoformat()
# JSON Lines として書き込み
f.write(json.dumps(event) + ' \n ' )
f.flush()
if event.get( 'type' ) == 'WORKFLOW_COMPLETED' :
break
log_events_to_file( "task_abc123" , "sk_test_123456" , "task_events.jsonl" )
3. ツール使用メトリクスの収集
def collect_tool_metrics ( task_id : str , api_key : str ):
"""ツール使用に関するメトリクスを収集します。"""
metrics = {
"tools_invoked" : 0 ,
"tools_succeeded" : 0 ,
"tools_failed" : 0 ,
"tool_list" : []
}
for event in stream_task_events(task_id, api_key):
if event.get( 'type' ) == 'TOOL_INVOKED' :
metrics[ 'tools_invoked' ] += 1
metrics[ 'tool_list' ].append(event[ 'tool' ])
elif event.get( 'type' ) == 'TOOL_OBSERVATION' :
metrics[ 'tools_succeeded' ] += 1
elif event.get( 'type' ) == 'ERROR_OCCURRED' :
metrics[ 'tools_failed' ] += 1
elif event.get( 'type' ) == 'WORKFLOW_COMPLETED' :
break
return metrics
metrics = collect_tool_metrics( "task_abc123" , "sk_test_123456" )
print ( f "使用されたツール: { metrics[ 'tool_list' ] } " )
print ( f "成功率: { metrics[ 'tools_succeeded' ] } / { metrics[ 'tools_invoked' ] } " )
4. React UI 統合
import { useState , useEffect } from 'react' ;
function TaskMonitor ({ taskId , apiKey }) {
const [ events , setEvents ] = useState ([]);
const [ status , setStatus ] = useState ( 'connecting' );
useEffect (() => {
// ブラウザのEventSourceはカスタムヘッダーをサポートしていません。
// api_keyクエリパラメータを使用するか、バックエンドでSSEをプロキシしてください。
const eventSource = new EventSource (
`http://localhost:8080/api/v1/stream/sse?workflow_id= ${ taskId } &api_key= ${ apiKey } `
);
eventSource . onopen = () => setStatus ( 'connected' );
eventSource . onmessage = ( event ) => {
const data = JSON . parse ( event . data );
setEvents ( prev => [... prev , data ]);
if ( data . type === 'WORKFLOW_COMPLETED' ) {
setStatus ( 'completed' );
eventSource . close ();
}
};
eventSource . onerror = () => {
setStatus ( 'error' );
eventSource . close ();
};
return () => eventSource . close ();
}, [ taskId , apiKey ]);
return (
< div >
< h3 > ステータス: { status } </ h3 >
< ul >
{ events . map (( event , i ) => (
< li key = { i } >
< strong > { event . type } </ strong > : { event . message || event . tool }
</ li >
)) }
</ ul >
</ div >
);
}
ディープリサーチストリーミング
ディープリサーチタスクは2〜10分かかります。Task APIを通じてディープリサーチをトリガーするにはcontext.force_researchを使用します:
curl -X POST http://localhost:8080/api/v1/tasks/stream \
-H "Content-Type: application/json" \
-H "X-API-Key: sk_test_123456" \
-d '{
"query": "What are the latest developments in quantum computing?",
"session_id": "session-123",
"context": {"force_research": true}
}'
context.research_strategyでリサーチの深さを制御できます:
戦略 深さ 一般的な所要時間 ユースケース quick低 30秒〜2分 シンプルな事実確認クエリ standard(デフォルト)中 2〜5分 一般的なリサーチ deep高 5〜10分 複雑な多面的トピック academic最高 5〜10分 引用が多い学術リサーチ
ディープリサーチにおけるChat API vs Task API : Chat API(/v1/chat/completionsでmodel: "shannon-deep-research"を指定)でもディープリサーチをトリガーできますが、そのストリーミング形式にはSSEイベントIDが含まれません 。そのため再接続は不可能です。プラットフォームに接続時間制限がある場合(例: Vercelの5分制限)やページリフレッシュに対応する必要がある場合は、Task APIを使用してください 。
ハートビート
サーバーはプロキシやロードバランサーを介した接続を維持するために、10秒ごとに: ping SSEコメントを送信します:
これはSSEコメントであり(JSONメッセージではありません)。pingの受信が停止した場合、接続が切断されています。直ちに再接続してください。
再接続
SSE接続は、ネットワークの問題、プロキシのタイムアウト、またはプラットフォームの制限(例: Vercel hobbyプラン: 5分の接続制限)により切断される可能性があります。Shannonは中断したところからの再開をサポートしています。
仕組み:
受信した各SSEイベントのidフィールドを記録する
切断時にlast_event_idパラメータを付けて再接続する
サーバーはそのID以降のすべてのイベントをリプレイする(約256イベントをバッファ、24時間TTL)
# 中断したところから再接続
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123&last_event_id=1719000002000-0" \
-H "X-API-Key: sk_test_123456"
プロアクティブ再接続(推奨)
接続時間制限のあるプラットフォームでは、制限前にプロアクティブに切断します:
1. SSEに接続する
2. 4分のタイマーを開始する(5分制限前の安全マージン)
3. タイマーが発火したら:
a. 最後に受信したイベントIDを記録する
b. 接続を閉じる
c. last_event_id=<last_id>で直ちに再接続する
4. WORKFLOW_COMPLETEDまたはdoneイベントを受信するまで繰り返す
ページリフレッシュ / フォールバック
ユーザーがページをリフレッシュし、workflow_idがまだある場合:
タスクのステータスを確認: GET /api/v1/tasks/{workflow_id}
TASK_STATUS_RUNNINGの場合 → SSEに再接続
TASK_STATUS_COMPLETEDの場合 → レスポンスから結果を表示
TASK_STATUS_FAILEDの場合 → エラーを表示
Pythonでの再接続
import time
def stream_with_reconnect ( task_id : str , api_key : str , max_retries : int = 5 ):
"""last_event_idを使用した自動再接続付きストリーム。"""
last_id = None
for attempt in range (max_retries):
try :
url = f "http://localhost:8080/api/v1/stream/sse?workflow_id= { task_id } "
if last_id:
url += f "&last_event_id= { last_id } "
with httpx.stream( "GET" , url, headers ={ "X-API-Key" : api_key}, timeout = None ) as response:
for line in response.iter_lines():
if line.startswith( "id:" ):
last_id = line[ 3 :].strip() # Track last event ID
elif line.startswith( "data:" ):
event = json.loads(line[ 5 :])
yield event
if event.get( 'type' ) in ( 'WORKFLOW_COMPLETED' , 'STREAM_END' ):
return
except Exception as e:
if attempt < max_retries - 1 :
wait = min ( 2 ** attempt, 10 )
print ( f "Connection lost, reconnecting in { wait } s (last_id= { last_id } )..." )
time.sleep(wait)
else :
raise
ベストプラクティス
2. タイムアウトの実装
import signal
def stream_with_timeout ( task_id : str , api_key : str , timeout : int = 300 ):
"""タイムアウト付きのストリーム。"""
def timeout_handler ( signum , frame ):
raise TimeoutError ( "ストリームタイムアウト" )
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try :
for event in stream_task_events(task_id, api_key):
yield event
if event.get( 'type' ) == 'WORKFLOW_COMPLETED' :
break
finally :
signal.alarm( 0 ) # アラームをキャンセル
3. クライアントサイドでのイベントフィルタリング
def stream_specific_events ( task_id : str , api_key : str , event_types : set ):
"""クライアントサイドでイベントをフィルタリングします。"""
for event in stream_task_events(task_id, api_key):
if event.get( 'type' ) in event_types:
yield event
if event.get( 'type' ) == 'WORKFLOW_COMPLETED' :
yield event
break
# エージェントとツールのイベントのみを受信
for event in stream_specific_events(
"task_abc123" ,
"sk_test_123456" ,
{ "AGENT_THINKING" , "TOOL_INVOKED" , "TOOL_OBSERVATION" }
):
print (event)
4. 最後のイベントから再開
def stream_with_resume ( task_id : str , api_key : str , last_event_id : str = None ):
"""特定のイベントからストリーミングを再開します。"""
url = f "http://localhost:8080/api/v1/stream/sse?workflow_id= { task_id } "
if last_event_id:
url += f "&last_event_id= { last_event_id } "
# イベントをストリーミング...
注意: last_event_id は、Redis ストリーム ID(例: 1700000000000-0)または数値シーケンス(例: 42)のいずれかを受け入れます。数値の場合、リプレイには seq > last_event_id のイベントが含まれます。
比較: SSE vs WebSocket vs ポーリング
機能 SSE WebSocket ポーリング 方向 サーバー → クライアント 双方向 クライアント → サーバー プロトコル HTTP WebSocket HTTP 自動再接続 はい(ブラウザ) いいえ(手動) 該当なし オーバーヘッド 低 非常に低 高 シンプルさ 高 中 高 ユースケース リアルタイム更新 インタラクティブアプリ シンプルなステータス Shannon サポート ✅ 推奨 ✅ 利用可能 ⚠️ 理想的ではない
それぞれの使用時期
SSE : ほとんどのユースケース、リアルタイムモニタリング、進捗表示
WebSocket : インタラクティブアプリケーション、双方向通信が必要
ポーリング (GET /api/v1/tasks/): レガシーシステム、ストリーミングサポートなし
関連エンドポイント
ステータス取得 GET /api/v1/tasks/
Python SDK client.stream() を使用
ノート
イベント保持 :
Redis : すべてのイベントは24時間保存(リアルタイムストリーミング)
PostgreSQL : 重要なイベントは90日間保存(履歴クエリ)
接続が切れた場合は last_event_id を使用してストリーミングを再開
接続制限 :
APIキーごとに最大100の同時ストリーミング接続
5分間の非アクティブタイムアウト(自動接続切断)
接続ごとのバッファサイズ制限は1MB
単一のWebSocket上で複数のワークフローを多重化することを検討