> ## Documentation Index
> Fetch the complete documentation index at: https://docs.shannon.run/llms.txt
> Use this file to discover all available pages before exploring further.

# Streaming Endpoints

> Real-time event streaming via SSE and WebSocket

## Overview

Shannon provides real-time event streaming through Server-Sent Events (SSE) and WebSocket protocols. Use streaming to monitor task execution, display progress, and receive results as they're generated.

<Warning>
  **Authentication**: Streaming endpoints require the same headers as other APIs.
  Browsers cannot send custom headers with EventSource.

  * Development: set `GATEWAY_SKIP_AUTH=1`.
  * Production: proxy SSE via your backend and inject `X-API-Key` or Bearer headers.
    For SSE endpoints, the `api_key` query parameter is supported as a fallback (e.g., `?api_key=sk_...`). For all other endpoints, use headers exclusively.
</Warning>

<Note>
  **Streaming Limits**:

  * **Timeout**: Streams automatically close after 5 minutes of inactivity
  * **Buffer Size**: Maximum 1MB buffered data per connection
  * **Usage Metadata**: Token counts and costs are available for all LLM providers (OpenAI, Anthropic, Google, Groq, xAI)
</Note>

## Endpoints

| Method | Endpoint                    | Protocol  | Description                                  |
| ------ | --------------------------- | --------- | -------------------------------------------- |
| `POST` | `/api/v1/tasks/stream`      | HTTP+SSE  | Submit task and get stream URL (recommended) |
| `GET`  | `/api/v1/stream/sse`        | SSE       | Server-Sent Events endpoint                  |
| `GET`  | `/api/v1/stream/ws`         | WebSocket | WebSocket streaming endpoint                 |
| `GET`  | `/api/v1/tasks/{id}/events` | HTTP      | Get historical events (paginated)            |

## Unified Submit + Stream (Recommended)

### POST /api/v1/tasks/stream

This endpoint is the easiest way to submit a task and start streaming its events: it combines task submission and stream setup in a single call.

<Tip>
  **Best for frontend applications**: This endpoint is perfect for real-time UIs where you want to show progress immediately after submitting a task.
</Tip>

#### Authentication

**Required**: Yes

```
X-API-Key: sk_test_123456
```

Or:

```
Authorization: Bearer YOUR_TOKEN
```

#### Request Body

| Parameter           | Type   | Required | Description                                            |
| ------------------- | ------ | -------- | ------------------------------------------------------ |
| `query`             | string | Yes      | Natural language task description                      |
| `session_id`        | string | No       | Session identifier for multi-turn conversations        |
| `context`           | object | No       | Additional context data as key-value pairs             |
| `model_tier`        | string | No       | Preferred tier: `small`, `medium`, or `large`          |
| `model_override`    | string | No       | Specific model name (canonical; e.g., `gpt-5`)         |
| `provider_override` | string | No       | Force provider (e.g., `openai`, `anthropic`, `google`) |

#### Response

**Status**: `201 Created`

**Body**:

```json theme={"dark"}
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}
```
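
Because `stream_url` is returned as a relative path, a client has to resolve it against the gateway's base URL before connecting. The following is a minimal Python sketch of that step. `resolve_stream_url` is a hypothetical helper name (not part of any Shannon SDK), and the optional `api_key` argument assumes you want the SSE query-parameter fallback rather than headers.

```python
from typing import Optional
from urllib.parse import parse_qsl, urlencode, urljoin, urlsplit, urlunsplit


def resolve_stream_url(base_url: str, stream_url: str, api_key: Optional[str] = None) -> str:
    """Resolve a relative stream_url against base_url, optionally appending api_key."""
    absolute = urljoin(base_url, stream_url)
    if api_key is None:
        return absolute

    # Re-encode the query string so special characters in the key are escaped.
    parts = urlsplit(absolute)
    query = parse_qsl(parts.query, keep_blank_values=True)
    query.append(("api_key", api_key))
    return urlunsplit(parts._replace(query=urlencode(query)))


print(resolve_stream_url(
    "http://localhost:8080",
    "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V",
))
```

Server-side clients that can set headers can call the helper without `api_key` and pass `X-API-Key` or `Authorization` as usual.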

#### Response Fields

| Field         | Type   | Description                         |
| ------------- | ------ | ----------------------------------- |
| `task_id`     | string | Unique task identifier              |
| `workflow_id` | string | Task/workflow identifier            |
| `stream_url`  | string | Relative URL to SSE stream endpoint |

#### Example: JavaScript/TypeScript

```javascript theme={"dark"}
async function submitAndStream(query, onEvent, onComplete, onError) {
  try {
    // 1. Submit task and get stream URL
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer YOUR_TOKEN'
      },
      body: JSON.stringify({
        query: query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

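    const { workflow_id, stream_url } = await response.json();
    console.log(`Task submitted: ${workflow_id}`);

    // 2. Connect to SSE stream
    // Note: Browser EventSource does NOT support custom headers.
    // Use api_key query param, or proxy SSE via your backend.
    const apiKey = encodeURIComponent(localStorage.getItem('token') ?? '');
    const eventSource = new EventSource(
      `http://localhost:8080${stream_url}&api_key=${apiKey}`
    );

    // Each message carries a named `event:` field, and EventSource delivers
    // named events only to matching listeners (not to `onmessage`), so
    // register the types you need. Extend this list as required.
    const eventTypes = [
      'WORKFLOW_STARTED', 'AGENT_THINKING', 'TOOL_INVOKED',
      'TOOL_OBSERVATION', 'ERROR_OCCURRED', 'WORKFLOW_COMPLETED'
    ];

    const handleEvent = (e) => {
      const event = JSON.parse(e.data);
      onEvent(event);

      // Check for completion
      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        onComplete(event);
      }
    };

    eventTypes.forEach((type) => eventSource.addEventListener(type, handleEvent));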

    eventSource.onerror = (err) => {
      console.error('SSE error:', err);
      eventSource.close();
      onError(err);
    };

    return { workflow_id, eventSource };

  } catch (error) {
    onError(error);
    throw error;
  }
}

// Usage
submitAndStream(
  "Analyze Q4 revenue trends",
  (event) => console.log(`[${event.type}]`, event.message),
  (final) => console.log("Completed:", final.result),
  (error) => console.error("Error:", error)
);
```

#### Example: React Hook

```jsx theme={"dark"}
import { useState, useCallback, useRef } from 'react';

function useTaskStream(apiUrl = 'http://localhost:8080') {
  const [events, setEvents] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState(null);
  const [workflowId, setWorkflowId] = useState(null);
  const eventSourceRef = useRef(null);

  const submitTask = useCallback(async (query, sessionId = null) => {
    setEvents([]);
    setError(null);
    setIsStreaming(true);

    try {
      // Submit task
      const response = await fetch(`${apiUrl}/api/v1/tasks/stream`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${localStorage.getItem('token')}`
        },
        body: JSON.stringify({
          query,
          session_id: sessionId || `session-${Date.now()}`
        })
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const { task_id, workflow_id, stream_url } = await response.json();
      setWorkflowId(workflow_id);

      // Connect to stream
      // Note: Browser EventSource does NOT support custom headers.
      // Use api_key query param, or proxy SSE via your backend.
      const token = localStorage.getItem('token');
      const sseUrl = token
        ? `${apiUrl}${stream_url}&api_key=${token}`
        : `${apiUrl}${stream_url}`;
      const eventSource = new EventSource(sseUrl);
      eventSourceRef.current = eventSource;

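      // Named SSE events bypass `onmessage`, so register a listener per type.
      const eventTypes = [
        'WORKFLOW_STARTED', 'AGENT_THINKING', 'TOOL_INVOKED',
        'TOOL_OBSERVATION', 'ERROR_OCCURRED', 'WORKFLOW_COMPLETED'
      ];

      const handleEvent = (e) => {
        const event = JSON.parse(e.data);
        setEvents(prev => [...prev, event]);

        if (event.type === 'WORKFLOW_COMPLETED' || event.type === 'ERROR_OCCURRED') {
          eventSource.close();
          setIsStreaming(false);
        }
      };

      eventTypes.forEach((type) => eventSource.addEventListener(type, handleEvent));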

      eventSource.onerror = (err) => {
        setError(err);
        setIsStreaming(false);
        eventSource.close();
      };

    } catch (err) {
      setError(err);
      setIsStreaming(false);
    }
  }, [apiUrl]);

  const stopStreaming = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      setIsStreaming(false);
    }
  }, []);

  return { submitTask, stopStreaming, events, isStreaming, error, workflowId };
}

// Usage in component
function TaskStreamDemo() {
  const { submitTask, stopStreaming, events, isStreaming, error, workflowId } = useTaskStream();

  return (
    <div>
      <button
        onClick={() => submitTask("What is 15 + 25?")}
        disabled={isStreaming}
      >
        {isStreaming ? 'Processing...' : 'Submit Task'}
      </button>

      {workflowId && <p>Workflow ID: {workflowId}</p>}
      {error && <p style={{color: 'red'}}>Error: {error.message}</p>}

      <div>
        {events.map((event, idx) => (
          <div key={idx}>
            <strong>{event.type}</strong>: {event.message || event.agent_id}
          </div>
        ))}
      </div>

      {isStreaming && <button onClick={stopStreaming}>Stop</button>}
    </div>
  );
}
```

#### Example: Vue 3 Composition API

```vue theme={"dark"}
<script setup>
import { ref } from 'vue';

const events = ref([]);
const isStreaming = ref(false);
const error = ref(null);
const workflowId = ref(null);
let eventSource = null;

async function submitTask(query) {
  events.value = [];
  error.value = null;
  isStreaming.value = true;

  try {
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${localStorage.getItem('token')}`
      },
      body: JSON.stringify({
        query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);

    const { task_id, workflow_id, stream_url } = await response.json();
    workflowId.value = workflow_id;

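    // Browser EventSource does NOT support custom headers, so pass the key as the api_key query param
    const apiKey = encodeURIComponent(localStorage.getItem('token') ?? '');
    eventSource = new EventSource(`http://localhost:8080${stream_url}&api_key=${apiKey}`);

    // Named SSE events bypass `onmessage`, so register a listener per type.
    const eventTypes = [
      'WORKFLOW_STARTED', 'AGENT_THINKING', 'TOOL_INVOKED',
      'TOOL_OBSERVATION', 'ERROR_OCCURRED', 'WORKFLOW_COMPLETED'
    ];

    const handleEvent = (e) => {
      const event = JSON.parse(e.data);
      events.value.push(event);

      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        isStreaming.value = false;
      }
    };

    eventTypes.forEach((type) => eventSource.addEventListener(type, handleEvent));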

    eventSource.onerror = (err) => {
      error.value = err;
      eventSource.close();
      isStreaming.value = false;
    };
  } catch (err) {
    error.value = err;
    isStreaming.value = false;
  }
}

function stopStreaming() {
  if (eventSource) {
    eventSource.close();
    isStreaming.value = false;
  }
}
</script>

<template>
  <div>
    <button @click="submitTask('Analyze data')" :disabled="isStreaming">
      {{ isStreaming ? 'Processing...' : 'Submit Task' }}
    </button>

    <div v-if="workflowId">Workflow: {{ workflowId }}</div>
    <div v-if="error" style="color: red">Error: {{ error.message }}</div>

    <div v-for="(event, idx) in events" :key="idx">
      <strong>{{ event.type }}</strong>: {{ event.message || event.agent_id }}
    </div>

    <button v-if="isStreaming" @click="stopStreaming">Stop</button>
  </div>
</template>
```

#### Example: Python

```python theme={"dark"}
import httpx
import json
import time

def submit_and_stream(query: str, api_key: str):
    """Submit task and stream events."""

    # 1. Submit task
    response = httpx.post(
        "http://localhost:8080/api/v1/tasks/stream",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "session_id": f"session-{int(time.time())}"
        }
    )

    response.raise_for_status()  # Fail fast on 4xx/5xx instead of parsing an error body
    data = response.json()
    workflow_id = data["workflow_id"]
    stream_url = data["stream_url"]

    print(f"Task submitted: {workflow_id}")

    # 2. Stream events
    with httpx.stream(
        "GET",
        f"http://localhost:8080{stream_url}",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=None
    ) as stream_response:
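        for line in stream_response.iter_lines():
            if not line.startswith("data:"):
                continue
            payload = line[5:].strip()
            if payload == "[DONE]":  # End-of-stream marker is plain text, not JSON
                break
            event = json.loads(payload)
            # Some events (e.g. thread.message.completed) carry no "type" field
            print(f"[{event.get('type', '')}] {event.get('message', '')}")

            if event.get('type') == 'WORKFLOW_COMPLETED':
                return event

    return None  # Stream ended without a WORKFLOW_COMPLETED event

# Usage
result = submit_and_stream("What is the capital of France?", "sk_test_123456")
if result is not None:
    print("Final result:", result.get('result'))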
```

<Note>
  **Why use this endpoint?** The unified endpoint ensures you start streaming immediately after submission, preventing any missed events that could occur if you submit and then connect separately.
</Note>

## Server-Sent Events (SSE)

### GET /api/v1/stream/sse

Real-time event streaming using Server-Sent Events.

#### Authentication

**Required**: Yes

```
X-API-Key: sk_test_123456
```

#### Query Parameters

| Parameter       | Type   | Required | Description                                                                                                                                                                             |
| --------------- | ------ | -------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `workflow_id`   | string | Yes      | Task/workflow identifier                                                                                                                                                                |
| `types`         | string | No       | Comma-separated event types to filter                                                                                                                                                   |
| `last_event_id` | string | No       | Resume from specific event ID. Accepts a Redis stream ID (e.g., `1700000000000-0`) or a numeric sequence (e.g., `42`). When numeric, replay includes events with `seq > last_event_id`. |

#### Event Format

Each event follows SSE specification:

```
id: <event_id>
event: <event_type>
data: <json_payload>

```
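
The framing rules above (one `field: value` pair per line, a blank line ending each event) can be sketched as a small line-based parser. This is a simplified illustration, not the gateway's or any library's implementation: `parse_sse` is a hypothetical name, only the `id`, `event`, and `data` fields are handled, and lines starting with `:` are treated as comments and skipped.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Group SSE lines into events; a blank line terminates each event."""
    fields: Dict[str, str] = {}
    data_lines: List[str] = []

    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered event, if there is one.
            if fields or data_lines:
                yield {**fields, "data": "\n".join(data_lines)}
            fields, data_lines = {}, []
            continue
        if line.startswith(":"):
            continue  # Comment line (used for keep-alives); carries no event

        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # A single leading space after the colon is dropped
        if name == "data":
            data_lines.append(value)  # Multiple data lines are joined with "\n"
        elif name in ("id", "event"):
            fields[name] = value


sample = [
    "id: 1719000000000-0",
    "event: WORKFLOW_STARTED",
    'data: {"type":"WORKFLOW_STARTED","seq":1}',
    "",
]
for event in parse_sse(sample):
    print(event["id"], event["event"], event["data"])
```

Keeping the `event` name alongside `data` matters because some payloads do not repeat the event type inside the JSON body.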

#### Example Request

```bash theme={"dark"}
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \
  -H "X-API-Key: sk_test_123456"
```
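
The same request URL, including the optional `types` and `last_event_id` parameters, can be assembled programmatically. A minimal sketch, assuming the gateway base URL is known to the caller; `build_sse_url` is a hypothetical helper name.

```python
from typing import Iterable, Optional, Union
from urllib.parse import urlencode


def build_sse_url(
    base_url: str,
    workflow_id: str,
    types: Optional[Iterable[str]] = None,
    last_event_id: Optional[Union[str, int]] = None,
) -> str:
    """Build the SSE endpoint URL with optional filtering and resume parameters."""
    params = {"workflow_id": workflow_id}
    if types:
        params["types"] = ",".join(types)  # Comma-separated event types
    if last_event_id is not None:
        params["last_event_id"] = str(last_event_id)  # Redis stream ID or numeric seq

    # safe="," keeps the comma separators readable instead of encoding them as %2C
    return f"{base_url.rstrip('/')}/api/v1/stream/sse?{urlencode(params, safe=',')}"


print(build_sse_url(
    "http://localhost:8080",
    "task_abc123",
    types=["AGENT_THINKING", "TOOL_INVOKED"],
    last_event_id="1700000000000-0",
))
```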

#### Example Response

```
id: 1719000000000-0
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","type":"WORKFLOW_STARTED","agent_id":"orchestrator","message":"Starting up","timestamp":"2025-10-22T10:30:00Z","seq":1,"stream_id":"1719000000000-0"}

id: 1719000001000-0
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","type":"AGENT_THINKING","agent_id":"Ryogoku","message":"Analyzing query...","timestamp":"2025-10-22T10:30:01Z","seq":2,"stream_id":"1719000001000-0"}

id: 1719000002000-0
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","type":"TOOL_INVOKED","agent_id":"Ryogoku","message":"Calling web_search","payload":{"tool":"web_search","params":{"query":"Python programming"}},"timestamp":"2025-10-22T10:30:02Z","seq":3,"stream_id":"1719000002000-0"}

id: 1719000005000-0
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","type":"TOOL_OBSERVATION","agent_id":"Ryogoku","message":"web_search completed","payload":{"success":true,"duration_ms":2800},"timestamp":"2025-10-22T10:30:05Z","seq":4,"stream_id":"1719000005000-0"}

id: 1719000006000-0
event: thread.message.completed
data: {"workflow_id":"task_abc123","agent_id":"simple-agent","response":"Python is a high-level programming language...","metadata":{"model_used":"claude-haiku-4-5-20251001","tokens_used":850,"cost_usd":0.003},"seq":5,"stream_id":"1719000006000-0"}

id: 1719000007000-0
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","type":"WORKFLOW_COMPLETED","message":"Workflow completed","timestamp":"2025-10-22T10:30:07Z","seq":6,"stream_id":"1719000007000-0"}
```

<Warning>
  **Event ID format**: The `id` field carries a Redis stream ID (e.g., `1719000000000-0`), not a simple integer. Pass this value as `last_event_id` when reconnecting; a numeric `seq` value is also accepted, as described under Query Parameters. See [Reconnection](#reconnection) below.
</Warning>
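
Redis stream IDs have the form `<milliseconds>-<sequence>`, so they should be compared part by part as integers rather than as strings. The sketch below assumes a client that wants to skip events it has already processed after a reconnect; `parse_stream_id` and `is_newer` are hypothetical helper names.

```python
from typing import Tuple


def parse_stream_id(event_id: str) -> Tuple[int, int]:
    """Split a Redis stream ID such as '1719000000000-0' into (milliseconds, sequence)."""
    ms, _, seq = event_id.partition("-")
    return int(ms), int(seq or 0)


def is_newer(candidate: str, last_seen: str) -> bool:
    """Return True when candidate was produced after last_seen."""
    return parse_stream_id(candidate) > parse_stream_id(last_seen)


last_seen = "1719000001000-0"
for event_id in ["1719000000000-0", "1719000001000-0", "1719000002000-0"]:
    if is_newer(event_id, last_seen):
        print("process", event_id)
```

Comparing the raw strings would misorder IDs whose millisecond parts differ in length, which is why the tuple comparison is used here.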

## WebSocket

### GET /api/v1/stream/ws

Bidirectional streaming via WebSocket.

#### Authentication

The gateway authenticates WebSocket connections via headers only (`X-API-Key` or `Authorization`). Browsers cannot set custom headers during the WebSocket handshake. For browser usage:

* Run locally with `GATEWAY_SKIP_AUTH=1`, or
* Use a reverse proxy that injects the header before forwarding to the gateway.

Header-based examples for server environments:

Node (ws):

```js theme={"dark"}
import WebSocket from 'ws';

const ws = new WebSocket('ws://localhost:8080/api/v1/stream/ws', {
  headers: { 'X-API-Key': 'sk_test_123456' },
});

ws.on('open', () => {
  ws.send(JSON.stringify({ type: 'subscribe', workflow_id: 'task_abc123' }));
});

ws.on('message', (msg) => {
  const data = JSON.parse(msg.toString());
  console.log('Event:', data.type, data.message);
});

ws.on('error', (err) => console.error('WebSocket error:', err));
```

Python (websockets):

```python theme={"dark"}
import asyncio, json, websockets

async def main():
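    # websockets >= 14 names this parameter `additional_headers`;
    # older releases call it `extra_headers`.
    async with websockets.connect(
        'ws://localhost:8080/api/v1/stream/ws',
        additional_headers={'X-API-Key': 'sk_test_123456'},
    ) as ws: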
        await ws.send(json.dumps({'type': 'subscribe', 'workflow_id': 'task_abc123'}))
        async for message in ws:
            data = json.loads(message)
            print('Event:', data.get('type'), data.get('message'))

asyncio.run(main())
```

<Note>
  Passing the API key in the query string or via an "auth" message after connect is not supported by the gateway.
</Note>

#### Message Types

**Client → Server**:

```json theme={"dark"}
// Subscribe to workflow
{
  "type": "subscribe",
  "workflow_id": "task_abc123",
  "types": ["AGENT_THINKING", "TOOL_INVOKED"]
}

// Unsubscribe
{
  "type": "unsubscribe",
  "workflow_id": "task_abc123"
}

// Ping (keep-alive)
{
  "type": "ping"
}
```

**Server → Client**:

```json theme={"dark"}
// Event
{
  "type": "AGENT_THINKING",
  "workflow_id": "task_abc123",
  "message": "Analyzing query...",
  "timestamp": "2025-10-22T10:30:00Z"
}

// Pong (keep-alive response)
{
  "type": "pong"
}
```
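
The message shapes above can be wrapped in small helpers so that client code does not hand-build JSON strings. This is a sketch under the assumption that the client sends and receives text frames containing exactly the fields shown; all function names are hypothetical.

```python
import json
from typing import Any, Dict, Iterable, Optional, Tuple


def subscribe_message(workflow_id: str, types: Optional[Iterable[str]] = None) -> str:
    """Build a subscribe frame, optionally restricted to certain event types."""
    msg: Dict[str, Any] = {"type": "subscribe", "workflow_id": workflow_id}
    if types:
        msg["types"] = list(types)
    return json.dumps(msg)


def unsubscribe_message(workflow_id: str) -> str:
    """Build an unsubscribe frame for one workflow."""
    return json.dumps({"type": "unsubscribe", "workflow_id": workflow_id})


def ping_message() -> str:
    """Build a keep-alive ping frame."""
    return json.dumps({"type": "ping"})


def parse_server_message(raw: str) -> Tuple[str, Dict[str, Any]]:
    """Classify a server frame as a 'pong' keep-alive reply or a workflow 'event'."""
    msg = json.loads(raw)
    kind = "pong" if msg.get("type") == "pong" else "event"
    return kind, msg


print(subscribe_message("task_abc123", ["AGENT_THINKING", "TOOL_INVOKED"]))
print(parse_server_message('{"type": "pong"}')[0])
```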

## OpenAI-Compatible Streaming

Shannon also provides an OpenAI-compatible streaming endpoint at `/v1/chat/completions` that translates Shannon events into the standard OpenAI `chat.completion.chunk` format. This allows you to use OpenAI SDKs directly with Shannon.

For complete documentation on the OpenAI-compatible API, including request/response schemas, available models, Shannon-specific extensions (`shannon_events`), and SDK usage examples, see the [OpenAI-Compatible API Reference](/en/api/rest/openai-compatible).

## Event Types

### Core Events

| Event Type           | Description                    | When Fired            |
| -------------------- | ------------------------------ | --------------------- |
| `WORKFLOW_STARTED`   | Workflow execution began       | Start of task         |
| `WORKFLOW_COMPLETED` | Workflow finished successfully | End of task (success) |

### Agent Events

| Event Type        | Description              | Payload Fields        |
| ----------------- | ------------------------ | --------------------- |
| `AGENT_THINKING`  | Agent reasoning/planning | `agent_id`, `message` |
| `AGENT_COMPLETED` | Agent finished execution | `agent_id`, `result`  |

### Tool Events

| Event Type         | Description                | Payload Fields   |
| ------------------ | -------------------------- | ---------------- |
| `TOOL_INVOKED`     | Tool execution started     | `tool`, `params` |
| `TOOL_OBSERVATION` | Agent observes tool result | `tool`, `result` |

### LLM Events

| Event Type                 | Description               | Payload Fields                                                                 |
| -------------------------- | ------------------------- | ------------------------------------------------------------------------------ |
| `LLM_PROMPT`               | Prompt sent to LLM        | `message`, `payload.model`, `payload.provider`                                 |
| `thread.message.delta`     | Streaming text chunk      | `delta`, `agent_id`                                                            |
| `thread.message.completed` | Final response with usage | `response`, `metadata.model_used`, `metadata.tokens_used`, `metadata.cost_usd` |
| `LLM_PARTIAL`              | Raw LLM streaming token   | `text`                                                                         |
| `LLM_OUTPUT`               | Raw LLM final output      | `text`                                                                         |

<Tip>
  **For most integrations**, listen to `thread.message.delta` (streaming text) and `thread.message.completed` (final result with usage metadata) rather than `LLM_PARTIAL`/`LLM_OUTPUT`.
</Tip>
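
As an illustration of that recommendation, the sketch below accumulates `thread.message.delta` chunks and then reads the final text and usage metadata from `thread.message.completed`. It assumes events arrive as `(event_name, data)` pairs taken from the SSE `event:` and `data:` lines, and that `delta` is a plain text fragment; `collect_response` is a hypothetical name.

```python
import json
from typing import Any, Dict, Iterable, List, Tuple


def collect_response(events: Iterable[Tuple[str, str]]) -> Dict[str, Any]:
    """Assemble streamed text and usage metadata from thread.message.* events."""
    chunks: List[str] = []

    for name, data in events:
        if name == "thread.message.delta":
            chunks.append(json.loads(data).get("delta", ""))
        elif name == "thread.message.completed":
            payload = json.loads(data)
            return {
                # Prefer the server's final text; fall back to the joined deltas
                "text": payload.get("response") or "".join(chunks),
                "metadata": payload.get("metadata", {}),
            }

    # Stream ended before a completed event arrived: return what was received
    return {"text": "".join(chunks), "metadata": {}}


result = collect_response([
    ("thread.message.delta", '{"delta": "Python is ", "agent_id": "simple-agent"}'),
    ("thread.message.delta", '{"delta": "a language.", "agent_id": "simple-agent"}'),
    ("thread.message.completed",
     '{"response": "Python is a language.", "metadata": {"tokens_used": 850, "cost_usd": 0.003}}'),
])
print(result["text"], result["metadata"].get("tokens_used"))
```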

### Progress & System Events

| Event Type          | Description                      | Payload Fields                                                                               |
| ------------------- | -------------------------------- | -------------------------------------------------------------------------------------------- |
| `PROGRESS`          | Progress update                  | `progress`, `message`                                                                        |
| `DATA_PROCESSING`   | Data processing in progress      | `message`                                                                                    |
| `WAITING`           | Waiting for resources            | `message`                                                                                    |
| `ERROR_OCCURRED`    | Error occurred                   | `error`, `severity`                                                                          |
| `ERROR_RECOVERY`    | Error recovery attempt           | `message`                                                                                    |
| `WORKSPACE_UPDATED` | Memory/context updated           | `message`                                                                                    |
| `BUDGET_THRESHOLD`  | Budget warning threshold reached | `usage_percent`, `threshold_percent`, `tokens_used`, `tokens_budget`, `level`, `budget_type` |

### Stream Lifecycle Events

| Event Type   | Description                                                                      |
| ------------ | -------------------------------------------------------------------------------- |
| `STREAM_END` | Explicit end-of-stream signal (no more events will be emitted for this workflow) |

<Note>
  Over SSE, the `STREAM_END` lifecycle event is delivered as an SSE event named `done` with `data: [DONE]` (plain text, not JSON). Over WebSocket, it appears as a normal JSON event with `"type": "STREAM_END"`.
</Note>
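
Since the SSE form of this signal is not JSON, a client that calls a JSON parser on every `data:` line will fail on it. One way to handle both transports uniformly is to normalize the SSE `done` event into the same shape the WebSocket delivers. A minimal sketch; `decode_sse_event` is a hypothetical name, and filling in a missing `type` from the SSE event name is an assumption made for convenience.

```python
import json
from typing import Any, Dict


def decode_sse_event(event_name: str, data: str) -> Dict[str, Any]:
    """Decode one SSE event, mapping the plain-text [DONE] marker to STREAM_END."""
    if event_name == "done" or data.strip() == "[DONE]":
        return {"type": "STREAM_END"}

    payload = json.loads(data)
    # Some payloads omit "type"; fall back to the SSE event name in that case
    payload.setdefault("type", event_name)
    return payload


print(decode_sse_event("done", "[DONE]"))
print(decode_sse_event("thread.message.completed", '{"response": "hi"}')["type"])
```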

### Team & Approvals

| Event Type             | Description                |
| ---------------------- | -------------------------- |
| `TEAM_RECRUITED`       | Multi-agent team assembled |
| `TEAM_RETIRED`         | Team disbanded             |
| `TEAM_STATUS`          | Team coordination update   |
| `ROLE_ASSIGNED`        | Agent role assigned        |
| `DELEGATION`           | Task delegated             |
| `DEPENDENCY_SATISFIED` | Dependency resolved        |
| `APPROVAL_REQUESTED`   | Human approval needed      |
| `APPROVAL_DECISION`    | Approval decision recorded |

## Code Examples

### Python with httpx (SSE)

```python theme={"dark"}
import httpx
import json

def stream_task_events(task_id: str, api_key: str):
    """Stream task events using SSE."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"

    with httpx.stream(
        "GET",
        url,
        headers={"X-API-Key": api_key},
        timeout=None  # No timeout for streaming
    ) as response:
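        response.raise_for_status()
        for line in response.iter_lines():
            if line.startswith("data:"):
                payload = line[5:].strip()  # Remove "data:" prefix
                if payload == "[DONE]":  # Plain-text end-of-stream marker, not JSON
                    return
                yield json.loads(payload)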

# Usage
for event in stream_task_events("task_abc123", "sk_test_123456"):
    print(f"[{event.get('type')}] {event.get('message', '')}")

    if event.get('type') == 'WORKFLOW_COMPLETED':
        print("Final result:", event.get('result'))
        break
```

### Python - Stream with Event Filtering

```python theme={"dark"}
import httpx
import json

def stream_filtered_events(task_id: str, api_key: str, event_types: list):
    """Stream only specific event types."""
    types_param = ",".join(event_types)
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"

    # timeout=None: httpx's default 5-second timeout would cut off an idle stream
    with httpx.stream("GET", url, headers={"X-API-Key": api_key}, timeout=None) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])

# Only receive agent thinking and tool events
for event in stream_filtered_events(
    "task_abc123",
    "sk_test_123456",
    ["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
    tool = (event.get('payload') or {}).get('tool')  # Tool name is nested under "payload"
    print(f"{event['type']}: {event.get('message', tool)}")
```

### JavaScript/Node.js (SSE)

```javascript theme={"dark"}
const EventSource = require('eventsource');

function streamTaskEvents(taskId, apiKey) {
  const url = `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`;

  const eventSource = new EventSource(url, {
    headers: {
      'X-API-Key': apiKey
    }
  });

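  // Named SSE events bypass `onmessage`, so register a listener per type.
  const eventTypes = [
    'WORKFLOW_STARTED', 'AGENT_THINKING', 'TOOL_INVOKED',
    'TOOL_OBSERVATION', 'ERROR_OCCURRED', 'WORKFLOW_COMPLETED'
  ];

  const handleEvent = (event) => {
    const data = JSON.parse(event.data);
    console.log(`[${data.type}] ${data.message || ''}`);

    if (data.type === 'WORKFLOW_COMPLETED') {
      console.log('Final result:', data.result);
      eventSource.close();
    }
  };

  eventTypes.forEach((type) => eventSource.addEventListener(type, handleEvent));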

  eventSource.onerror = (error) => {
    console.error('SSE error:', error);
    eventSource.close();
  };

  return eventSource;
}

// Usage
const stream = streamTaskEvents('task_abc123', 'sk_test_123456');

// Close manually
setTimeout(() => stream.close(), 60000);
```

### JavaScript/Node.js - WebSocket (ws)

```javascript theme={"dark"}
import WebSocket from 'ws';

function connectWebSocket(taskId, apiKey) {
  const ws = new WebSocket(
    `ws://localhost:8080/api/v1/stream/ws?workflow_id=${taskId}`,
    { headers: { 'X-API-Key': apiKey } }
  );

  ws.on('open', () => console.log('✓ Connected'));

  ws.on('message', (msg) => {
    const data = JSON.parse(msg.toString());
    switch (data.type) {
      case 'AGENT_THINKING':
        console.log(`💭 ${data.message}`);
        break;
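      case 'TOOL_INVOKED':
        // Tool details are nested under `payload` in the event examples above
        console.log(`🔧 Tool: ${data.payload?.tool ?? data.tool}`);
        break;
      case 'TOOL_OBSERVATION':
        console.log(`✓ Result: ${JSON.stringify(data.payload ?? data.result)}`);
        break;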
      case 'WORKFLOW_COMPLETED':
        console.log(`✓ Done: ${data.result}`);
        ws.close();
        break;
      default:
        console.log(`[${data.type}] ${data.message || ''}`);
    }
  });

  ws.on('error', (err) => console.error('❌ Error:', err));
  ws.on('close', () => console.log('Connection closed'));

  return ws;
}

const ws = connectWebSocket('task_abc123', 'sk_test_123456');
```

### Go (SSE)

```go theme={"dark"}
package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
)

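// Event mirrors the common fields of a stream event. encoding/json has no
// "inline" tag option, so event-specific details are read from "payload".
type Event struct {
    Type       string                 `json:"type"`
    WorkflowID string                 `json:"workflow_id"`
    Message    string                 `json:"message,omitempty"`
    Payload    map[string]interface{} `json:"payload,omitempty"`
}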

func streamEvents(taskID, apiKey string) error {
    url := fmt.Sprintf(
        "http://localhost:8080/api/v1/stream/sse?workflow_id=%s",
        taskID,
    )

    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return err
    }
    req.Header.Set("X-API-Key", apiKey)

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

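    scanner := bufio.NewScanner(resp.Body)
    // Allow lines up to 1 MB; the default 64 KB limit can be too small for large events
    scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

    for scanner.Scan() {
        line := scanner.Text()

        if strings.HasPrefix(line, "data:") {
            data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
            if data == "[DONE]" { // Plain-text end-of-stream marker, not JSON
                break
            }

            var event Event
            if err := json.Unmarshal([]byte(data), &event); err != nil {
                return fmt.Errorf("decode event: %w", err)
            }

            fmt.Printf("[%s] %s\n", event.Type, event.Message)

            if event.Type == "WORKFLOW_COMPLETED" {
                fmt.Println("✓ Task completed")
                break
            }
        }
    }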

    return scanner.Err()
}

func main() {
    err := streamEvents("task_abc123", "sk_test_123456")
    if err != nil {
        fmt.Println("Error:", err)
    }
}
```

### Bash/curl (SSE)

```bash theme={"dark"}
#!/bin/bash

API_KEY="sk_test_123456"
TASK_ID="$1"

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=$TASK_ID" \
  -H "X-API-Key: $API_KEY" \
  | while IFS= read -r line; do
    if [[ $line == data:* ]]; then
      # Extract JSON from "data: {...}"
      JSON="${line#data:}"

      # Parse and display
      TYPE=$(echo "$JSON" | jq -r '.type')
      MESSAGE=$(echo "$JSON" | jq -r '.message // ""')

      echo "[$(date +%T)] $TYPE: $MESSAGE"

      # Exit on completion
      if [[ "$TYPE" == "WORKFLOW_COMPLETED" ]]; then
        echo ""
        echo "$JSON" | jq -r '.result'
        break
      fi
    fi
  done
```

## Use Cases

### 1. Real-Time Progress Display

```python theme={"dark"}
def display_progress(task_id: str, api_key: str):
    """Display real-time progress to user."""
    print(f"Task {task_id} starting...")

    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')

        if event_type == 'AGENT_THINKING':
            print(f"💭 {event.get('message', '')}")
        elif event_type == 'TOOL_INVOKED':
            # Tool details are nested under "payload" in the event examples above
            tool = (event.get('payload') or {}).get('tool', 'unknown')
            print(f"🔧 Using tool: {tool}")
        elif event_type == 'TOOL_OBSERVATION':
            print("✓ Tool completed")
        elif event_type == 'LLM_PARTIAL':
            print(event.get('text', ''), end='', flush=True)
        elif event_type == 'WORKFLOW_COMPLETED':
            print("\n\n✓ Done!")
            return event.get('result')
```

### 2. Log All Events to File

```python theme={"dark"}
import json
from datetime import datetime

def log_events_to_file(task_id: str, api_key: str, log_file: str):
    """Log all events to JSON Lines file."""
    with open(log_file, 'a') as f:
        for event in stream_task_events(task_id, api_key):
            # Add timestamp
            event['logged_at'] = datetime.now().isoformat()

            # Write as JSON Lines
            f.write(json.dumps(event) + '\n')
            f.flush()

            if event.get('type') == 'WORKFLOW_COMPLETED':
                break

log_events_to_file("task_abc123", "sk_test_123456", "task_events.jsonl")
```

### 3. Collect Tool Usage Metrics

```python theme={"dark"}
def collect_tool_metrics(task_id: str, api_key: str):
    """Collect metrics about tool usage."""
    metrics = {
        "tools_invoked": 0,
        "tools_succeeded": 0,
        "tools_failed": 0,
        "tool_list": []
    }

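    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')
        payload = event.get('payload') or {}  # Tool details are nested under "payload"

        if event_type == 'TOOL_INVOKED':
            metrics['tools_invoked'] += 1
            metrics['tool_list'].append(payload.get('tool', 'unknown'))
        elif event_type == 'TOOL_OBSERVATION':
            # Count a failure only when the payload explicitly reports success == false
            if payload.get('success', True):
                metrics['tools_succeeded'] += 1
            else:
                metrics['tools_failed'] += 1
        elif event_type == 'ERROR_OCCURRED':
            metrics['tools_failed'] += 1
        elif event_type == 'WORKFLOW_COMPLETED':
            break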

    return metrics

metrics = collect_tool_metrics("task_abc123", "sk_test_123456")
print(f"Tools used: {metrics['tool_list']}")
print(f"Success rate: {metrics['tools_succeeded']}/{metrics['tools_invoked']}")
```

### 4. React UI Integration

```javascript theme={"dark"}
import { useState, useEffect } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [status, setStatus] = useState('connecting');

  useEffect(() => {
    // Browser EventSource does NOT support custom headers.
    // Use api_key query param, or proxy SSE via your backend.
    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}&api_key=${apiKey}`
    );

    eventSource.onopen = () => setStatus('connected');

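    // Named SSE events bypass `onmessage`, so register a listener per type.
    const eventTypes = [
      'WORKFLOW_STARTED', 'AGENT_THINKING', 'TOOL_INVOKED',
      'TOOL_OBSERVATION', 'ERROR_OCCURRED', 'WORKFLOW_COMPLETED'
    ];

    const handleEvent = (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, data]);

      if (data.type === 'WORKFLOW_COMPLETED') {
        setStatus('completed');
        eventSource.close();
      }
    };

    eventTypes.forEach((type) => eventSource.addEventListener(type, handleEvent));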

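    eventSource.onerror = () => {
      // Closing here stops the browser's built-in EventSource retry;
      // reopen the stream yourself if you want to reconnect.
      setStatus('error');
      eventSource.close();
    };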

    return () => eventSource.close();
  }, [taskId, apiKey]);

  return (
    <div>
      <h3>Status: {status}</h3>
      <ul>
        {events.map((event, i) => (
          <li key={i}>
            <strong>{event.type}</strong>: {event.message || event.tool}
          </li>
        ))}
      </ul>
    </div>
  );
}
```

## Deep Research Streaming

Deep Research tasks are long-running: typically 2-10 minutes, although the `quick` strategy can finish in as little as 30 seconds. Use `context.force_research` to trigger Deep Research through the Task API:

```bash theme={"dark"}
curl -X POST http://localhost:8080/api/v1/tasks/stream \
  -H "Content-Type: application/json" \
  -H "X-API-Key: sk_test_123456" \
  -d '{
    "query": "What are the latest developments in quantum computing?",
    "session_id": "session-123",
    "context": {"force_research": true}
  }'
```

Control research depth with `context.research_strategy`:

| Strategy             | Depth   | Typical Duration | Use Case                         |
| -------------------- | ------- | ---------------- | -------------------------------- |
| `quick`              | Low     | 30s-2min         | Simple factual queries           |
| `standard` (default) | Medium  | 2-5min           | General research                 |
| `deep`               | High    | 5-10min          | Complex multi-faceted topics     |
| `academic`           | Highest | 5-10min          | Citation-heavy academic research |
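
As a minimal sketch, the request body for a given strategy could be assembled as shown here. The helper name `build_research_payload` is hypothetical, and sending `force_research` together with `research_strategy` in the same `context` object is an assumption based on the curl example and the table above. The resulting dictionary would be sent as the JSON body of `POST /api/v1/tasks/stream`.

```python
import json

# Strategy names copied from the table above.
RESEARCH_STRATEGIES = ("quick", "standard", "deep", "academic")


def build_research_payload(query: str, session_id: str, strategy: str = "standard") -> dict:
    """Build a Deep Research request body (hypothetical helper, not part of any SDK)."""
    if strategy not in RESEARCH_STRATEGIES:
        raise ValueError(f"unknown research_strategy: {strategy!r}")
    return {
        "query": query,
        "session_id": session_id,
        "context": {
            "force_research": True,         # route the task to Deep Research
            "research_strategy": strategy,  # depth, per the table above
        },
    }


payload = build_research_payload(
    "What are the latest developments in quantum computing?",
    "session-123",
    strategy="deep",
)
print(json.dumps(payload, indent=2))
```

Validating the strategy name on the client keeps a typo from silently falling back to whatever the server does with an unrecognized value.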

<Warning>
  **Chat API vs Task API for Deep Research**: The Chat API (`/v1/chat/completions` with `model: "shannon-deep-research"`) can also trigger Deep Research, but its streaming format does **not** include SSE event IDs, so a dropped stream cannot be resumed. If your platform has connection time limits (e.g., Vercel's 5-minute limit) or you need to survive page refreshes, **use the Task API**.
</Warning>

## Heartbeat

The server sends a `: ping` SSE comment every 10 seconds to keep connections alive through proxies and load balancers:

```
: ping
```

This is an SSE comment, not a JSON message, so skip it rather than parsing it as an event. If pings stop arriving for well over 10 seconds, treat the connection as dead and reconnect immediately.
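
One way to notice a dead connection is a small watchdog that records when the last line arrived. The sketch below is illustrative only: the class name `HeartbeatMonitor` and the threshold of three missed pings are hypothetical choices, not Shannon defaults. Only the 10-second interval comes from the text above.

```python
import time


class HeartbeatMonitor:
    """Track stream activity and flag a connection that has gone quiet."""

    def __init__(self, interval: float = 10.0, missed_allowed: int = 3, clock=time.monotonic):
        # interval matches the documented 10-second ping; missed_allowed is an assumption.
        self._max_silence = interval * missed_allowed
        self._clock = clock
        self._last_seen = clock()

    def observe(self, line: str) -> bool:
        """Record that a line arrived; return True if it is an SSE comment such as ': ping'."""
        self._last_seen = self._clock()
        return line.startswith(":")

    def is_stale(self) -> bool:
        """Return True when nothing has arrived for longer than the allowed silence."""
        return self._clock() - self._last_seen > self._max_silence


monitor = HeartbeatMonitor()
for line in [": ping", 'data: {"type": "AGENT_THINKING"}']:
    if monitor.observe(line):
        continue  # heartbeat comment: nothing to parse
    print("payload line:", line)
print("stale:", monitor.is_stale())
```

`is_stale()` has to be called from code that still runs while the stream is silent, for example a periodic timer. A read timeout on the HTTP client is a simpler way to get a similar effect.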

## Reconnection

SSE connections may drop due to network issues, proxy timeouts, or platform limits (e.g., Vercel hobby plan: 5-minute connection limit). Shannon supports resuming from where you left off.

**How it works:**

1. Track the `id` field of each received SSE event
2. When disconnected, reconnect with `last_event_id` parameter
3. The server replays all events after that ID (buffered \~256 events, 24h TTL)

```bash theme={"dark"}
# Reconnect from where you left off
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123&last_event_id=1719000002000-0" \
  -H "X-API-Key: sk_test_123456"
```

### Proactive Reconnection (Recommended)

For platforms with connection time limits, proactively disconnect before the limit:

```
1. Connect to SSE
2. Start a 4-minute timer (safety margin before 5-min limit)
3. When timer fires:
   a. Record last received event ID
   b. Close connection
   c. Immediately reconnect with last_event_id=<last_id>
4. Repeat until you receive WORKFLOW_COMPLETED or done event
```
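
The steps above can be sketched in Python as follows. This is a sketch under stated assumptions, not a reference client: `open_stream` is a hypothetical callable that opens the SSE connection (adding `last_event_id` when one is given) and yields `(id, payload)` pairs, and matching `done` against the payload's `type` field is also an assumption. The demo at the bottom uses an in-memory stand-in instead of a real connection.

```python
import time
from typing import Callable, Iterable, Iterator, Optional, Tuple

Event = Tuple[str, dict]  # (SSE id, decoded JSON payload)
TERMINAL_TYPES = ("WORKFLOW_COMPLETED", "done")  # "done" as a type value is assumed


def stream_with_rotation(
    open_stream: Callable[[Optional[str]], Iterable[Event]],
    max_connection_seconds: float = 240.0,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[dict]:
    """Yield payloads, rotating the connection before a platform time limit."""
    last_id: Optional[str] = None
    while True:
        started = clock()  # step 2: start the timer for this connection
        received_any = False
        stream = iter(open_stream(last_id))  # steps 1 and 3c: (re)connect
        try:
            for event_id, payload in stream:
                received_any = True
                last_id = event_id  # step 3a: remember the last event ID
                yield payload
                if payload.get("type") in TERMINAL_TYPES:
                    return  # step 4: terminal event, stop for good
                if clock() - started >= max_connection_seconds:
                    break  # step 3b: timer fired, drop this connection
        finally:
            close = getattr(stream, "close", None)
            if callable(close):
                close()  # release the connection if the iterator supports it
        if not received_any:
            return  # nothing new arrived; avoid reconnecting in a tight loop


# Demo with an in-memory stand-in for the SSE connection.
_EVENTS = [("1", {"type": "AGENT_THINKING"}), ("2", {"type": "WORKFLOW_COMPLETED"})]


def fake_open_stream(last_id: Optional[str]) -> Iterable[Event]:
    start = 0 if last_id is None else int(last_id)
    return iter(_EVENTS[start:])


for payload in stream_with_rotation(fake_open_stream):
    print(payload["type"])
```

The timer is only checked when an event arrives, so a stream that carries nothing but heartbeats would not rotate on schedule. A production client would also need to check the clock while idle.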

### Page Refresh / Fallback

If the user refreshes the page and you still have the `workflow_id`:

1. Check task status: `GET /api/v1/tasks/{workflow_id}`
2. If `TASK_STATUS_RUNNING` → reconnect to SSE
3. If `TASK_STATUS_COMPLETED` → display the result from the response
4. If `TASK_STATUS_FAILED` → display the error
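
The decision in the list above can be written as a small dispatch function. This is a sketch only: the function name and the `status`, `result`, and `error` keys are assumed field names, so check them against the actual `GET /api/v1/tasks/{workflow_id}` response before relying on them.

```python
from typing import Any, Dict, Tuple


def action_after_refresh(task: Dict[str, Any]) -> Tuple[str, Any]:
    """Map a task status response to the next UI action.

    The "status", "result", and "error" keys are assumptions about the
    response shape, not confirmed field names.
    """
    status = task.get("status")
    if status == "TASK_STATUS_RUNNING":
        return ("reconnect_sse", None)
    if status == "TASK_STATUS_COMPLETED":
        return ("show_result", task.get("result"))
    if status == "TASK_STATUS_FAILED":
        return ("show_error", task.get("error"))
    # Any other status is surfaced to the caller instead of being guessed at.
    return ("unknown_status", status)


print(action_after_refresh({"status": "TASK_STATUS_RUNNING"}))
print(action_after_refresh({"status": "TASK_STATUS_COMPLETED", "result": "42"}))
```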

### Python with Reconnection

```python theme={"dark"}
import json
import time

import httpx

def stream_with_reconnect(task_id: str, api_key: str, max_retries: int = 5):
    """Stream with automatic reconnection using last_event_id."""
    last_id = None

    for attempt in range(max_retries):
        try:
            url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
            if last_id:
                url += f"&last_event_id={last_id}"  # Resume after the last event seen

            with httpx.stream("GET", url, headers={"X-API-Key": api_key}, timeout=None) as response:
                response.raise_for_status()  # Fail fast on auth or routing errors
                for line in response.iter_lines():
                    if line.startswith("id:"):
                        last_id = line[3:].strip()  # Track last event ID
                    elif line.startswith("data:"):
                        event = json.loads(line[5:])
                        yield event
                        if event.get('type') in ('WORKFLOW_COMPLETED', 'STREAM_END'):
                            return
            # Stream closed without a terminal event: loop around and reconnect
        except httpx.TransportError:
            # Retry network-level failures only; HTTP status and JSON errors propagate
            if attempt < max_retries - 1:
                wait = min(2 ** attempt, 10)
                print(f"Connection lost, reconnecting in {wait}s (last_id={last_id})...")
                time.sleep(wait)
            else:
                raise
```

## Best Practices

### 2. Implement Timeout

```python theme={"dark"}
import signal

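def stream_with_timeout(task_id: str, api_key: str, timeout: int = 300):
    """Stream with an overall timeout (Unix only, main thread only: uses SIGALRM)."""
    def timeout_handler(signum, frame):
        raise TimeoutError("Stream timeout")

    # signal.signal returns the previous handler so it can be restored later
    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)

    try:
        # The alarm can also fire while the caller is handling a yielded event,
        # so catch TimeoutError around the consuming loop as well.
        for event in stream_task_events(task_id, api_key):
            yield event
            if event.get('type') == 'WORKFLOW_COMPLETED':
                break
    finally:
        signal.alarm(0)  # Cancel alarm
        signal.signal(signal.SIGALRM, previous_handler)  # Restore previous handler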
```

### 3. Filter Events Client-Side

```python theme={"dark"}
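def stream_specific_events(task_id: str, api_key: str, event_types: set):
    """Filter events client-side."""
    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')

        # Always pass the completion event through exactly once, then stop.
        # Checking it first avoids a duplicate yield when the caller also
        # lists WORKFLOW_COMPLETED in event_types.
        if event_type == 'WORKFLOW_COMPLETED':
            yield event
            break

        if event_type in event_types:
            yield event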

# Only receive agent and tool events
for event in stream_specific_events(
    "task_abc123",
    "sk_test_123456",
    {"AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"}
):
    print(event)
```

### 4. Resume from Last Event

```python theme={"dark"}
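import json
from typing import Optional

import httpx

def stream_with_resume(task_id: str, api_key: str, last_event_id: Optional[str] = None):
    """Resume streaming from a specific event."""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"

    if last_event_id:
        url += f"&last_event_id={last_event_id}"

    # Stream events; the server replays everything after last_event_id first
    with httpx.stream("GET", url, headers={"X-API-Key": api_key}, timeout=None) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])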
```

Note: `last_event_id` accepts either a Redis stream ID (e.g., `1700000000000-0`) or a numeric sequence (e.g., `42`). When numeric, replay includes events with `seq > last_event_id`.
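
To illustrate the two accepted formats, the sketch below parses a `last_event_id` value on the client and mirrors the numeric replay rule. Both function names are hypothetical. The only outside fact it relies on is that a Redis stream ID has the form `<milliseconds>-<counter>`.

```python
from typing import Tuple, Union

Cursor = Union[int, Tuple[int, int]]


def parse_last_event_id(value: str) -> Cursor:
    """Return an int for a numeric sequence, or (milliseconds, counter) for a Redis stream ID."""
    if "-" in value:
        ms, _, counter = value.partition("-")
        return (int(ms), int(counter))
    return int(value)


def is_replayed(event_seq: int, last_event_id: str) -> bool:
    """Mirror the documented numeric rule: replay includes events with seq > last_event_id."""
    cursor = parse_last_event_id(last_event_id)
    if not isinstance(cursor, int):
        raise ValueError("seq comparison only applies to numeric last_event_id values")
    return event_seq > cursor


print(parse_last_event_id("1700000000000-0"))
print(parse_last_event_id("42"))
print(is_replayed(43, "42"), is_replayed(42, "42"))
```

A check like `is_replayed` can be useful for dropping duplicates on the client if events are also loaded from the historical events endpoint.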

## Comparison: SSE vs WebSocket vs Polling

| Feature             | SSE               | WebSocket        | Polling         |
| ------------------- | ----------------- | ---------------- | --------------- |
| **Direction**       | Server → Client   | Bidirectional    | Client → Server |
| **Protocol**        | HTTP              | WebSocket        | HTTP            |
| **Auto-reconnect**  | Yes (browser)     | No (manual)      | N/A             |
| **Overhead**        | Low               | Very Low         | High            |
| **Simplicity**      | High              | Medium           | High            |
| **Use Case**        | Real-time updates | Interactive apps | Simple status   |
| **Shannon Support** | ✅ Recommended     | ✅ Available      | ⚠️ Not ideal    |

### When to Use Each

* **SSE**: Most use cases, real-time monitoring, progress display
* **WebSocket**: Interactive applications, bidirectional communication needed
* **Polling** (GET /api/v1/tasks/{id}): Legacy systems, no streaming support

## Related Endpoints

<CardGroup cols={2}>
  <Card title="Submit Task" icon="paper-plane" href="/en/api/rest/submit-task">
    POST /api/v1/tasks
  </Card>

  <Card title="Get Status" icon="circle-info" href="/en/api/rest/get-status">
    GET /api/v1/tasks/{id}
  </Card>

  <Card title="Python SDK" icon="python" href="/en/sdk/python/quickstart">
    Use client.stream()
  </Card>
</CardGroup>

## Notes

<Tip>
  **Event Retention**:

  * **Redis**: All events stored for 24 hours (real-time streaming)
  * **PostgreSQL**: Critical events stored for 90 days (historical queries)
  * Use `last_event_id` to resume streaming if connection drops
</Tip>

<Warning>
  **Connection Limits**:

  * Maximum 100 concurrent streaming connections per API key
  * 5-minute inactivity timeout (automatic connection close)
  * 1MB buffer size limit per connection
  * Consider multiplexing multiple workflows over a single WebSocket
</Warning>
