Overview
Shannon provides real-time event streaming so you can monitor task execution as it happens. This is essential for:
Providing live feedback to users
Debugging workflow execution
Building interactive UIs
Monitoring long-running tasks
Streaming Technologies
Shannon supports two streaming protocols:
Protocol Use Case Features SSE (Server-Sent Events)One-way server→client Simple, HTTP-based, auto-reconnect WebSocket Bidirectional Full duplex, lower latency
SSE is recommended for most use cases due to its simplicity and built-in reconnection handling.
Server-Sent Events (SSE)
Using cURL
curl -N http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
Output:
event: WORKFLOW_STARTED
data: {"workflow_id":"task-123","agent_id":"orchestrator","message":"Task processing started"}
event: DATA_PROCESSING
data: {"workflow_id":"task-123","message":"Preparing context"}
event: PROGRESS
data: {"workflow_id":"task-123","agent_id":"planner","message":"Created a plan with 3 steps"}
event: AGENT_STARTED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Processing query"}
event: LLM_PROMPT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Searching for information..."}
event: TOOL_INVOKED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Calling web_search with query='topic A'"}
event: TOOL_OBSERVATION
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Found 5 results"}
event: LLM_OUTPUT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Research complete: Found relevant information"}
event: AGENT_COMPLETED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Task done"}
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task-123","message":"Final synthesized result ready"}
Using Python SDK
from shannon import ShannonClient
client = ShannonClient( base_url = "http://localhost:8080" )
# Submit task
handle = client.submit_task(
query = "Research AI trends and create summary"
)
# Stream events
for event in client.stream(handle.workflow_id):
print ( f "[ { event.type } ] { event.message } " )
if event.type == "WORKFLOW_COMPLETED" :
print ( f "Task completed: { event.message } " )
break
Event Types
Shannon emits 33 event types across different categories. Here are the most commonly used ones:
Core Workflow Events
Event Type Description Example Message WORKFLOW_STARTEDTask processing started "Task processing started"WORKFLOW_COMPLETEDWorkflow finished successfully "All done"AGENT_STARTEDAgent began processing "Processing query"AGENT_COMPLETEDAgent finished "Task done"STATUS_UPDATEStatus update/progress "Planning next step"ERROR_OCCURREDError during execution "Failed to connect: timeout"
Event Type Description Example Message LLM_PROMPTPrompt sent to LLM "What is 5 + 5?"LLM_OUTPUTComplete LLM response "5 + 5 equals 10"LLM_PARTIALStreaming chunk (often filtered) "5 + 5"TOOL_INVOKEDTool execution started "Calling database_query"TOOL_OBSERVATIONTool result/output "Query returned 42 rows"
Progress & Status Events
Event Type Description Example Message PROGRESSStep completion update "Created a plan with 3 steps"DATA_PROCESSINGProcessing/analyzing data "Preparing context"DELEGATIONTask delegated to another agent "Handing off to simple task"
Multi-Agent Events (requires feature gates)
Event Type Description Example Message MESSAGE_SENTAgent sent message (requires p2p_v1) "Please analyze section 3"MESSAGE_RECEIVEDAgent received message (requires p2p_v1) "Received task"TEAM_RECRUITEDAgent recruited (requires dynamic_team_v1) "Summarize section 3"TEAM_RETIREDAgent retired (requires dynamic_team_v1) "Task completed"BUDGET_THRESHOLDToken budget threshold reached "Budget threshold: 80%"
WebSocket Streaming
Connect via WebSocket
import asyncio
import websockets
import json
async def stream_task ():
uri = f "ws://localhost:8080/api/v1/stream/ws?workflow_id= { workflow_id } "
# Pass API key via headers (gateway requires header-based auth)
async with websockets.connect(
uri, extra_headers = { 'X-API-Key' : 'sk_test_123456' }
) as websocket:
while True :
message = await websocket.recv()
event = json.loads(message)
print ( f "Event: { event[ 'type' ] } " )
if event[ 'type' ] == 'WORKFLOW_COMPLETED' :
break
asyncio.run(stream_task())
WebSocket streaming is currently server-to-client only. Use the REST API /api/v1/tasks/{id}/cancel to cancel tasks.
Filtering Events
Filter events by type to reduce noise:
# Only show important events
for event in client.stream(workflow_id):
if event.type in [ 'PROGRESS' , 'AGENT_COMPLETED' , 'WORKFLOW_COMPLETED' ]:
print ( f " { event.type } : { event.message } " )
Progress Tracking
Calculate task progress from events:
def track_progress ( workflow_id ):
agents_started = 0
agents_completed = 0
for event in client.stream(workflow_id):
if event.type == 'PROGRESS' :
# Track progress updates from planner
print ( f "Progress: { event.message } " )
elif event.type == 'AGENT_STARTED' :
agents_started += 1
print ( f "Agent started: { event.agent_id } " )
elif event.type == 'AGENT_COMPLETED' :
agents_completed += 1
if agents_started > 0 :
progress = (agents_completed / agents_started) * 100
print ( f "Progress: { progress :.1f} % ( { agents_completed } / { agents_started } )" )
elif event.type == 'WORKFLOW_COMPLETED' :
print ( "✅ Task complete!" )
break
track_progress(handle.workflow_id)
Output:
Progress: Created a plan with 3 steps
Agent started: research-agent
Agent started: analysis-agent
Agent started: writer-agent
Progress: 33.3% (1/3)
Progress: 66.7% (2/3)
Progress: 100.0% (3/3)
✅ Task complete!
Real-Time UI Example
React Component
import { useEffect , useState } from 'react' ;
function TaskMonitor ({ taskId , apiKey }) {
const [ events , setEvents ] = useState ([]);
const [ progress , setProgress ] = useState ( 0 );
useEffect (() => {
const params = new URLSearchParams ({ workflow_id: taskId });
// Note: Browser EventSource doesn't support custom headers.
// Production: initiate SSE from a backend that injects auth headers,
// or use GATEWAY_SKIP_AUTH=1 in development.
const eventSource = new EventSource (
`http://localhost:8080/api/v1/stream/sse? ${ params } `
);
const handleEvent = ( e ) => {
const event = JSON . parse ( e . data );
setEvents ( prev => [ ... prev , event ]);
// Update progress based on workflow lifecycle
if ( event . type === 'WORKFLOW_STARTED' ) {
setProgress ( 10 );
} else if ( event . type === 'PROGRESS' ) {
setProgress ( prev => Math . min ( prev + 20 , 80 ));
} else if ( event . type === 'WORKFLOW_COMPLETED' ) {
setProgress ( 100 );
}
};
// Listen to specific event types (Shannon uses named SSE events)
const eventTypes = [
'WORKFLOW_STARTED' ,
'WORKFLOW_COMPLETED' ,
'AGENT_STARTED' ,
'AGENT_COMPLETED' ,
'STATUS_UPDATE' ,
'PROGRESS' ,
'ERROR_OCCURRED' ,
'LLM_OUTPUT' ,
'TOOL_INVOKED' ,
'TOOL_OBSERVATION'
];
eventTypes . forEach ( type => {
eventSource . addEventListener ( type , handleEvent );
});
// Fallback for unnamed events
eventSource . onmessage = handleEvent ;
eventSource . onerror = () => {
console . error ( 'SSE connection error' );
};
return () => {
eventTypes . forEach ( type => {
eventSource . removeEventListener ( type , handleEvent );
});
eventSource . close ();
};
}, [ taskId , apiKey ]);
return (
< div >
< progress value = { progress } max = { 100 } />
< ul >
{ events . map (( e , i ) => (
< li key = { i } >
< strong > [ { e . type } ] </ strong > { ' ' }
{ e . agent_id && < span > ( { e . agent_id } ) </ span > }{ ' ' }
{ e . message }
</ li >
)) }
</ ul >
</ div >
);
}
Key Implementation Details:
Shannon emits named SSE events (e.g., event: AGENT_STARTED), so you must use addEventListener() for each event type
Browser EventSource API doesn’t support custom headers. Do not pass API keys via query params; use a backend proxy to inject headers or disable auth in development
Always clean up event listeners in the cleanup function to prevent memory leaks
Error Handling
Handle connection failures gracefully:
from shannon import ShannonClient
from shannon.errors import ShannonError, ConnectionError
import time
def robust_streaming ( workflow_id , max_retries = 3 ):
client = ShannonClient( base_url = "http://localhost:8080" )
for attempt in range (max_retries):
try :
for event in client.stream(workflow_id):
print ( f "Event: { event.type } " )
if event.type == 'WORKFLOW_COMPLETED' :
# Get final status to retrieve result
status = client.get_status(workflow_id.replace( 'wf-' , 'task-' ))
return status.result
except (ShannonError, ConnectionError ) as e:
print ( f "Stream error (attempt { attempt + 1 } ): { e } " )
if attempt < max_retries - 1 :
time.sleep( 2 ** attempt) # Exponential backoff
else :
raise
Authentication with Streaming
When authentication is enabled, provide API key:
SSE with API Key
curl -N \
-H "X-API-Key: sk_test_123456" \
http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
Python SDK with API Key
client = ShannonClient(
base_url = "http://localhost:8080" ,
api_key = "sk_test_123456"
)
# API key automatically included in all requests
for event in client.stream(workflow_id):
print (event)
Event Buffering
Shannon buffers events to prevent overwhelming clients:
# config/shannon.yaml
streaming :
buffer_size : 100 # Maximum queued events
flush_interval_ms : 100 # Send batches every 100ms
Keepalive
SSE sends keepalive comments every 15 seconds to prevent timeout:
: keepalive
: keepalive
event: LLM_PROMPT
data: {"message":"..."}
Resource Cleanup
Always close streams when done:
# Iterate through events (automatically handles connection)
for event in client.stream(workflow_id):
if event.type == 'WORKFLOW_COMPLETED' :
break
# No explicit close needed
Multi-Task Monitoring
Monitor multiple tasks simultaneously:
import asyncio
async def monitor_task ( client , workflow_id ):
# Use sync stream() in async context
for event in client.stream(workflow_id):
print ( f "[ { workflow_id } ] { event.type } " )
if event.type == 'WORKFLOW_COMPLETED' :
# Get final result from status
task_id = workflow_id.replace( 'wf-' , 'task-' )
status = client.get_status(task_id)
return status.result
async def monitor_all ( task_ids ):
client = AsyncShannonClient( base_url = "http://localhost:8080" )
# Monitor all tasks in parallel
results = await asyncio.gather( * [
monitor_task(client, tid) for tid in task_ids
])
return results
# Run
task_ids = [ "task-1" , "task-2" , "task-3" ]
results = asyncio.run(monitor_all(task_ids))
Desktop Application
Shannon provides a native desktop application built with Tauri for real-time task monitoring and management. The desktop app offers:
Live event streaming for active tasks
Task submission and management interface
Real-time metrics and visualizations
Session history browsing
The desktop application has replaced the previous web dashboard. See the Desktop App Guide for installation and usage instructions.
Best Practices
1. Use SSE for Simple Monitoring
# ✅ Good: Simple SSE streaming
for event in client.stream(workflow_id):
print (event.type)
2. Handle Disconnections
# ✅ Good: Retry logic
for attempt in range ( 3 ):
try :
for event in client.stream(workflow_id):
process_event(event)
break
except (ShannonError, ConnectionError ):
if attempt == 2 :
raise
time.sleep( 2 )
3. Filter Unnecessary Events
# ✅ Good: Only critical events
critical_events = [ 'PROGRESS' , 'WORKFLOW_COMPLETED' , 'ERROR_OCCURRED' ]
for event in client.stream(workflow_id):
if event.type in critical_events:
handle_event(event)
4. Set Timeouts
# ✅ Good: Timeout protection
import signal
def timeout_handler ( signum , frame ):
raise TimeoutError ( "Stream timeout" )
signal.signal(signal. SIGALRM , timeout_handler)
signal.alarm( 300 ) # 5 minute timeout
try :
for event in client.stream(workflow_id):
print (event)
finally :
signal.alarm( 0 ) # Cancel alarm
Next Steps
Python SDK Async streaming with SDK
API Reference Streaming API details
Monitoring Task monitoring guide
WebSocket API WebSocket documentation