# Real-Time Streaming

> Monitor task execution with Server-Sent Events and WebSockets

## Overview

Shannon provides real-time event streaming so you can monitor task execution as it happens. This is essential for:

* Providing live feedback to users
* Debugging workflow execution
* Building interactive UIs
* Monitoring long-running tasks

## Streaming Technologies

Shannon supports two streaming protocols:

| Protocol                     | Use Case              | Features                           |
| ---------------------------- | --------------------- | ---------------------------------- |
| **SSE** (Server-Sent Events) | One-way server→client | Simple, HTTP-based, auto-reconnect |
| **WebSocket**                | Bidirectional         | Full duplex, lower latency         |

<Note>
  SSE is recommended for most use cases due to its simplicity and built-in reconnection handling.
</Note>

## Server-Sent Events (SSE)

### Using cURL

```bash theme={null}
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
```

Output:

```
event: WORKFLOW_STARTED
data: {"workflow_id":"task-123","agent_id":"orchestrator","message":"Task processing started"}

event: DATA_PROCESSING
data: {"workflow_id":"task-123","message":"Preparing context"}

event: PROGRESS
data: {"workflow_id":"task-123","agent_id":"planner","message":"Created a plan with 3 steps"}

event: AGENT_STARTED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Processing query"}

event: LLM_PROMPT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Searching for information..."}

event: TOOL_INVOKED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Calling web_search with query='topic A'"}

event: TOOL_OBSERVATION
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Found 5 results"}

event: LLM_OUTPUT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Research complete: Found relevant information"}

event: AGENT_COMPLETED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"Task done"}

event: WORKFLOW_COMPLETED
data: {"workflow_id":"task-123","message":"Final synthesized result ready"}
```
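For clients that cannot use the SDK, the wire format above is simple enough to parse by hand. The following is a minimal sketch, not Shannon's own parser: `parse_sse` is a hypothetical helper that groups `event:` and `data:` lines into `(event_type, payload)` pairs and skips comment lines. It assumes each event carries a single JSON object in its `data:` field, as in the sample output.

```python
import json
from typing import Iterable, Iterator, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (event_type, payload) pairs from decoded SSE lines."""
    event_type = "message"  # SSE default when no "event:" field is sent
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates the current event.
            if data_parts:
                yield event_type, json.loads("\n".join(data_parts))
            event_type, data_parts = "message", []
        elif line.startswith(":"):
            continue  # comment line, e.g. ": keepalive"
        elif line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_parts.append(line[len("data:"):].lstrip())
    # Lenient choice: flush a trailing event even without a final blank line.
    if data_parts:
        yield event_type, json.loads("\n".join(data_parts))


sample = [
    "event: WORKFLOW_STARTED",
    'data: {"workflow_id":"task-123","message":"Task processing started"}',
    "",
    ": keepalive",
    "event: WORKFLOW_COMPLETED",
    'data: {"workflow_id":"task-123","message":"Final synthesized result ready"}',
    "",
]
for event_type, payload in parse_sse(sample):
    print(f"[{event_type}] {payload['message']}")
```

Note that the event name comes from the `event:` line rather than from the JSON payload, which is why the helper returns it separately.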

### Using Python SDK

```python theme={null}
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Submit task
handle = client.submit_task(
    query="Research AI trends and create summary"
)

# Stream events
for event in client.stream(handle.workflow_id):
    print(f"[{event.type}] {event.message}")

    if event.type == "WORKFLOW_COMPLETED":
        print(f"Task completed: {event.message}")
        break
```

### Event Types

Shannon emits 33 event types across different categories. Here are the most commonly used ones:

#### Core Workflow Events

| Event Type           | Description                    | Example Message                |
| -------------------- | ------------------------------ | ------------------------------ |
| `WORKFLOW_STARTED`   | Task processing started        | `"Task processing started"`    |
| `WORKFLOW_COMPLETED` | Workflow finished successfully | `"All done"`                   |
| `AGENT_STARTED`      | Agent began processing         | `"Processing query"`           |
| `AGENT_COMPLETED`    | Agent finished                 | `"Task done"`                  |
| `STATUS_UPDATE`      | Status update/progress         | `"Planning next step"`         |
| `ERROR_OCCURRED`     | Error during execution         | `"Failed to connect: timeout"` |

#### LLM & Tool Events

| Event Type         | Description                      | Example Message            |
| ------------------ | -------------------------------- | -------------------------- |
| `LLM_PROMPT`       | Prompt sent to LLM               | `"What is 5 + 5?"`         |
| `LLM_OUTPUT`       | Complete LLM response            | `"5 + 5 equals 10"`        |
| `LLM_PARTIAL`      | Streaming chunk (often filtered) | `"5 + 5"`                  |
| `TOOL_INVOKED`     | Tool execution started           | `"Calling database_query"` |
| `TOOL_OBSERVATION` | Tool result/output               | `"Query returned 42 rows"` |

#### Progress & Status Events

| Event Type        | Description                     | Example Message                 |
| ----------------- | ------------------------------- | ------------------------------- |
| `PROGRESS`        | Step completion update          | `"Created a plan with 3 steps"` |
| `DATA_PROCESSING` | Processing/analyzing data       | `"Preparing context"`           |
| `DELEGATION`      | Task delegated to another agent | `"Handing off to simple task"`  |

#### Multi-Agent Events (requires feature gates)

| Event Type         | Description                                  | Example Message              |
| ------------------ | -------------------------------------------- | ---------------------------- |
| `MESSAGE_SENT`     | Agent sent message (requires `p2p_v1`)       | `"Please analyze section 3"` |
| `MESSAGE_RECEIVED` | Agent received message (requires `p2p_v1`)   | `"Received task"`            |
| `TEAM_RECRUITED`   | Agent recruited (requires `dynamic_team_v1`) | `"Summarize section 3"`      |
| `TEAM_RETIRED`     | Agent retired (requires `dynamic_team_v1`)   | `"Task completed"`           |
| `BUDGET_THRESHOLD` | Token budget threshold reached               | `"Budget threshold: 80%"`    |

<Note>
  For the complete list of event types, see the [Event Types Reference](/en/api/event-types).
</Note>
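When routing events in client code, it can help to group them the same way the tables above do. The sketch below is one possible lookup; the category keys (`core`, `llm_tool`, `progress`, `multi_agent`) and the `category_of` helper are hypothetical names chosen for illustration, and only the event types listed on this page are covered.

```python
# Event names are taken from the tables above; category keys are illustrative.
EVENT_CATEGORIES = {
    "core": {
        "WORKFLOW_STARTED", "WORKFLOW_COMPLETED", "AGENT_STARTED",
        "AGENT_COMPLETED", "STATUS_UPDATE", "ERROR_OCCURRED",
    },
    "llm_tool": {
        "LLM_PROMPT", "LLM_OUTPUT", "LLM_PARTIAL",
        "TOOL_INVOKED", "TOOL_OBSERVATION",
    },
    "progress": {"PROGRESS", "DATA_PROCESSING", "DELEGATION"},
    "multi_agent": {
        "MESSAGE_SENT", "MESSAGE_RECEIVED", "TEAM_RECRUITED",
        "TEAM_RETIRED", "BUDGET_THRESHOLD",
    },
}


def category_of(event_type: str) -> str:
    """Return the category for an event type, or "other" if it is not listed here."""
    for category, names in EVENT_CATEGORIES.items():
        if event_type in names:
            return category
    return "other"


for name in ["TOOL_INVOKED", "PROGRESS", "SOME_FUTURE_EVENT"]:
    print(f"{name}: {category_of(name)}")
```

Returning `"other"` for unknown names keeps the client tolerant of event types that are not covered on this page.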

## WebSocket Streaming

### Connect via WebSocket

```python theme={null}
import asyncio
import websockets
import json

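async def stream_task(workflow_id: str):
    uri = f"ws://localhost:8080/api/v1/stream/ws?workflow_id={workflow_id}"

    # Pass API key via headers (gateway requires header-based auth).
    # Note: recent websockets releases (14+) name this keyword
    # `additional_headers`; `extra_headers` applies to older releases
    # and the legacy client.
    async with websockets.connect(
        uri, extra_headers={'X-API-Key': 'sk_test_123456'}
    ) as websocket:
        while True:
            message = await websocket.recv()
            event = json.loads(message)
            print(f"Event: {event['type']}")

            if event['type'] == 'WORKFLOW_COMPLETED':
                break

# Replace "task-123" with the workflow ID returned at submission
asyncio.run(stream_task("task-123"))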
```

<Note>
  WebSocket streaming is currently server-to-client only. Use the REST API `/api/v1/tasks/{id}/cancel` to cancel tasks.
</Note>

## Filtering Events

Filter events by type to reduce noise:

```python theme={null}
# Only show important events
for event in client.stream(workflow_id):
    if event.type in ['PROGRESS', 'AGENT_COMPLETED', 'WORKFLOW_COMPLETED']:
        print(f"{event.type}: {event.message}")
```
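If the same filter is needed in several places, it can be factored into a small generator. This is a sketch only: `only_types` is a hypothetical helper, and the stand-in events below merely mimic the `type` and `message` attributes used in the SDK examples so the snippet runs without a server.

```python
from types import SimpleNamespace
from typing import Iterable, Iterator


def only_types(events: Iterable, allowed: Iterable[str]) -> Iterator:
    """Yield only the events whose .type is in the allowed set."""
    allowed_set = frozenset(allowed)
    for event in events:
        if event.type in allowed_set:
            yield event


# Stand-in events for illustration; real ones would come from client.stream(...)
fake_stream = [
    SimpleNamespace(type="LLM_PARTIAL", message="5 + 5"),
    SimpleNamespace(type="PROGRESS", message="Created a plan with 3 steps"),
    SimpleNamespace(type="LLM_PARTIAL", message="equals 10"),
    SimpleNamespace(type="WORKFLOW_COMPLETED", message="All done"),
]

for event in only_types(fake_stream, ["PROGRESS", "WORKFLOW_COMPLETED"]):
    print(f"{event.type}: {event.message}")
```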

## Progress Tracking

Calculate task progress from events:

```python theme={null}
def track_progress(workflow_id):
    agents_started = 0
    agents_completed = 0

    for event in client.stream(workflow_id):
        if event.type == 'PROGRESS':
            # Track progress updates from planner
            print(f"Progress: {event.message}")

        elif event.type == 'AGENT_STARTED':
            agents_started += 1
            print(f"Agent started: {event.agent_id}")

        elif event.type == 'AGENT_COMPLETED':
            agents_completed += 1
            if agents_started > 0:
                progress = (agents_completed / agents_started) * 100
                print(f"Progress: {progress:.1f}% ({agents_completed}/{agents_started})")

        elif event.type == 'WORKFLOW_COMPLETED':
            print("✅ Task complete!")
            break

track_progress(handle.workflow_id)
```

Output:

```
Progress: Created a plan with 3 steps
Agent started: research-agent
Agent started: analysis-agent
Agent started: writer-agent
Progress: 33.3% (1/3)
Progress: 66.7% (2/3)
Progress: 100.0% (3/3)
✅ Task complete!
```

## Real-Time UI Example

### React Component

```jsx theme={null}
import { useEffect, useState } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [progress, setProgress] = useState(0);

  useEffect(() => {
    const params = new URLSearchParams({ workflow_id: taskId });

    // Note: Browser EventSource doesn't support custom headers.
    // Production: initiate SSE from a backend that injects auth headers,
    // or use GATEWAY_SKIP_AUTH=1 in development.

    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?${params}`
    );

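    const handleEvent = (e) => {
      // The SSE event name arrives in e.type; the JSON payload may omit it
      const event = { type: e.type, ...JSON.parse(e.data) };
      setEvents(prev => [...prev, event]);

      // Update progress based on workflow lifecycle
      if (event.type === 'WORKFLOW_STARTED') {
        setProgress(10);
      } else if (event.type === 'PROGRESS') {
        setProgress(prev => Math.min(prev + 20, 80));
      } else if (event.type === 'WORKFLOW_COMPLETED') {
        setProgress(100);
        // Close explicitly; otherwise EventSource reconnects when the stream ends
        eventSource.close();
      }
    };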

    // Listen to specific event types (Shannon uses named SSE events)
    const eventTypes = [
      'WORKFLOW_STARTED',
      'WORKFLOW_COMPLETED',
      'AGENT_STARTED',
      'AGENT_COMPLETED',
      'STATUS_UPDATE',
      'PROGRESS',
      'ERROR_OCCURRED',
      'LLM_OUTPUT',
      'TOOL_INVOKED',
      'TOOL_OBSERVATION'
    ];

    eventTypes.forEach(type => {
      eventSource.addEventListener(type, handleEvent);
    });

    // Fallback for unnamed events
    eventSource.onmessage = handleEvent;

    eventSource.onerror = () => {
      console.error('SSE connection error');
    };

    return () => {
      eventTypes.forEach(type => {
        eventSource.removeEventListener(type, handleEvent);
      });
      eventSource.close();
    };
  }, [taskId, apiKey]);

  return (
    <div>
      <progress value={progress} max={100} />
      <ul>
        {events.map((e, i) => (
          <li key={i}>
            <strong>[{e.type}]</strong>{' '}
            {e.agent_id && <span>({e.agent_id})</span>}{' '}
            {e.message}
          </li>
        ))}
      </ul>
    </div>
  );
}
```

<Note>
  **Key Implementation Details:**

  * Shannon emits **named SSE events** (e.g., `event: AGENT_STARTED`), so you must use `addEventListener()` for each event type
  * Browser `EventSource` API doesn't support custom headers. Do not pass API keys via query params; use a backend proxy to inject headers or disable auth in development
  * Always clean up event listeners in the cleanup function to prevent memory leaks
</Note>
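The backend-proxy approach mentioned above can be sketched with the Python standard library. This is an illustration under stated assumptions, not a reference implementation: `build_upstream_request` and `relay_sse` are hypothetical helpers, the `Accept: text/event-stream` header is a common SSE convention rather than a documented Shannon requirement, and wiring `relay_sse` into a web framework's streaming response is left out.

```python
import urllib.parse
import urllib.request
from typing import Iterator

SHANNON_BASE_URL = "http://localhost:8080"  # gateway address used throughout this page


def build_upstream_request(workflow_id: str, api_key: str) -> urllib.request.Request:
    """Build the gateway request with the API key injected server-side."""
    query = urllib.parse.urlencode({"workflow_id": workflow_id})
    return urllib.request.Request(
        f"{SHANNON_BASE_URL}/api/v1/stream/sse?{query}",
        headers={
            "X-API-Key": api_key,
            "Accept": "text/event-stream",  # assumption: conventional for SSE
        },
    )


def relay_sse(workflow_id: str, api_key: str) -> Iterator[bytes]:
    """Yield raw SSE lines from the gateway so a web handler can forward them."""
    request = build_upstream_request(workflow_id, api_key)
    with urllib.request.urlopen(request) as upstream:
        for line in upstream:
            yield line


# Building the request does not open a connection, so this runs offline.
request = build_upstream_request("task-123", "sk_test_123456")
print(request.full_url)
```

With this arrangement the browser's `EventSource` points at the proxy, and the API key never leaves the backend.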

## Error Handling

Handle connection failures gracefully:

```python theme={null}
from shannon import ShannonClient
from shannon.errors import ShannonError, ConnectionError
import time

def robust_streaming(workflow_id, max_retries=3):
    client = ShannonClient(base_url="http://localhost:8080")

    for attempt in range(max_retries):
        try:
            for event in client.stream(workflow_id):
                print(f"Event: {event.type}")

                if event.type == 'WORKFLOW_COMPLETED':
                    # Get final status to retrieve result
                    status = client.get_status(workflow_id.replace('wf-', 'task-'))
                    return status.result

        except (ShannonError, ConnectionError) as e:
            print(f"Stream error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
```
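The retry loop above sleeps for `2 ** attempt` seconds. When many clients reconnect at the same moment, a common refinement is to cap the delay and add random jitter. The sketch below shows one such schedule ("full jitter"); `backoff_delays` and its `base` and `cap` parameters are hypothetical and not part of the Shannon SDK.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one randomized delay per retry, bounded by a capped exponential."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))  # 1, 2, 4, ... up to cap
        delays.append(rng.uniform(0, ceiling))     # pick anywhere below the ceiling
    return delays


for attempt, delay in enumerate(backoff_delays(4), start=1):
    print(f"attempt {attempt}: wait up to {delay:.2f}s before retrying")
```

Each value could replace the fixed `time.sleep(2 ** attempt)` call in a loop like the one above.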

## Authentication with Streaming

When authentication is enabled, provide an API key:

### SSE with API Key

```bash theme={null}
curl -N \
  -H "X-API-Key: sk_test_123456" \
  "http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
```

### Python SDK with API Key

```python theme={null}
client = ShannonClient(
    base_url="http://localhost:8080",
    api_key="sk_test_123456"
)

# API key automatically included in all requests
for event in client.stream(workflow_id):
    print(event)
```

## Performance Considerations

### Event Buffering

Shannon buffers events to prevent overwhelming clients:

```yaml theme={null}
# config/shannon.yaml
streaming:
  buffer_size: 100  # Maximum queued events
  flush_interval_ms: 100  # Send batches every 100ms
```

### Keepalive

Shannon sends SSE keepalive comments every 15 seconds so that idle connections do not time out:

```
: keepalive
: keepalive
event: LLM_PROMPT
data: {"message":"..."}
```
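A client that reads the raw stream can use these comments as a liveness signal. The following sketch assumes the 15-second interval stated above; `is_stale` and the tolerance of two missed keepalives are hypothetical choices, not Shannon defaults.

```python
KEEPALIVE_INTERVAL_S = 15.0  # interval stated in this section


def is_stale(last_activity: float, now: float, missed_allowed: int = 2) -> bool:
    """Return True if nothing (event or keepalive) arrived for too long.

    Both timestamps are in seconds on the same clock, e.g. time.monotonic().
    """
    return (now - last_activity) > KEEPALIVE_INTERVAL_S * missed_allowed


print(is_stale(last_activity=0.0, now=20.0))  # one keepalive late: still fine
print(is_stale(last_activity=0.0, now=31.0))  # more than two intervals silent
```

Updating `last_activity` on every received line, including `: keepalive`, lets the client reconnect only when the connection is genuinely silent.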

### Resource Cleanup

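Stop iterating as soon as the workflow finishes. As the comments in the example indicate, the SDK iterator manages the connection, so no explicit close call is needed: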

```python theme={null}
# Iterate through events (automatically handles connection)
for event in client.stream(workflow_id):
    if event.type == 'WORKFLOW_COMPLETED':
        break
# No explicit close needed
```

## Multi-Task Monitoring

Monitor multiple tasks simultaneously:

```python theme={null}
import asyncio
from shannon import ShannonClient

def monitor_task(workflow_id):
    # stream() is a blocking iterator, so each task runs in its own thread
    # with its own client instead of blocking the event loop
    client = ShannonClient(base_url="http://localhost:8080")
    for event in client.stream(workflow_id):
        print(f"[{workflow_id}] {event.type}")
        if event.type == 'WORKFLOW_COMPLETED':
            # Get final result from status
            task_id = workflow_id.replace('wf-', 'task-')
            status = client.get_status(task_id)
            return status.result

async def monitor_all(task_ids):
    # Monitor all tasks in parallel (asyncio.to_thread requires Python 3.9+)
    results = await asyncio.gather(*[
        asyncio.to_thread(monitor_task, tid) for tid in task_ids
    ])

    return results

# Run
task_ids = ["task-1", "task-2", "task-3"]
results = asyncio.run(monitor_all(task_ids))
```

## Desktop Application

Shannon provides a native desktop application built with Tauri for real-time task monitoring and management. The desktop app offers:

* Live event streaming for active tasks
* Task submission and management interface
* Real-time metrics and visualizations
* Session history browsing

<Note>
  The desktop application has replaced the previous web dashboard. See the [Desktop App Guide](/en/quickstart/desktop-app) for installation and usage instructions.
</Note>

## Best Practices

### 1. Use SSE for Simple Monitoring

```python theme={null}
# ✅ Good: Simple SSE streaming
for event in client.stream(workflow_id):
    print(event.type)
```

### 2. Handle Disconnections

```python theme={null}
# ✅ Good: Retry logic
for attempt in range(3):
    try:
        for event in client.stream(workflow_id):
            process_event(event)
        break
    except (ShannonError, ConnectionError):
        if attempt == 2:
            raise
        time.sleep(2)
```

### 3. Filter Unnecessary Events

```python theme={null}
# ✅ Good: Only critical events
critical_events = ['PROGRESS', 'WORKFLOW_COMPLETED', 'ERROR_OCCURRED']
for event in client.stream(workflow_id):
    if event.type in critical_events:
        handle_event(event)
```

### 4. Set Timeouts

```python theme={null}
# ✅ Good: Timeout protection
# Note: signal.SIGALRM exists only on Unix and must be armed from the main thread
import signal

def timeout_handler(signum, frame):
    raise TimeoutError("Stream timeout")

signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300)  # 5 minute timeout

try:
    for event in client.stream(workflow_id):
        print(event)
finally:
    signal.alarm(0)  # Cancel alarm
```
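`signal.SIGALRM` is available only on Unix and only in the main thread. Where that is a constraint, a deadline check inside the loop is a portable alternative. The sketch below is an assumption-laden illustration: `with_deadline` is a hypothetical wrapper, and because it checks the clock only when an event arrives, it cannot interrupt a stream that has gone completely silent.

```python
import time
from typing import Callable, Iterable, Iterator


def with_deadline(
    events: Iterable,
    timeout_s: float,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator:
    """Yield events until timeout_s has elapsed, then raise TimeoutError."""
    deadline = clock() + timeout_s
    for event in events:
        if clock() > deadline:
            raise TimeoutError(f"Stream exceeded {timeout_s} seconds")
        yield event


# Stand-in iterable; in practice this would be client.stream(workflow_id)
for event in with_deadline(["WORKFLOW_STARTED", "WORKFLOW_COMPLETED"], timeout_s=300):
    print(event)
```

The injectable `clock` parameter exists mainly to make the wrapper easy to test without waiting.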

## Next Steps

<CardGroup cols={2}>
  <Card title="Python SDK" icon="python" href="/en/sdk/python/streaming">
    Async streaming with SDK
  </Card>

  <Card title="API Reference" icon="code" href="/en/api/rest/streaming">
    Streaming API details
  </Card>

  <Card title="Monitoring" icon="chart-line" href="/en/quickstart/concepts/monitoring">
    Task monitoring guide
  </Card>

  <Card title="WebSocket API" icon="plug" href="/en/api/overview">
    WebSocket documentation
  </Card>
</CardGroup>
