Overview
Shannon provides real-time event streaming through Server-Sent Events (SSE) and WebSocket protocols. Use streaming to monitor task execution, display progress, and receive results as they’re generated.
Authentication: Streaming endpoints require the same headers as other APIs. Browsers cannot send custom headers with EventSource.
Development: set GATEWAY_SKIP_AUTH=1.
Production: proxy SSE via your backend and inject X-API-Key or Bearer headers.
For SSE endpoints, the api_key query parameter is supported as a fallback (e.g., ?api_key=sk_...). For all other endpoints, use headers exclusively.
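For illustration, a small helper (the name `sse_url_with_key` is hypothetical) that appends the api_key fallback only when connecting from a context that cannot set headers:

```python
from urllib.parse import urlencode

def sse_url_with_key(base: str, workflow_id: str, api_key: str = None) -> str:
    """Build an SSE URL, using the api_key query-parameter fallback
    when headers cannot be set (e.g., browser EventSource)."""
    params = {"workflow_id": workflow_id}
    if api_key:
        params["api_key"] = api_key  # SSE-only fallback; prefer headers elsewhere
    return f"{base}/api/v1/stream/sse?{urlencode(params)}"

print(sse_url_with_key("http://localhost:8080", "task_abc123", "sk_test_123456"))
# → http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123&api_key=sk_test_123456
```

Server-side callers should omit the api_key argument and send the header instead.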
Streaming Limits:
Timeout: Streams automatically close after 5 minutes of inactivity
Buffer Size: Maximum 1MB buffered data per connection
Usage Metadata: Token counts and costs are available for all LLM providers (OpenAI, Anthropic, Google, Groq, xAI)
Endpoints
| Method | Endpoint | Protocol | Description |
|---|---|---|---|
| POST | /api/v1/tasks/stream | HTTP+SSE | Submit task and get stream URL (recommended) |
| GET | /api/v1/stream/sse | SSE | Server-Sent Events endpoint |
| GET | /api/v1/stream/ws | WebSocket | WebSocket streaming endpoint |
| GET | /api/v1/tasks/{id}/events | HTTP | Get historical events (paginated) |
Unified Submit + Stream (Recommended)
POST /api/v1/tasks/stream
The easiest way to submit a task and immediately start streaming its events. This endpoint combines task submission with streaming setup in one call.
Best for frontend applications: This endpoint is perfect for real-time UIs where you want to show progress immediately after submitting a task.
Authentication
Required: Yes
X-API-Key: sk_test_123456
Or:
Authorization: Bearer YOUR_TOKEN
Request Body
| Parameter | Type | Required | Description |
|---|---|---|---|
| query | string | Yes | Natural language task description |
| session_id | string | No | Session identifier for multi-turn conversations |
| context | object | No | Additional context data as key-value pairs |
| model_tier | string | No | Preferred tier: small, medium, or large |
| model_override | string | No | Specific model name (canonical; e.g., gpt-5) |
| provider_override | string | No | Force provider (e.g., openai, anthropic, google) |
Response
Status: 201 Created
Body:
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}
Response Fields
| Field | Type | Description |
|---|---|---|
| task_id | string | Unique task identifier |
| workflow_id | string | Task/workflow identifier |
| stream_url | string | Relative URL to SSE stream endpoint |
Example: JavaScript/TypeScript
async function submitAndStream(query, onEvent, onComplete, onError) {
  try {
    // 1. Submit task and get stream URL
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer YOUR_TOKEN'
      },
      body: JSON.stringify({
        query: query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    const { task_id, workflow_id, stream_url } = await response.json();
    console.log(`Task submitted: ${workflow_id}`);

    // 2. Connect to SSE stream
    const eventSource = new EventSource(
      `http://localhost:8080${stream_url}`,
      { withCredentials: false }
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      onEvent(event);
      // Check for completion
      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        onComplete(event);
      }
    };

    eventSource.onerror = (err) => {
      console.error('SSE error:', err);
      eventSource.close();
      onError(err);
    };

    return { workflow_id, eventSource };
  } catch (error) {
    onError(error);
    throw error;
  }
}

// Usage
submitAndStream(
  "Analyze Q4 revenue trends",
  (event) => console.log(`[${event.type}]`, event.message),
  (final) => console.log("Completed:", final.result),
  (error) => console.error("Error:", error)
);
Example: React Hook
import { useState, useCallback, useRef } from 'react';

function useTaskStream(apiUrl = 'http://localhost:8080') {
  const [events, setEvents] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState(null);
  const [workflowId, setWorkflowId] = useState(null);
  const eventSourceRef = useRef(null);

  const submitTask = useCallback(async (query, sessionId = null) => {
    setEvents([]);
    setError(null);
    setIsStreaming(true);
    try {
      // Submit task
      const response = await fetch(`${apiUrl}/api/v1/tasks/stream`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${localStorage.getItem('token')}`
        },
        body: JSON.stringify({
          query,
          session_id: sessionId || `session-${Date.now()}`
        })
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const { task_id, workflow_id, stream_url } = await response.json();
      setWorkflowId(workflow_id);

      // Connect to stream
      const eventSource = new EventSource(`${apiUrl}${stream_url}`);
      eventSourceRef.current = eventSource;

      eventSource.onmessage = (e) => {
        const event = JSON.parse(e.data);
        setEvents(prev => [...prev, event]);
        if (event.type === 'WORKFLOW_COMPLETED' || event.type === 'ERROR_OCCURRED') {
          eventSource.close();
          setIsStreaming(false);
        }
      };

      eventSource.onerror = (err) => {
        setError(err);
        setIsStreaming(false);
        eventSource.close();
      };
    } catch (err) {
      setError(err);
      setIsStreaming(false);
    }
  }, [apiUrl]);

  const stopStreaming = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      setIsStreaming(false);
    }
  }, []);

  return { submitTask, stopStreaming, events, isStreaming, error, workflowId };
}

// Usage in component
function TaskStreamDemo() {
  const { submitTask, stopStreaming, events, isStreaming, error, workflowId } = useTaskStream();

  return (
    <div>
      <button
        onClick={() => submitTask("What is 15 + 25?")}
        disabled={isStreaming}
      >
        {isStreaming ? 'Processing...' : 'Submit Task'}
      </button>
      {workflowId && <p>Workflow ID: {workflowId}</p>}
      {error && <p style={{ color: 'red' }}>Error: {error.message}</p>}
      <div>
        {events.map((event, idx) => (
          <div key={idx}>
            <strong>{event.type}</strong>: {event.message || event.agent_id}
          </div>
        ))}
      </div>
      {isStreaming && <button onClick={stopStreaming}>Stop</button>}
    </div>
  );
}
Example: Vue 3 Composition API
<script setup>
import { ref } from 'vue';

const events = ref([]);
const isStreaming = ref(false);
const error = ref(null);
const workflowId = ref(null);
let eventSource = null;

async function submitTask(query) {
  events.value = [];
  error.value = null;
  isStreaming.value = true;
  try {
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${localStorage.getItem('token')}`
      },
      body: JSON.stringify({
        query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);

    const { task_id, workflow_id, stream_url } = await response.json();
    workflowId.value = workflow_id;

    eventSource = new EventSource(`http://localhost:8080${stream_url}`);
    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      events.value.push(event);
      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        isStreaming.value = false;
      }
    };
    eventSource.onerror = (err) => {
      error.value = err;
      eventSource.close();
      isStreaming.value = false;
    };
  } catch (err) {
    error.value = err;
    isStreaming.value = false;
  }
}

function stopStreaming() {
  if (eventSource) {
    eventSource.close();
    isStreaming.value = false;
  }
}
</script>

<template>
  <div>
    <button @click="submitTask('Analyze data')" :disabled="isStreaming">
      {{ isStreaming ? 'Processing...' : 'Submit Task' }}
    </button>
    <div v-if="workflowId">Workflow: {{ workflowId }}</div>
    <div v-if="error" style="color: red">Error: {{ error.message }}</div>
    <div v-for="(event, idx) in events" :key="idx">
      <strong>{{ event.type }}</strong>: {{ event.message || event.agent_id }}
    </div>
    <button v-if="isStreaming" @click="stopStreaming">Stop</button>
  </div>
</template>
Example: Python
import httpx
import json
import time

def submit_and_stream(query: str, api_key: str):
    """Submit task and stream events."""
    # 1. Submit task
    response = httpx.post(
        "http://localhost:8080/api/v1/tasks/stream",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "session_id": f"session-{int(time.time())}"
        }
    )
    data = response.json()
    workflow_id = data["workflow_id"]
    stream_url = data["stream_url"]
    print(f"Task submitted: {workflow_id}")

    # 2. Stream events
    with httpx.stream(
        "GET",
        f"http://localhost:8080{stream_url}",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=None
    ) as stream_response:
        for line in stream_response.iter_lines():
            if line.startswith("data:"):
                event = json.loads(line[5:])
                print(f"[{event['type']}] {event.get('message', '')}")
                if event['type'] == 'WORKFLOW_COMPLETED':
                    return event

# Usage
result = submit_and_stream("What is the capital of France?", "sk_test_123456")
print("Final result:", result.get('result'))
Why use this endpoint? The unified endpoint ensures you start streaming immediately after submission, preventing any missed events that could occur if you submit and then connect separately.
Server-Sent Events (SSE)
GET /api/v1/stream/sse
Real-time event streaming using Server-Sent Events.
Authentication
Required: Yes
X-API-Key: sk_test_123456
Query Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflow_id | string | Yes | Task/workflow identifier |
| types | string | No | Comma-separated event types to filter |
| last_event_id | string | No | Resume from a specific event ID. Accepts a Redis stream ID (e.g., 1700000000000-0) or a numeric sequence (e.g., 42). When numeric, replay includes events with seq > last_event_id. |
Each event follows SSE specification:
id: <event_id>
event: <event_type>
data: <json_payload>
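If you are consuming the stream without an SSE client library, a minimal parser for this framing might look like the sketch below (the `id:`/`event:`/`data:` field names follow the SSE specification; `parse_sse_frame` is a hypothetical helper):

```python
import json

def parse_sse_frame(frame: str) -> dict:
    """Parse one SSE frame ("id:", "event:", "data:" lines) into a dict."""
    parsed = {}
    for line in frame.splitlines():
        if line.startswith("id:"):
            parsed["id"] = line[3:].strip()
        elif line.startswith("event:"):
            parsed["event"] = line[6:].strip()
        elif line.startswith("data:"):
            parsed["data"] = json.loads(line[5:].strip())
    return parsed

frame = 'id: 1\nevent: WORKFLOW_STARTED\ndata: {"workflow_id":"task_abc123","message":"Workflow started"}'
print(parse_sse_frame(frame)["event"])  # → WORKFLOW_STARTED
```

In real clients, frames are separated by a blank line; buffer lines until you see one, then parse the accumulated frame.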
Example Request
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \
-H "X-API-Key: sk_test_123456"
Example Response
id: 1
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","timestamp":"2025-10-22T10:30:00Z","message":"Workflow started"}
id: 2
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","agent_id":"agent_1","message":"Analyzing query...","timestamp":"2025-10-22T10:30:01Z"}
id: 3
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","tool":"web_search","params":{"query":"Python programming"},"timestamp":"2025-10-22T10:30:02Z"}
id: 4
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","tool":"web_search","result":"...","timestamp":"2025-10-22T10:30:05Z"}
id: 5
event: AGENT_COMPLETED
data: {"workflow_id":"task_abc123","agent_id":"agent_1","result":"Python is a high-level programming language...","timestamp":"2025-10-22T10:30:06Z"}
id: 6
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","status":"COMPLETED","result":"...","timestamp":"2025-10-22T10:30:07Z"}
WebSocket
GET /api/v1/stream/ws
Bidirectional streaming via WebSocket.
Authentication
The gateway authenticates WebSocket connections via headers only (X-API-Key or Authorization). Browsers cannot set custom headers during the WebSocket handshake. For browser usage:
Run locally with GATEWAY_SKIP_AUTH=1, or
Use a reverse proxy that injects the header before forwarding to the gateway.
Header-based examples for server environments:
Node (ws):
import WebSocket from 'ws';

const ws = new WebSocket('ws://localhost:8080/api/v1/stream/ws', {
  headers: { 'X-API-Key': 'sk_test_123456' },
});

ws.on('open', () => {
  ws.send(JSON.stringify({ type: 'subscribe', workflow_id: 'task_abc123' }));
});

ws.on('message', (msg) => {
  const data = JSON.parse(msg.toString());
  console.log('Event:', data.type, data.message);
});

ws.on('error', (err) => console.error('WebSocket error:', err));
Python (websockets):
import asyncio
import json
import websockets

async def main():
    async with websockets.connect(
        'ws://localhost:8080/api/v1/stream/ws',
        extra_headers={'X-API-Key': 'sk_test_123456'},
    ) as ws:
        await ws.send(json.dumps({'type': 'subscribe', 'workflow_id': 'task_abc123'}))
        async for message in ws:
            data = json.loads(message)
            print('Event:', data.get('type'), data.get('message'))

asyncio.run(main())
Passing the API key in the query string or via an “auth” message after connect is not supported by the gateway.
Message Types
Client → Server:
// Subscribe to workflow
{
  "type": "subscribe",
  "workflow_id": "task_abc123",
  "types": ["AGENT_THINKING", "TOOL_INVOKED"]
}

// Unsubscribe
{
  "type": "unsubscribe",
  "workflow_id": "task_abc123"
}

// Ping (keep-alive)
{
  "type": "ping"
}
Server → Client:
// Event
{
  "type": "AGENT_THINKING",
  "workflow_id": "task_abc123",
  "message": "Analyzing query...",
  "timestamp": "2025-10-22T10:30:00Z"
}

// Pong (keep-alive response)
{
  "type": "pong"
}
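A client loop typically sends ping periodically and treats pong as a connection-health signal rather than workflow data. A sketch of that message handling (pure logic only; the WebSocket transport and `handle_server_message` helper name are assumptions):

```python
import json

PING = json.dumps({"type": "ping"})  # Send on an interval to keep the connection alive

def handle_server_message(raw: str):
    """Return a workflow event dict, or None for keep-alive pongs."""
    msg = json.loads(raw)
    if msg.get("type") == "pong":
        return None  # Connection is alive; nothing to render
    return msg

assert handle_server_message('{"type": "pong"}') is None
event = handle_server_message('{"type": "AGENT_THINKING", "message": "Analyzing query..."}')
print(event["message"])  # → Analyzing query...
```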
OpenAI-Compatible Streaming
Shannon also provides an OpenAI-compatible streaming endpoint at /v1/chat/completions that translates Shannon events into the standard OpenAI chat.completion.chunk format. This allows you to use OpenAI SDKs directly with Shannon.
For complete documentation on the OpenAI-compatible API, including request/response schemas, available models, Shannon-specific extensions (shannon_events), and SDK usage examples, see the OpenAI-Compatible API Reference .
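As a sketch of what that translation looks like on the wire, here is how a client might accumulate assistant text from chat.completion.chunk SSE lines. The chunk shape below follows the standard OpenAI streaming format; the exact fields Shannon emits (including shannon_events) are in the OpenAI-Compatible API Reference.

```python
import json

def accumulate_chunks(lines):
    """Collect assistant text from OpenAI-style streaming chunk lines."""
    text = []
    for line in lines:
        if not line.startswith("data:"):
            continue
        payload = line[5:].strip()
        if payload == "[DONE]":  # End-of-stream sentinel
            break
        chunk = json.loads(payload)
        delta = chunk["choices"][0].get("delta", {})
        if "content" in delta:
            text.append(delta["content"])
    return "".join(text)

lines = [
    'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"}}]}',
    'data: {"object":"chat.completion.chunk","choices":[{"delta":{"content":" world"}}]}',
    'data: [DONE]',
]
print(accumulate_chunks(lines))  # → Hello world
```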
Event Types
Core Events
| Event Type | Description | When Fired |
|---|---|---|
| WORKFLOW_STARTED | Workflow execution began | Start of task |
| WORKFLOW_COMPLETED | Workflow finished successfully | End of task (success) |
Agent Events
| Event Type | Description | Payload Fields |
|---|---|---|
| AGENT_THINKING | Agent reasoning/planning | agent_id, message |
| AGENT_COMPLETED | Agent finished execution | agent_id, result |
Tool Events
| Event Type | Description | Payload Fields |
|---|---|---|
| TOOL_INVOKED | Tool execution started | tool, params |
| TOOL_OBSERVATION | Agent observes tool result | tool, result |
LLM Events
| Event Type | Description | Payload Fields |
|---|---|---|
| LLM_PROMPT | Prompt sent to LLM | text |
| LLM_PARTIAL | Streaming LLM output | text |
| LLM_OUTPUT | Final LLM output | text |
Progress & System Events
| Event Type | Description | Payload Fields |
|---|---|---|
| PROGRESS | Progress update | progress, message |
| DATA_PROCESSING | Data processing in progress | message |
| WAITING | Waiting for resources | message |
| ERROR_OCCURRED | Error occurred | error, severity |
| ERROR_RECOVERY | Error recovery attempt | message |
| WORKSPACE_UPDATED | Memory/context updated | message |
| BUDGET_THRESHOLD | Budget warning threshold reached | usage_percent, threshold_percent, tokens_used, tokens_budget, level, budget_type |
Stream Lifecycle Events
| Event Type | Description |
|---|---|
| STREAM_END | Explicit end-of-stream signal (no more events will be emitted for this workflow) |
Over SSE, the STREAM_END lifecycle event is delivered as an SSE event named done with data: [DONE] (plain text, not JSON). Over WebSocket, it appears as a normal JSON event with "type": "STREAM_END".
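Client code therefore needs a small special case: the SSE `done` event carries the literal text [DONE], which is not valid JSON. A sketch of that decoding step (the `decode_sse_data` helper name is an assumption):

```python
import json

def decode_sse_data(event_name: str, data: str):
    """Decode an SSE data payload, treating the done/[DONE] sentinel specially."""
    if event_name == "done" and data.strip() == "[DONE]":
        return {"type": "STREAM_END"}  # Normalize to the WebSocket event shape
    return json.loads(data)

assert decode_sse_data("done", "[DONE]") == {"type": "STREAM_END"}
evt = decode_sse_data("WORKFLOW_COMPLETED", '{"type":"WORKFLOW_COMPLETED","status":"COMPLETED"}')
print(evt["status"])  # → COMPLETED
```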
Team & Approvals
| Event Type | Description |
|---|---|
| TEAM_RECRUITED | Multi-agent team assembled |
| TEAM_RETIRED | Team disbanded |
| TEAM_STATUS | Team coordination update |
| ROLE_ASSIGNED | Agent role assigned |
| DELEGATION | Task delegated |
| DEPENDENCY_SATISFIED | Dependency resolved |
| APPROVAL_REQUESTED | Human approval needed |
| APPROVAL_DECISION | Approval decision recorded |
Code Examples
Python with httpx (SSE)
import httpx
import json

def stream_task_events(task_id: str, api_key: str):
    """Stream task events using SSE."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
    with httpx.stream(
        "GET",
        url,
        headers={"X-API-Key": api_key},
        timeout=None  # No timeout for streaming
    ) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                data = json.loads(line[5:])  # Remove "data:" prefix
                yield data

# Usage
for event in stream_task_events("task_abc123", "sk_test_123456"):
    print(f"[{event.get('type')}] {event.get('message', '')}")
    if event.get('type') == 'WORKFLOW_COMPLETED':
        print("Final result:", event.get('result'))
        break
Python - Stream with Event Filtering
def stream_filtered_events(task_id: str, api_key: str, event_types: list):
    """Stream only specific event types."""
    types_param = ",".join(event_types)
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"
    with httpx.stream("GET", url, headers={"X-API-Key": api_key}) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])

# Only receive agent thinking and tool events
for event in stream_filtered_events(
    "task_abc123",
    "sk_test_123456",
    ["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
    print(f"{event['type']}: {event.get('message', event.get('tool'))}")
JavaScript/Node.js (SSE)
const EventSource = require('eventsource');

function streamTaskEvents(taskId, apiKey) {
  const url = `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`;
  const eventSource = new EventSource(url, {
    headers: {
      'X-API-Key': apiKey
    }
  });

  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log(`[${data.type}] ${data.message || ''}`);
    if (data.type === 'WORKFLOW_COMPLETED') {
      console.log('Final result:', data.result);
      eventSource.close();
    }
  };

  eventSource.onerror = (error) => {
    console.error('SSE error:', error);
    eventSource.close();
  };

  return eventSource;
}

// Usage
const stream = streamTaskEvents('task_abc123', 'sk_test_123456');

// Close manually
setTimeout(() => stream.close(), 60000);
JavaScript/Node.js - WebSocket (ws)
import WebSocket from 'ws';

function connectWebSocket(taskId, apiKey) {
  const ws = new WebSocket(
    `ws://localhost:8080/api/v1/stream/ws?workflow_id=${taskId}`,
    { headers: { 'X-API-Key': apiKey } }
  );

  ws.on('open', () => console.log('✓ Connected'));

  ws.on('message', (msg) => {
    const data = JSON.parse(msg.toString());
    switch (data.type) {
      case 'AGENT_THINKING':
        console.log(`💭 ${data.message}`);
        break;
      case 'TOOL_INVOKED':
        console.log(`🔧 Tool: ${data.tool}`);
        break;
      case 'TOOL_OBSERVATION':
        console.log(`✓ Result: ${data.result}`);
        break;
      case 'WORKFLOW_COMPLETED':
        console.log(`✓ Done: ${data.result}`);
        ws.close();
        break;
      default:
        console.log(`[${data.type}] ${data.message || ''}`);
    }
  });

  ws.on('error', (err) => console.error('❌ Error:', err));
  ws.on('close', () => console.log('Connection closed'));

  return ws;
}

const ws = connectWebSocket('task_abc123', 'sk_test_123456');
Go (SSE)
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

type Event struct {
	Type       string `json:"type"`
	WorkflowID string `json:"workflow_id"`
	Message    string `json:"message,omitempty"`
}

func streamEvents(taskID, apiKey string) error {
	url := fmt.Sprintf(
		"http://localhost:8080/api/v1/stream/sse?workflow_id=%s",
		taskID,
	)
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("X-API-Key", apiKey)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data:") {
			data := strings.TrimPrefix(line, "data:")
			var event Event
			if err := json.Unmarshal([]byte(data), &event); err != nil {
				continue // Skip malformed payloads
			}
			fmt.Printf("[%s] %s\n", event.Type, event.Message)
			if event.Type == "WORKFLOW_COMPLETED" {
				fmt.Println("✓ Task completed")
				break
			}
		}
	}
	return scanner.Err()
}

func main() {
	err := streamEvents("task_abc123", "sk_test_123456")
	if err != nil {
		fmt.Println("Error:", err)
	}
}
Bash/curl (SSE)
#!/bin/bash
API_KEY="sk_test_123456"
TASK_ID="$1"

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=$TASK_ID" \
  -H "X-API-Key: $API_KEY" \
  | while IFS= read -r line; do
    if [[ $line == data:* ]]; then
      # Extract JSON after the "data:" prefix
      JSON="${line#data:}"
      # Parse and display
      TYPE=$(echo "$JSON" | jq -r '.type')
      MESSAGE=$(echo "$JSON" | jq -r '.message // ""')
      echo "[$(date +%T)] $TYPE: $MESSAGE"
      # Exit on completion
      if [[ "$TYPE" == "WORKFLOW_COMPLETED" ]]; then
        echo ""
        echo "$JSON" | jq -r '.result'
        break
      fi
    fi
  done
Use Cases
1. Real-Time Progress Display
def display_progress(task_id: str, api_key: str):
    """Display real-time progress to user."""
    print(f"Task {task_id} starting...")
    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')
        if event_type == 'AGENT_THINKING':
            print(f"💭 {event['message']}")
        elif event_type == 'TOOL_INVOKED':
            print(f"🔧 Using tool: {event['tool']}")
        elif event_type == 'TOOL_OBSERVATION':
            print("✓ Tool completed")
        elif event_type == 'LLM_PARTIAL':
            print(event['text'], end='', flush=True)
        elif event_type == 'WORKFLOW_COMPLETED':
            print("\n\n✓ Done!")
            return event['result']
2. Log All Events to File
import json
from datetime import datetime

def log_events_to_file(task_id: str, api_key: str, log_file: str):
    """Log all events to JSON Lines file."""
    with open(log_file, 'a') as f:
        for event in stream_task_events(task_id, api_key):
            # Add timestamp
            event['logged_at'] = datetime.now().isoformat()
            # Write as JSON Lines
            f.write(json.dumps(event) + '\n')
            f.flush()
            if event.get('type') == 'WORKFLOW_COMPLETED':
                break

log_events_to_file("task_abc123", "sk_test_123456", "task_events.jsonl")
3. Collect Tool Usage Metrics
def collect_tool_metrics(task_id: str, api_key: str):
    """Collect metrics about tool usage."""
    metrics = {
        "tools_invoked": 0,
        "tools_succeeded": 0,
        "tools_failed": 0,
        "tool_list": []
    }
    for event in stream_task_events(task_id, api_key):
        if event.get('type') == 'TOOL_INVOKED':
            metrics['tools_invoked'] += 1
            metrics['tool_list'].append(event['tool'])
        elif event.get('type') == 'TOOL_OBSERVATION':
            metrics['tools_succeeded'] += 1
        elif event.get('type') == 'ERROR_OCCURRED':
            metrics['tools_failed'] += 1
        elif event.get('type') == 'WORKFLOW_COMPLETED':
            break
    return metrics

metrics = collect_tool_metrics("task_abc123", "sk_test_123456")
print(f"Tools used: {metrics['tool_list']}")
print(f"Success rate: {metrics['tools_succeeded']}/{metrics['tools_invoked']}")
4. React UI Integration
import { useState, useEffect } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [status, setStatus] = useState('connecting');

  useEffect(() => {
    // Browser EventSource cannot set custom headers; use the
    // api_key query-parameter fallback supported by SSE endpoints.
    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}&api_key=${apiKey}`
    );

    eventSource.onopen = () => setStatus('connected');

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, data]);
      if (data.type === 'WORKFLOW_COMPLETED') {
        setStatus('completed');
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      setStatus('error');
      eventSource.close();
    };

    return () => eventSource.close();
  }, [taskId, apiKey]);

  return (
    <div>
      <h3>Status: {status}</h3>
      <ul>
        {events.map((event, i) => (
          <li key={i}>
            <strong>{event.type}</strong>: {event.message || event.tool}
          </li>
        ))}
      </ul>
    </div>
  );
}
Best Practices
1. Handle Connection Errors
import time

def stream_with_retry(task_id: str, api_key: str, max_retries: int = 3):
    """Stream with automatic retry on connection failure."""
    for attempt in range(max_retries):
        try:
            for event in stream_task_events(task_id, api_key):
                yield event
                if event.get('type') == 'WORKFLOW_COMPLETED':
                    return
        except Exception:
            if attempt < max_retries - 1:
                print(f"Connection failed, retrying in {2 ** attempt}s...")
                time.sleep(2 ** attempt)
            else:
                raise
2. Implement Timeout
import signal

def stream_with_timeout(task_id: str, api_key: str, timeout: int = 300):
    """Stream with timeout."""
    def timeout_handler(signum, frame):
        raise TimeoutError("Stream timeout")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)
    try:
        for event in stream_task_events(task_id, api_key):
            yield event
            if event.get('type') == 'WORKFLOW_COMPLETED':
                break
    finally:
        signal.alarm(0)  # Cancel alarm
3. Filter Events Client-Side
def stream_specific_events(task_id: str, api_key: str, event_types: set):
    """Filter events client-side."""
    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')
        # Always surface completion, even when it isn't in the filter set
        if event_type in event_types or event_type == 'WORKFLOW_COMPLETED':
            yield event
        if event_type == 'WORKFLOW_COMPLETED':
            break

# Only receive agent and tool events
for event in stream_specific_events(
    "task_abc123",
    "sk_test_123456",
    {"AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"}
):
    print(event)
4. Resume from Last Event
def stream_with_resume(task_id: str, api_key: str, last_event_id: str = None):
    """Resume streaming from a specific event."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
    if last_event_id:
        url += f"&last_event_id={last_event_id}"
    with httpx.stream("GET", url, headers={"X-API-Key": api_key}, timeout=None) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])
Note: last_event_id accepts either a Redis stream ID (e.g., 1700000000000-0) or a numeric sequence (e.g., 42). When numeric, replay includes events with seq > last_event_id.
Comparison: SSE vs WebSocket vs Polling
| Feature | SSE | WebSocket | Polling |
|---|---|---|---|
| Direction | Server → Client | Bidirectional | Client → Server |
| Protocol | HTTP | WebSocket | HTTP |
| Auto-reconnect | Yes (browser) | No (manual) | N/A |
| Overhead | Low | Very Low | High |
| Simplicity | High | Medium | High |
| Use Case | Real-time updates | Interactive apps | Simple status |
| Shannon Support | ✅ Recommended | ✅ Available | ⚠️ Not ideal |
When to Use Each
SSE : Most use cases, real-time monitoring, progress display
WebSocket : Interactive applications, bidirectional communication needed
Polling (GET /api/v1/tasks/): Legacy systems, no streaming support
Related:
Submit Task: POST /api/v1/tasks
Get Status: GET /api/v1/tasks/
Python SDK: use client.stream()
Notes
Event Retention:
Redis: All events stored for 24 hours (real-time streaming)
PostgreSQL: Critical events stored for 90 days (historical queries)
Use last_event_id to resume streaming if the connection drops
Connection Limits:
Maximum 100 concurrent streaming connections per API key
5-minute inactivity timeout (automatic connection close)
1MB buffer size limit per connection
Consider multiplexing multiple workflows over a single WebSocket
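The subscribe/unsubscribe messages described earlier make multiplexing straightforward: one socket can carry several workflows, with events routed by workflow_id. A sketch of the message bookkeeping (pure logic only; the transport and the helper names `subscribe_messages`/`route_event` are assumptions):

```python
import json

def subscribe_messages(workflow_ids):
    """Build one subscribe message per workflow for a shared WebSocket."""
    return [json.dumps({"type": "subscribe", "workflow_id": wid}) for wid in workflow_ids]

def route_event(raw: str, handlers: dict):
    """Dispatch an incoming event to the handler registered for its workflow_id."""
    event = json.loads(raw)
    handler = handlers.get(event.get("workflow_id"))
    if handler:
        handler(event)

msgs = subscribe_messages(["task_abc123", "task_def456"])
print(len(msgs))  # → 2

seen = []
route_event('{"type":"PROGRESS","workflow_id":"task_abc123"}', {"task_abc123": seen.append})
print(seen[0]["type"])  # → PROGRESS
```

Send an unsubscribe message (and drop the handler) when a workflow completes, so the shared connection carries only live subscriptions.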