跳转到主要内容

概述

Shannon 通过 Server-Sent Events (SSE) 和 WebSocket 协议提供实时事件流传输。使用流式传输来监控任务执行、显示进度并在生成时接收结果。
认证:流式端点与其他 API 使用相同的认证请求头。 浏览器 EventSource 无法携带自定义请求头。
  • 开发环境:设置 GATEWAY_SKIP_AUTH=1
  • 生产环境:通过后端代理转发 SSE,并注入 X-API-Key 或 Bearer 头。 不要通过 URL 查询参数传递 API 密钥。
流式传输限制
  • 超时:流在 5 分钟无活动后自动关闭
  • 缓冲区大小:每个连接最大 1MB 缓冲数据
  • 使用元数据:现在所有 LLM 提供商(OpenAI、Anthropic、Google、Groq、xAI)都可获取 token 计数和成本

端点

方法端点协议描述
POST/api/v1/tasks/streamHTTP+SSE提交任务并获取流 URL(推荐)
GET/api/v1/stream/sseSSEServer-Sent Events 端点
GET/api/v1/stream/wsWebSocketWebSocket 流式传输端点
GET/api/v1/tasks/{id}/eventsHTTP获取历史事件(分页)

统一提交 + 流式传输(推荐)

POST /api/v1/tasks/stream

提交任务并立即开始流式传输其事件的最简单方法。此端点在一次调用中结合了任务提交和流式设置。
最适合前端应用:此端点非常适合需要在提交任务后立即显示进度的实时 UI。

身份验证

必需:是
X-API-Key: sk_test_123456
或:
Authorization: Bearer YOUR_TOKEN

请求体

参数类型必需描述
querystring自然语言任务描述
session_idstring多轮对话的会话标识符
contextobject键值对形式的附加上下文数据
model_tierstring模型层级:smallmediumlarge
model_overridestring指定模型名称(规范 ID;例如 gpt-5
provider_overridestring强制指定提供商(如 openaianthropicgoogle

响应

状态201 Created 响应体
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}

响应字段

字段类型描述
workflow_idstring任务/工作流标识符
stream_urlstringSSE 流端点的相对 URL

示例:JavaScript/TypeScript

async function submitAndStream(query, onEvent, onComplete, onError) {
  try {
    // 1. 提交任务并获取流 URL
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer YOUR_TOKEN'
      },
      body: JSON.stringify({
        query: query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    const { task_id, workflow_id, stream_url } = await response.json();
    console.log(`任务已提交: ${workflow_id}`);

    // 2. 连接到 SSE 流
    const eventSource = new EventSource(
      `http://localhost:8080${stream_url}`,
      { withCredentials: false }
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      onEvent(event);

      // 检查完成状态
      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        onComplete(event);
      }
    };

    eventSource.onerror = (err) => {
      console.error('SSE 错误:', err);
      eventSource.close();
      onError(err);
    };

    return { workflow_id, eventSource };

  } catch (error) {
    onError(error);
    throw error;
  }
}

// 使用示例
submitAndStream(
  "分析第四季度收入趋势",
  (event) => console.log(`[${event.type}]`, event.message),
  (final) => console.log("已完成:", final.result),
  (error) => console.error("错误:", error)
);

示例:React Hook

import { useState, useCallback, useRef } from 'react';

function useTaskStream(apiUrl = 'http://localhost:8080') {
  const [events, setEvents] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState(null);
  const [workflowId, setWorkflowId] = useState(null);
  const eventSourceRef = useRef(null);

  const submitTask = useCallback(async (query, sessionId = null) => {
    setEvents([]);
    setError(null);
    setIsStreaming(true);

    try {
      // 提交任务
      const response = await fetch(`${apiUrl}/api/v1/tasks/stream`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${localStorage.getItem('token')}`
        },
        body: JSON.stringify({
          query,
          session_id: sessionId || `session-${Date.now()}`
        })
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const { task_id, workflow_id, stream_url } = await response.json();
      setWorkflowId(workflow_id);

      // 连接到流
      const eventSource = new EventSource(`${apiUrl}${stream_url}`);
      eventSourceRef.current = eventSource;

      eventSource.onmessage = (e) => {
        const event = JSON.parse(e.data);
        setEvents(prev => [...prev, event]);

        if (event.type === 'WORKFLOW_COMPLETED' || event.type === 'ERROR_OCCURRED') {
          eventSource.close();
          setIsStreaming(false);
        }
      };

      eventSource.onerror = (err) => {
        setError(err);
        setIsStreaming(false);
        eventSource.close();
      };

    } catch (err) {
      setError(err);
      setIsStreaming(false);
    }
  }, [apiUrl]);

  const stopStreaming = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      setIsStreaming(false);
    }
  }, []);

  return { submitTask, stopStreaming, events, isStreaming, error, workflowId };
}

// 在组件中使用
function TaskStreamDemo() {
  const { submitTask, stopStreaming, events, isStreaming, error, workflowId } = useTaskStream();

  return (
    <div>
      <button
        onClick={() => submitTask("15 + 25 等于多少?")}
        disabled={isStreaming}
      >
        {isStreaming ? '处理中...' : '提交任务'}
      </button>

      {workflowId && <p>工作流 ID: {workflowId}</p>}
      {error && <p style={{color: 'red'}}>错误: {error.message}</p>}

      <div>
        {events.map((event, idx) => (
          <div key={idx}>
            <strong>{event.type}</strong>: {event.message || event.agent_id}
          </div>
        ))}
      </div>

      {isStreaming && <button onClick={stopStreaming}>停止</button>}
    </div>
  );
}

示例:Vue 3 Composition API

<script setup>
import { ref } from 'vue';

const events = ref([]);
const isStreaming = ref(false);
const error = ref(null);
const workflowId = ref(null);
let eventSource = null;

async function submitTask(query) {
  events.value = [];
  error.value = null;
  isStreaming.value = true;

  try {
    const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${localStorage.getItem('token')}`
      },
      body: JSON.stringify({
        query,
        session_id: `session-${Date.now()}`
      })
    });

    if (!response.ok) throw new Error(`HTTP ${response.status}`);

    const { workflow_id, stream_url } = await response.json();
    workflowId.value = workflow_id;

    eventSource = new EventSource(`http://localhost:8080${stream_url}`);

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      events.value.push(event);

      if (event.type === 'WORKFLOW_COMPLETED') {
        eventSource.close();
        isStreaming.value = false;
      }
    };

    eventSource.onerror = (err) => {
      error.value = err;
      eventSource.close();
      isStreaming.value = false;
    };
  } catch (err) {
    error.value = err;
    isStreaming.value = false;
  }
}

function stopStreaming() {
  if (eventSource) {
    eventSource.close();
    isStreaming.value = false;
  }
}
</script>

<template>
  <div>
    <button @click="submitTask('分析数据')" :disabled="isStreaming">
      {{ isStreaming ? '处理中...' : '提交任务' }}
    </button>

    <div v-if="workflowId">工作流: {{ workflowId }}</div>
    <div v-if="error" style="color: red">错误: {{ error.message }}</div>

    <div v-for="(event, idx) in events" :key="idx">
      <strong>{{ event.type }}</strong>: {{ event.message || event.agent_id }}
    </div>

    <button v-if="isStreaming" @click="stopStreaming">停止</button>
  </div>
</template>

示例:Python

import httpx
import json
import time

def submit_and_stream(query: str, api_key: str):
    """提交任务并流式传输事件。"""

    # 1. 提交任务
    response = httpx.post(
        "http://localhost:8080/api/v1/tasks/stream",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        json={
            "query": query,
            "session_id": f"session-{int(time.time())}"
        }
    )

    data = response.json()
    workflow_id = data["workflow_id"]
    stream_url = data["stream_url"]

    print(f"任务已提交: {workflow_id}")

    # 2. 流式传输事件
    with httpx.stream(
        "GET",
        f"http://localhost:8080{stream_url}",
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=None
    ) as stream_response:
        for line in stream_response.iter_lines():
            if line.startswith("data:"):
                event = json.loads(line[5:])
                print(f"[{event['type']}] {event.get('message', '')}")

                if event['type'] in ['WORKFLOW_COMPLETED']:
                    return event

# 使用示例
result = submit_and_stream("法国的首都是什么?", "sk_test_123456")
print("最终结果:", result.get('result'))
为什么使用此端点? 统一端点确保您在提交后立即开始流式传输,防止在分别提交和连接时可能错过的任何事件。

Server-Sent Events (SSE)

GET /api/v1/stream/sse

使用 Server-Sent Events 进行实时事件流传输。

身份验证

必需:是
X-API-Key: sk_test_123456

查询参数

参数类型必需描述
workflow_idstring任务/工作流标识符
typesstring逗号分隔的事件类型以筛选
last_event_idstring从特定事件 ID 恢复。支持 Redis Stream ID(如 1700000000000-0)或数字序号(如 42)。当为数字时,重放规则为 seq > last_event_id

事件格式

每个事件遵循 SSE 规范:
id: <event_id>
event: <event_type>
data: <json_payload>

示例请求

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \\
  -H "X-API-Key: sk_test_123456"

示例响应

id: 1
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","timestamp":"2025-10-22T10:30:00Z","message":"工作流已启动"}

id: 2
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","agent_id":"agent_1","message":"分析查询...","timestamp":"2025-10-22T10:30:01Z"}

id: 3
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","tool":"web_search","params":{"query":"Python 编程"},"timestamp":"2025-10-22T10:30:02Z"}

id: 4
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","tool":"web_search","result":"...","timestamp":"2025-10-22T10:30:05Z"}

id: 5
event: AGENT_COMPLETED
data: {"workflow_id":"task_abc123","agent_id":"agent_1","result":"Python 是一种高级编程语言...","timestamp":"2025-10-22T10:30:06Z"}

id: 6
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","status":"COMPLETED","result":"...","timestamp":"2025-10-22T10:30:07Z"}

WebSocket

GET /api/v1/stream/ws

通过 WebSocket 进行双向流式传输。

身份验证

网关通过仅使用标头(X-API-KeyAuthorization)来对 WebSocket 连接进行身份验证。浏览器在 WebSocket 握手期间无法设置自定义标头。对于浏览器使用:
  • GATEWAY_SKIP_AUTH=1 本地运行,或
  • 使用反向代理在转发到网关之前注入标头。
服务器环境中基于标头的示例: Node (ws):
import WebSocket from 'ws';

const ws = new WebSocket('ws://localhost:8080/api/v1/stream/ws', {
  headers: { 'X-API-Key': 'sk_test_123456' },
});

ws.on('open', () => {
  ws.send(JSON.stringify({ type: 'subscribe', workflow_id: 'task_abc123' }));
});

ws.on('message', (msg) => {
  const data = JSON.parse(msg.toString());
  console.log('事件:', data.type, data.message);
});

ws.on('error', (err) => console.error('WebSocket 错误:', err));
Python (websockets):
import asyncio, json, websockets

async def main():
    async with websockets.connect(
        'ws://localhost:8080/api/v1/stream/ws',
        extra_headers={'X-API-Key': 'sk_test_123456'},
    ) as ws:
        await ws.send(json.dumps({'type': 'subscribe', 'workflow_id': 'task_abc123'}))
        async for message in ws:
            data = json.loads(message)
            print('事件:', data.get('type'), data.get('message'))

asyncio.run(main())
不支持通过查询字符串传递 API 密钥或通过网关的连接后”auth”消息。

消息类型

客户端 → 服务器
// 订阅工作流
{
  "type": "subscribe",
  "workflow_id": "task_abc123",
  "types": ["AGENT_THINKING", "TOOL_INVOKED"]
}

// 取消订阅
{
  "type": "unsubscribe",
  "workflow_id": "task_abc123"
}

// Ping(保活)
{
  "type": "ping"
}
服务器 → 客户端
// 事件
{
  "type": "AGENT_THINKING",
  "workflow_id": "task_abc123",
  "message": "分析查询...",
  "timestamp": "2025-10-22T10:30:00Z"
}

// Pong(保活响应)
{
  "type": "pong"
}

OpenAI 兼容的流式传输格式

Shannon 的 SSE 流式传输使用 OpenAI 兼容格式 处理使用元数据和事件结构,可与基于 OpenAI 的工具无缝集成,同时保持 Shannon 的综合事件分类体系。

使用元数据格式

所有 LLM 提供商以 OpenAI 标准结构返回使用数据:
{
  "type": "LLM_OUTPUT",
  "data": {
    "output": "响应文本...",
    "model": "gpt-5-nano-2025-08-07",
    "provider": "openai",
    "usage": {
      "total_tokens": 350,
      "input_tokens": 200,
      "output_tokens": 150
    },
    "cost_usd": 0.0105
  }
}
兼容性矩阵:
提供商使用元数据流式选项
OpenAI✅ 完全支持stream_options: {"include_usage": true}
Anthropic✅ 完全支持最终流式块
Google✅ 完全支持最终流式块
Groq✅ 完全支持最终流式块
xAI⚠️ 有限支持仅在一元模式下可用
OpenAI 兼容端点⚠️ 优雅降级取决于端点支持
注意: Shannon 自动处理不支持 stream_options 参数的提供商,进行优雅降级——流式传输可工作,但使用元数据在流式传输期间可能不可用。

事件类型

核心事件

事件类型描述何时触发
WORKFLOW_STARTED工作流执行开始任务开始
WORKFLOW_COMPLETED工作流成功完成任务结束(成功)

代理事件

事件类型描述载荷字段
AGENT_THINKING代理推理/规划agent_idmessage
AGENT_COMPLETED代理完成执行agent_idresult

工具事件

事件类型描述载荷字段
TOOL_INVOKED工具执行已启动toolparams
TOOL_OBSERVATION代理观察工具结果toolresult

LLM 事件

事件类型描述载荷字段
LLM_PROMPT发送给 LLM 的提示text
LLM_PARTIAL流式 LLM 输出text
LLM_OUTPUT最终 LLM 输出text

进度与系统事件

事件类型描述载荷字段
PROGRESS进度更新progressmessage
DATA_PROCESSING数据处理中message
WAITING等待资源message
ERROR_OCCURRED发生错误errorseverity
ERROR_RECOVERY错误恢复尝试message
WORKSPACE_UPDATED内存/上下文已更新message
BUDGET_THRESHOLD任务预算达到预警阈值usage_percentthreshold_percenttokens_usedtokens_budgetlevelbudget_type

流生命周期事件

事件类型描述
STREAM_END显式的流结束信号(该工作流不会再有新事件)
在 SSE 中,STREAM_END 生命周期事件通过名为 done 的 SSE 事件发送,数据为纯文本 [DONE](不是 JSON)。在 WebSocket 中,它会以普通 JSON 事件的形式出现,字段为 "type": "STREAM_END"

团队与审批

事件类型描述
TEAM_RECRUITED组建多智能体团队
TEAM_RETIRED解散团队
TEAM_STATUS团队协调更新
ROLE_ASSIGNED分配代理角色
DELEGATION任务已委托
DEPENDENCY_SATISFIED依赖已解决
APPROVAL_REQUESTED需要人工批准
APPROVAL_DECISION审批决策已记录

代码示例

Python with httpx (SSE)

import httpx
import json

def stream_task_events(task_id: str, api_key: str):
    """使用 SSE 流式传输任务事件。"""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"

    with httpx.stream(
        "GET",
        url,
        headers={"X-API-Key": api_key},
        timeout=None  # 无流式传输超时
    ) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                data = json.loads(line[5:])  # 移除"data:"前缀
                yield data

# 用法
for event in stream_task_events("task_abc123", "sk_test_123456"):
    print(f"[{event.get('type')}] {event.get('message', '')}")

    if event.get('type') == 'WORKFLOW_COMPLETED':
        print("最终结果:", event.get('result'))
        break

Python - 流式传输带事件筛选

def stream_filtered_events(task_id: str, api_key: str, event_types: list):
    """仅流式传输特定事件类型。"""
    types_param = ",".join(event_types)
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"

    with httpx.stream("GET", url, headers={"X-API-Key": api_key}) as response:
        for line in response.iter_lines():
            if line.startswith("data:"):
                yield json.loads(line[5:])

# 仅接收代理思考和工具事件
for event in stream_filtered_events(
    "task_abc123",
    "sk_test_123456",
    ["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
    print(f"{event['type']}{event.get('message', event.get('tool'))}")

JavaScript/Node.js (SSE)

const EventSource = require('eventsource');

function streamTaskEvents(taskId, apiKey) {
  const url = `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`;

  const eventSource = new EventSource(url, {
    headers: {
      'X-API-Key': apiKey
    }
  });

  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log(`[${data.type}] ${data.message || ''}`);

    if (data.type === 'WORKFLOW_COMPLETED') {
      console.log('最终结果:', data.result);
      eventSource.close();
    }
  };

  eventSource.onerror = (error) => {
    console.error('SSE 错误:', error);
    eventSource.close();
  };

  return eventSource;
}

// 用法
const stream = streamTaskEvents('task_abc123', 'sk_test_123456');

// 手动关闭
setTimeout(() => stream.close(), 60000);

JavaScript/Node.js - WebSocket (ws)

import WebSocket from 'ws';

function connectWebSocket(taskId, apiKey) {
  const ws = new WebSocket(
    `ws://localhost:8080/api/v1/stream/ws?workflow_id=${taskId}`,
    { headers: { 'X-API-Key': apiKey } }
  );

  ws.on('open', () => console.log('✓ 已连接'));

  ws.on('message', (msg) => {
    const data = JSON.parse(msg.toString());
    switch (data.type) {
      case 'AGENT_THINKING':
        console.log(`💭 ${data.message}`);
        break;
      case 'TOOL_INVOKED':
        console.log(`🔧 工具:${data.tool}`);
        break;
      case 'TOOL_OBSERVATION':
        console.log(`✓ 结果:${data.result}`);
        break;
      case 'WORKFLOW_COMPLETED':
        console.log(`✓ 完成:${data.result}`);
        ws.close();
        break;
      default:
        console.log(`[${data.type}] ${data.message || ''}`);
    }
  });

  ws.on('error', (err) => console.error('❌ 错误:', err));
  ws.on('close', () => console.log('连接已关闭'));

  return ws;
}

const ws = connectWebSocket('task_abc123', 'sk_test_123456');

Go (SSE)

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
)

type Event struct {
    Type      string                 `json:"type"`
    WorkflowID string                `json:"workflow_id"`
    Message   string                 `json:"message,omitempty"`
    Data      map[string]interface{} `json:",inline"`
}

func streamEvents(taskID, apiKey string) error {
    url := fmt.Sprintf(
        "http://localhost:8080/api/v1/stream/sse?workflow_id=%s",
        taskID,
    )

    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("X-API-Key", apiKey)

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    scanner := bufio.NewScanner(resp.Body)

    for scanner.Scan() {
        line := scanner.Text()

        if strings.HasPrefix(line, "data:") {
            data := strings.TrimPrefix(line, "data:")

            var event Event
            json.Unmarshal([]byte(data), &event)

            fmt.Printf("[%s] %s\n", event.Type, event.Message)

            if event.Type == "WORKFLOW_COMPLETED" {
                fmt.Println("✓ 任务完成")
                break
            }
        }
    }

    return scanner.Err()
}

func main() {
    err := streamEvents("task_abc123", "sk_test_123456")
    if err != nil {
        fmt.Println("错误:", err)
    }
}

Bash/curl (SSE)

#!/bin/bash

API_KEY="sk_test_123456"
TASK_ID="$1"

curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=$TASK_ID" \\
  -H "X-API-Key: $API_KEY" \\
  | while IFS= read -r line; do
    if [[ $line == data:* ]]; then
      # 从"data:{...}"提取 JSON
      JSON="${line#data:}"

      # 解析并显示
      TYPE=$(echo "$JSON" | jq -r '.type')
      MESSAGE=$(echo "$JSON" | jq -r '.message // ""')

      echo "[$(date +%T)] $TYPE$MESSAGE"

      # 完成时退出
      if [[ "$TYPE" == "WORKFLOW_COMPLETED" ]]; then
        echo ""
        echo "$JSON" | jq -r '.result'
        break
      fi
    fi
  done

用例

1. 实时进度显示

def display_progress(task_id: str, api_key: str):
    """向用户显示实时进度。"""
    print(f"任务 {task_id} 启动...")

    for event in stream_task_events(task_id, api_key):
        event_type = event.get('type')

        if event_type == 'AGENT_THINKING':
            print(f"💭 {event['message']}")
        elif event_type == 'TOOL_INVOKED':
            print(f"🔧 使用工具:{event['tool']}")
        elif event_type == 'TOOL_OBSERVATION':
            print(f"✓ 工具已完成")
        elif event_type == 'LLM_PARTIAL':
            print(event['text'], end='', flush=True)
        elif event_type == 'WORKFLOW_COMPLETED':
            print(f"\n\n✓ 完成!")
            return event['result']

2. 将所有事件记录到文件

import json
from datetime import datetime

def log_events_to_file(task_id: str, api_key: str, log_file: str):
    """将所有事件记录到 JSON Lines 文件。"""
    with open(log_file, 'a') as f:
        for event in stream_task_events(task_id, api_key):
            # 添加时间戳
            event['logged_at'] = datetime.now().isoformat()

            # 以 JSON Lines 格式写入
            f.write(json.dumps(event) + '\n')
            f.flush()

            if event.get('type') == 'WORKFLOW_COMPLETED':
                break

log_events_to_file("task_abc123", "sk_test_123456", "task_events.jsonl")

3. 收集工具使用指标

def collect_tool_metrics(task_id: str, api_key: str):
    """收集有关工具使用的指标。"""
    metrics = {
        "tools_invoked": 0,
        "tools_succeeded": 0,
        "tools_failed": 0,
        "tool_list": []
    }

    for event in stream_task_events(task_id, api_key):
        if event.get('type') == 'TOOL_INVOKED':
            metrics['tools_invoked'] += 1
            metrics['tool_list'].append(event['tool'])
        elif event.get('type') == 'TOOL_OBSERVATION':
            metrics['tools_succeeded'] += 1
        elif event.get('type') == 'ERROR_OCCURRED':
            metrics['tools_failed'] += 1
        elif event.get('type') == 'WORKFLOW_COMPLETED':
            break

    return metrics

metrics = collect_tool_metrics("task_abc123", "sk_test_123456")
print(f"使用的工具:{metrics['tool_list']}")
print(f"成功率:{metrics['tools_succeeded']}/{metrics['tools_invoked']}")

4. React UI 集成

import { useState, useEffect } from 'react';

function TaskMonitor({ taskId, apiKey }) {
  const [events, setEvents] = useState([]);
  const [status, setStatus] = useState('connecting');

  useEffect(() => {
    const eventSource = new EventSource(
      `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`,
      { headers: { 'X-API-Key': apiKey } }
    );

    eventSource.onopen = () => setStatus('connected');

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      setEvents(prev => [...prev, data]);

      if (data.type === 'WORKFLOW_COMPLETED') {
        setStatus('completed');
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      setStatus('error');
      eventSource.close();
    };

    return () => eventSource.close();
  }, [taskId, apiKey]);

  return (
    <div>
      <h3>状态:{status}</h3>
      <ul>
        {events.map((event, i) => (
          <li key={i}>
            <strong>{event.type}</strong>{event.message || event.tool}
          </li>
        ))}
      </ul>
    </div>
  );
}

最佳实践

1. 处理连接错误

import time

def stream_with_retry(task_id: str, api_key: str, max_retries: int = 3):
    """在连接失败时进行自动重试的流式传输。"""
    for attempt in range(max_retries):
        try:
            for event in stream_task_events(task_id, api_key):
                yield event

                if event.get('type') == 'WORKFLOW_COMPLETED':
                    return
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"连接失败,{2 ** attempt}s 后重试...")
                time.sleep(2 ** attempt)
            else:
                raise

2. 实施超时

import signal

def stream_with_timeout(task_id: str, api_key: str, timeout: int = 300):
    """具有超时的流式传输。"""
    def timeout_handler(signum, frame):
        raise TimeoutError("流式传输超时")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout)

    try:
        for event in stream_task_events(task_id, api_key):
            yield event
            if event.get('type') == 'WORKFLOW_COMPLETED':
                break
    finally:
        signal.alarm(0)  # 取消闹钟

3. 客户端过滤事件

def stream_specific_events(task_id: str, api_key: str, event_types: set):
    """客户端过滤事件。"""
    for event in stream_task_events(task_id, api_key):
        if event.get('type') in event_types:
            yield event

        if event.get('type') == 'WORKFLOW_COMPLETED':
            yield event
            break

# 仅接收代理和工具事件
for event in stream_specific_events(
    "task_abc123",
    "sk_test_123456",
    {"AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"}
):
    print(event)

4. 从最后一个事件恢复

def stream_with_resume(task_id: str, api_key: str, last_event_id: str = None):
    """从特定事件恢复流式传输。"""
    url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"

    if last_event_id:
        url += f"&last_event_id={last_event_id}"

    # 流式传输事件...
说明:last_event_id 支持 Redis Stream ID(如 1700000000000-0)或数字序号(如 42)。当为数字时,重放规则为 seq > last_event_id

比较:SSE vs WebSocket vs 轮询

功能SSEWebSocket轮询
方向服务器 → 客户端双向客户端 → 服务器
协议HTTPWebSocketHTTP
自动重连是(浏览器)否(手动)N/A
开销非常低
简洁性
使用场景实时更新交互式应用简单状态
Shannon 支持✅ 推荐✅ 可用⚠️ 不理想

何时使用每个

  • SSE:大多数用例、实时监控、进度显示
  • WebSocket:交互式应用、需要双向通信
  • 轮询(GET /api/v1/tasks/):传统系统、无流式传输支持

相关端点

注意

事件保留
  • Redis:所有事件存储 24 小时(实时流式传输)
  • PostgreSQL:关键事件存储 90 天(历史查询)
  • 如果连接断开,使用 last_event_id 恢复流式传输
连接限制
  • 每个 API 密钥最多 100 个并发流式传输连接
  • 5 分钟无活动超时(自动关闭连接)
  • 每个连接 1MB 缓冲区大小限制
  • 考虑在单个 WebSocket 上多路复用多个工作流