Shannon 通过 Server-Sent Events (SSE) 和 WebSocket 协议提供实时事件流传输。使用流式传输来监控任务执行、显示进度并在生成时接收结果。
认证:流式端点与其他 API 使用相同的认证请求头。
浏览器 EventSource 无法携带自定义请求头。
- 开发环境:设置
GATEWAY_SKIP_AUTH=1。
- 生产环境:通过后端代理转发 SSE,并注入
X-API-Key 或 Bearer 头。
不要通过 URL 查询参数传递 API 密钥。
流式传输限制:
- 超时:流在 5 分钟无活动后自动关闭
- 缓冲区大小:每个连接最大 1MB 缓冲数据
- 使用元数据:现在所有 LLM 提供商(OpenAI、Anthropic、Google、Groq、xAI)都可获取 token 计数和成本
| 方法 | 端点 | 协议 | 描述 |
|---|
POST | /api/v1/tasks/stream | HTTP+SSE | 提交任务并获取流 URL(推荐) |
GET | /api/v1/stream/sse | SSE | Server-Sent Events 端点 |
GET | /api/v1/stream/ws | WebSocket | WebSocket 流式传输端点 |
GET | /api/v1/tasks/{id}/events | HTTP | 获取历史事件(分页) |
统一提交 + 流式传输(推荐)
POST /api/v1/tasks/stream
提交任务并立即开始流式传输其事件的最简单方法。此端点在一次调用中结合了任务提交和流式设置。
最适合前端应用:此端点非常适合需要在提交任务后立即显示进度的实时 UI。
身份验证
必需:是
X-API-Key: sk_test_123456
或:
Authorization: Bearer YOUR_TOKEN
请求体
| 参数 | 类型 | 必需 | 描述 |
|---|
query | string | 是 | 自然语言任务描述 |
session_id | string | 否 | 多轮对话的会话标识符 |
context | object | 否 | 键值对形式的附加上下文数据 |
model_tier | string | 否 | 模型层级:small、medium、large |
model_override | string | 否 | 指定模型名称(规范 ID;例如 gpt-5) |
provider_override | string | 否 | 强制指定提供商(如 openai、anthropic、google) |
状态:201 Created
响应体:
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"workflow_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"stream_url": "/api/v1/stream/sse?workflow_id=task_01HQZX3Y9K8M2P4N5S7T9W2V"
}
响应字段
| 字段 | 类型 | 描述 |
|---|
workflow_id | string | 任务/工作流标识符 |
stream_url | string | SSE 流端点的相对 URL |
示例:JavaScript/TypeScript
async function submitAndStream(query, onEvent, onComplete, onError) {
try {
// 1. 提交任务并获取流 URL
const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer YOUR_TOKEN'
},
body: JSON.stringify({
query: query,
session_id: `session-${Date.now()}`
})
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const { task_id, workflow_id, stream_url } = await response.json();
console.log(`任务已提交: ${workflow_id}`);
// 2. 连接到 SSE 流
const eventSource = new EventSource(
`http://localhost:8080${stream_url}`,
{ withCredentials: false }
);
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
onEvent(event);
// 检查完成状态
if (event.type === 'WORKFLOW_COMPLETED') {
eventSource.close();
onComplete(event);
}
};
eventSource.onerror = (err) => {
console.error('SSE 错误:', err);
eventSource.close();
onError(err);
};
return { workflow_id, eventSource };
} catch (error) {
onError(error);
throw error;
}
}
// 使用示例
submitAndStream(
"分析第四季度收入趋势",
(event) => console.log(`[${event.type}]`, event.message),
(final) => console.log("已完成:", final.result),
(error) => console.error("错误:", error)
);
示例:React Hook
import { useState, useCallback, useRef } from 'react';
function useTaskStream(apiUrl = 'http://localhost:8080') {
const [events, setEvents] = useState([]);
const [isStreaming, setIsStreaming] = useState(false);
const [error, setError] = useState(null);
const [workflowId, setWorkflowId] = useState(null);
const eventSourceRef = useRef(null);
const submitTask = useCallback(async (query, sessionId = null) => {
setEvents([]);
setError(null);
setIsStreaming(true);
try {
// 提交任务
const response = await fetch(`${apiUrl}/api/v1/tasks/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${localStorage.getItem('token')}`
},
body: JSON.stringify({
query,
session_id: sessionId || `session-${Date.now()}`
})
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const { task_id, workflow_id, stream_url } = await response.json();
setWorkflowId(workflow_id);
// 连接到流
const eventSource = new EventSource(`${apiUrl}${stream_url}`);
eventSourceRef.current = eventSource;
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
setEvents(prev => [...prev, event]);
if (event.type === 'WORKFLOW_COMPLETED' || event.type === 'ERROR_OCCURRED') {
eventSource.close();
setIsStreaming(false);
}
};
eventSource.onerror = (err) => {
setError(err);
setIsStreaming(false);
eventSource.close();
};
} catch (err) {
setError(err);
setIsStreaming(false);
}
}, [apiUrl]);
const stopStreaming = useCallback(() => {
if (eventSourceRef.current) {
eventSourceRef.current.close();
setIsStreaming(false);
}
}, []);
return { submitTask, stopStreaming, events, isStreaming, error, workflowId };
}
// 在组件中使用
function TaskStreamDemo() {
const { submitTask, stopStreaming, events, isStreaming, error, workflowId } = useTaskStream();
return (
<div>
<button
onClick={() => submitTask("15 + 25 等于多少?")}
disabled={isStreaming}
>
{isStreaming ? '处理中...' : '提交任务'}
</button>
{workflowId && <p>工作流 ID: {workflowId}</p>}
{error && <p style={{color: 'red'}}>错误: {error.message}</p>}
<div>
{events.map((event, idx) => (
<div key={idx}>
<strong>{event.type}</strong>: {event.message || event.agent_id}
</div>
))}
</div>
{isStreaming && <button onClick={stopStreaming}>停止</button>}
</div>
);
}
示例:Vue 3 Composition API
<script setup>
import { ref } from 'vue';
const events = ref([]);
const isStreaming = ref(false);
const error = ref(null);
const workflowId = ref(null);
let eventSource = null;
async function submitTask(query) {
events.value = [];
error.value = null;
isStreaming.value = true;
try {
const response = await fetch('http://localhost:8080/api/v1/tasks/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${localStorage.getItem('token')}`
},
body: JSON.stringify({
query,
session_id: `session-${Date.now()}`
})
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const { workflow_id, stream_url } = await response.json();
workflowId.value = workflow_id;
eventSource = new EventSource(`http://localhost:8080${stream_url}`);
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
events.value.push(event);
if (event.type === 'WORKFLOW_COMPLETED') {
eventSource.close();
isStreaming.value = false;
}
};
eventSource.onerror = (err) => {
error.value = err;
eventSource.close();
isStreaming.value = false;
};
} catch (err) {
error.value = err;
isStreaming.value = false;
}
}
function stopStreaming() {
if (eventSource) {
eventSource.close();
isStreaming.value = false;
}
}
</script>
<template>
<div>
<button @click="submitTask('分析数据')" :disabled="isStreaming">
{{ isStreaming ? '处理中...' : '提交任务' }}
</button>
<div v-if="workflowId">工作流: {{ workflowId }}</div>
<div v-if="error" style="color: red">错误: {{ error.message }}</div>
<div v-for="(event, idx) in events" :key="idx">
<strong>{{ event.type }}</strong>: {{ event.message || event.agent_id }}
</div>
<button v-if="isStreaming" @click="stopStreaming">停止</button>
</div>
</template>
示例:Python
import httpx
import json
import time
def submit_and_stream(query: str, api_key: str):
"""提交任务并流式传输事件。"""
# 1. 提交任务
response = httpx.post(
"http://localhost:8080/api/v1/tasks/stream",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json={
"query": query,
"session_id": f"session-{int(time.time())}"
}
)
data = response.json()
workflow_id = data["workflow_id"]
stream_url = data["stream_url"]
print(f"任务已提交: {workflow_id}")
# 2. 流式传输事件
with httpx.stream(
"GET",
f"http://localhost:8080{stream_url}",
headers={"Authorization": f"Bearer {api_key}"},
timeout=None
) as stream_response:
for line in stream_response.iter_lines():
if line.startswith("data:"):
event = json.loads(line[5:])
print(f"[{event['type']}] {event.get('message', '')}")
if event['type'] in ['WORKFLOW_COMPLETED']:
return event
# 使用示例
result = submit_and_stream("法国的首都是什么?", "sk_test_123456")
print("最终结果:", result.get('result'))
为什么使用此端点? 统一端点确保您在提交后立即开始流式传输,防止在分别提交和连接时可能错过的任何事件。
Server-Sent Events (SSE)
GET /api/v1/stream/sse
使用 Server-Sent Events 进行实时事件流传输。
身份验证
必需:是
X-API-Key: sk_test_123456
查询参数
| 参数 | 类型 | 必需 | 描述 |
|---|
workflow_id | string | 是 | 任务/工作流标识符 |
types | string | 否 | 逗号分隔的事件类型以筛选 |
last_event_id | string | 否 | 从特定事件 ID 恢复。支持 Redis Stream ID(如 1700000000000-0)或数字序号(如 42)。当为数字时,重放规则为 seq > last_event_id。 |
事件格式
每个事件遵循 SSE 规范:
id: <event_id>
event: <event_type>
data: <json_payload>
示例请求
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=task_abc123" \\
-H "X-API-Key: sk_test_123456"
示例响应
id: 1
event: WORKFLOW_STARTED
data: {"workflow_id":"task_abc123","timestamp":"2025-10-22T10:30:00Z","message":"工作流已启动"}
id: 2
event: AGENT_THINKING
data: {"workflow_id":"task_abc123","agent_id":"agent_1","message":"分析查询...","timestamp":"2025-10-22T10:30:01Z"}
id: 3
event: TOOL_INVOKED
data: {"workflow_id":"task_abc123","tool":"web_search","params":{"query":"Python 编程"},"timestamp":"2025-10-22T10:30:02Z"}
id: 4
event: TOOL_OBSERVATION
data: {"workflow_id":"task_abc123","tool":"web_search","result":"...","timestamp":"2025-10-22T10:30:05Z"}
id: 5
event: AGENT_COMPLETED
data: {"workflow_id":"task_abc123","agent_id":"agent_1","result":"Python 是一种高级编程语言...","timestamp":"2025-10-22T10:30:06Z"}
id: 6
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task_abc123","status":"COMPLETED","result":"...","timestamp":"2025-10-22T10:30:07Z"}
WebSocket
GET /api/v1/stream/ws
通过 WebSocket 进行双向流式传输。
身份验证
网关通过仅使用标头(X-API-Key 或 Authorization)来对 WebSocket 连接进行身份验证。浏览器在 WebSocket 握手期间无法设置自定义标头。对于浏览器使用:
- 以
GATEWAY_SKIP_AUTH=1 本地运行,或
- 使用反向代理在转发到网关之前注入标头。
服务器环境中基于标头的示例:
Node (ws):
import WebSocket from 'ws';
const ws = new WebSocket('ws://localhost:8080/api/v1/stream/ws', {
headers: { 'X-API-Key': 'sk_test_123456' },
});
ws.on('open', () => {
ws.send(JSON.stringify({ type: 'subscribe', workflow_id: 'task_abc123' }));
});
ws.on('message', (msg) => {
const data = JSON.parse(msg.toString());
console.log('事件:', data.type, data.message);
});
ws.on('error', (err) => console.error('WebSocket 错误:', err));
Python (websockets):
import asyncio, json, websockets
async def main():
async with websockets.connect(
'ws://localhost:8080/api/v1/stream/ws',
extra_headers={'X-API-Key': 'sk_test_123456'},
) as ws:
await ws.send(json.dumps({'type': 'subscribe', 'workflow_id': 'task_abc123'}))
async for message in ws:
data = json.loads(message)
print('事件:', data.get('type'), data.get('message'))
asyncio.run(main())
不支持通过查询字符串传递 API 密钥或通过网关的连接后”auth”消息。
消息类型
客户端 → 服务器:
// 订阅工作流
{
"type": "subscribe",
"workflow_id": "task_abc123",
"types": ["AGENT_THINKING", "TOOL_INVOKED"]
}
// 取消订阅
{
"type": "unsubscribe",
"workflow_id": "task_abc123"
}
// Ping(保活)
{
"type": "ping"
}
服务器 → 客户端:
// 事件
{
"type": "AGENT_THINKING",
"workflow_id": "task_abc123",
"message": "分析查询...",
"timestamp": "2025-10-22T10:30:00Z"
}
// Pong(保活响应)
{
"type": "pong"
}
OpenAI 兼容的流式传输格式
Shannon 的 SSE 流式传输使用 OpenAI 兼容格式 处理使用元数据和事件结构,可与基于 OpenAI 的工具无缝集成,同时保持 Shannon 的综合事件分类体系。
使用元数据格式
所有 LLM 提供商以 OpenAI 标准结构返回使用数据:
{
"type": "LLM_OUTPUT",
"data": {
"output": "响应文本...",
"model": "gpt-5-nano-2025-08-07",
"provider": "openai",
"usage": {
"total_tokens": 350,
"input_tokens": 200,
"output_tokens": 150
},
"cost_usd": 0.0105
}
}
兼容性矩阵:
| 提供商 | 使用元数据 | 流式选项 |
|---|
| OpenAI | ✅ 完全支持 | stream_options: {"include_usage": true} |
| Anthropic | ✅ 完全支持 | 最终流式块 |
| Google | ✅ 完全支持 | 最终流式块 |
| Groq | ✅ 完全支持 | 最终流式块 |
| xAI | ⚠️ 有限支持 | 仅在一元模式下可用 |
| OpenAI 兼容端点 | ⚠️ 优雅降级 | 取决于端点支持 |
注意: Shannon 自动处理不支持 stream_options 参数的提供商,进行优雅降级——流式传输可工作,但使用元数据在流式传输期间可能不可用。
事件类型
核心事件
| 事件类型 | 描述 | 何时触发 |
|---|
WORKFLOW_STARTED | 工作流执行开始 | 任务开始 |
WORKFLOW_COMPLETED | 工作流成功完成 | 任务结束(成功) |
代理事件
| 事件类型 | 描述 | 载荷字段 |
|---|
AGENT_THINKING | 代理推理/规划 | agent_id、message |
AGENT_COMPLETED | 代理完成执行 | agent_id、result |
工具事件
| 事件类型 | 描述 | 载荷字段 |
|---|
TOOL_INVOKED | 工具执行已启动 | tool、params |
TOOL_OBSERVATION | 代理观察工具结果 | tool、result |
LLM 事件
| 事件类型 | 描述 | 载荷字段 |
|---|
LLM_PROMPT | 发送给 LLM 的提示 | text |
LLM_PARTIAL | 流式 LLM 输出 | text |
LLM_OUTPUT | 最终 LLM 输出 | text |
进度与系统事件
| 事件类型 | 描述 | 载荷字段 |
|---|
PROGRESS | 进度更新 | progress、message |
DATA_PROCESSING | 数据处理中 | message |
WAITING | 等待资源 | message |
ERROR_OCCURRED | 发生错误 | error、severity |
ERROR_RECOVERY | 错误恢复尝试 | message |
WORKSPACE_UPDATED | 内存/上下文已更新 | message |
BUDGET_THRESHOLD | 任务预算达到预警阈值 | usage_percent、threshold_percent、tokens_used、tokens_budget、level、budget_type |
流生命周期事件
| 事件类型 | 描述 |
|---|
STREAM_END | 显式的流结束信号(该工作流不会再有新事件) |
在 SSE 中,STREAM_END 生命周期事件通过名为 done 的 SSE 事件发送,数据为纯文本 [DONE](不是 JSON)。在 WebSocket 中,它会以普通 JSON 事件的形式出现,字段为 "type": "STREAM_END"。
团队与审批
| 事件类型 | 描述 |
|---|
TEAM_RECRUITED | 组建多智能体团队 |
TEAM_RETIRED | 解散团队 |
TEAM_STATUS | 团队协调更新 |
ROLE_ASSIGNED | 分配代理角色 |
DELEGATION | 任务已委托 |
DEPENDENCY_SATISFIED | 依赖已解决 |
APPROVAL_REQUESTED | 需要人工批准 |
APPROVAL_DECISION | 审批决策已记录 |
代码示例
Python with httpx (SSE)
import httpx
import json
def stream_task_events(task_id: str, api_key: str):
"""使用 SSE 流式传输任务事件。"""
url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
with httpx.stream(
"GET",
url,
headers={"X-API-Key": api_key},
timeout=None # 无流式传输超时
) as response:
for line in response.iter_lines():
if line.startswith("data:"):
data = json.loads(line[5:]) # 移除"data:"前缀
yield data
# 用法
for event in stream_task_events("task_abc123", "sk_test_123456"):
print(f"[{event.get('type')}] {event.get('message', '')}")
if event.get('type') == 'WORKFLOW_COMPLETED':
print("最终结果:", event.get('result'))
break
Python - 流式传输带事件筛选
def stream_filtered_events(task_id: str, api_key: str, event_types: list):
"""仅流式传输特定事件类型。"""
types_param = ",".join(event_types)
url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}&types={types_param}"
with httpx.stream("GET", url, headers={"X-API-Key": api_key}) as response:
for line in response.iter_lines():
if line.startswith("data:"):
yield json.loads(line[5:])
# 仅接收代理思考和工具事件
for event in stream_filtered_events(
"task_abc123",
"sk_test_123456",
["AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"]
):
print(f"{event['type']}:{event.get('message', event.get('tool'))}")
JavaScript/Node.js (SSE)
const EventSource = require('eventsource');
function streamTaskEvents(taskId, apiKey) {
const url = `http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`;
const eventSource = new EventSource(url, {
headers: {
'X-API-Key': apiKey
}
});
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(`[${data.type}] ${data.message || ''}`);
if (data.type === 'WORKFLOW_COMPLETED') {
console.log('最终结果:', data.result);
eventSource.close();
}
};
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
eventSource.close();
};
return eventSource;
}
// 用法
const stream = streamTaskEvents('task_abc123', 'sk_test_123456');
// 手动关闭
setTimeout(() => stream.close(), 60000);
JavaScript/Node.js - WebSocket (ws)
import WebSocket from 'ws';
function connectWebSocket(taskId, apiKey) {
const ws = new WebSocket(
`ws://localhost:8080/api/v1/stream/ws?workflow_id=${taskId}`,
{ headers: { 'X-API-Key': apiKey } }
);
ws.on('open', () => console.log('✓ 已连接'));
ws.on('message', (msg) => {
const data = JSON.parse(msg.toString());
switch (data.type) {
case 'AGENT_THINKING':
console.log(`💭 ${data.message}`);
break;
case 'TOOL_INVOKED':
console.log(`🔧 工具:${data.tool}`);
break;
case 'TOOL_OBSERVATION':
console.log(`✓ 结果:${data.result}`);
break;
case 'WORKFLOW_COMPLETED':
console.log(`✓ 完成:${data.result}`);
ws.close();
break;
default:
console.log(`[${data.type}] ${data.message || ''}`);
}
});
ws.on('error', (err) => console.error('❌ 错误:', err));
ws.on('close', () => console.log('连接已关闭'));
return ws;
}
const ws = connectWebSocket('task_abc123', 'sk_test_123456');
Go (SSE)
package main
import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"strings"
)
type Event struct {
Type string `json:"type"`
WorkflowID string `json:"workflow_id"`
Message string `json:"message,omitempty"`
Data map[string]interface{} `json:",inline"`
}
func streamEvents(taskID, apiKey string) error {
url := fmt.Sprintf(
"http://localhost:8080/api/v1/stream/sse?workflow_id=%s",
taskID,
)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("X-API-Key", apiKey)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data:") {
data := strings.TrimPrefix(line, "data:")
var event Event
json.Unmarshal([]byte(data), &event)
fmt.Printf("[%s] %s\n", event.Type, event.Message)
if event.Type == "WORKFLOW_COMPLETED" {
fmt.Println("✓ 任务完成")
break
}
}
}
return scanner.Err()
}
func main() {
err := streamEvents("task_abc123", "sk_test_123456")
if err != nil {
fmt.Println("错误:", err)
}
}
Bash/curl (SSE)
#!/bin/bash
API_KEY="sk_test_123456"
TASK_ID="$1"
curl -N "http://localhost:8080/api/v1/stream/sse?workflow_id=$TASK_ID" \\
-H "X-API-Key: $API_KEY" \\
| while IFS= read -r line; do
if [[ $line == data:* ]]; then
# 从"data:{...}"提取 JSON
JSON="${line#data:}"
# 解析并显示
TYPE=$(echo "$JSON" | jq -r '.type')
MESSAGE=$(echo "$JSON" | jq -r '.message // ""')
echo "[$(date +%T)] $TYPE:$MESSAGE"
# 完成时退出
if [[ "$TYPE" == "WORKFLOW_COMPLETED" ]]; then
echo ""
echo "$JSON" | jq -r '.result'
break
fi
fi
done
1. 实时进度显示
def display_progress(task_id: str, api_key: str):
"""向用户显示实时进度。"""
print(f"任务 {task_id} 启动...")
for event in stream_task_events(task_id, api_key):
event_type = event.get('type')
if event_type == 'AGENT_THINKING':
print(f"💭 {event['message']}")
elif event_type == 'TOOL_INVOKED':
print(f"🔧 使用工具:{event['tool']}")
elif event_type == 'TOOL_OBSERVATION':
print(f"✓ 工具已完成")
elif event_type == 'LLM_PARTIAL':
print(event['text'], end='', flush=True)
elif event_type == 'WORKFLOW_COMPLETED':
print(f"\n\n✓ 完成!")
return event['result']
2. 将所有事件记录到文件
import json
from datetime import datetime
def log_events_to_file(task_id: str, api_key: str, log_file: str):
"""将所有事件记录到 JSON Lines 文件。"""
with open(log_file, 'a') as f:
for event in stream_task_events(task_id, api_key):
# 添加时间戳
event['logged_at'] = datetime.now().isoformat()
# 以 JSON Lines 格式写入
f.write(json.dumps(event) + '\n')
f.flush()
if event.get('type') == 'WORKFLOW_COMPLETED':
break
log_events_to_file("task_abc123", "sk_test_123456", "task_events.jsonl")
3. 收集工具使用指标
def collect_tool_metrics(task_id: str, api_key: str):
"""收集有关工具使用的指标。"""
metrics = {
"tools_invoked": 0,
"tools_succeeded": 0,
"tools_failed": 0,
"tool_list": []
}
for event in stream_task_events(task_id, api_key):
if event.get('type') == 'TOOL_INVOKED':
metrics['tools_invoked'] += 1
metrics['tool_list'].append(event['tool'])
elif event.get('type') == 'TOOL_OBSERVATION':
metrics['tools_succeeded'] += 1
elif event.get('type') == 'ERROR_OCCURRED':
metrics['tools_failed'] += 1
elif event.get('type') == 'WORKFLOW_COMPLETED':
break
return metrics
metrics = collect_tool_metrics("task_abc123", "sk_test_123456")
print(f"使用的工具:{metrics['tool_list']}")
print(f"成功率:{metrics['tools_succeeded']}/{metrics['tools_invoked']}")
4. React UI 集成
import { useState, useEffect } from 'react';
function TaskMonitor({ taskId, apiKey }) {
const [events, setEvents] = useState([]);
const [status, setStatus] = useState('connecting');
useEffect(() => {
const eventSource = new EventSource(
`http://localhost:8080/api/v1/stream/sse?workflow_id=${taskId}`,
{ headers: { 'X-API-Key': apiKey } }
);
eventSource.onopen = () => setStatus('connected');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
setEvents(prev => [...prev, data]);
if (data.type === 'WORKFLOW_COMPLETED') {
setStatus('completed');
eventSource.close();
}
};
eventSource.onerror = () => {
setStatus('error');
eventSource.close();
};
return () => eventSource.close();
}, [taskId, apiKey]);
return (
<div>
<h3>状态:{status}</h3>
<ul>
{events.map((event, i) => (
<li key={i}>
<strong>{event.type}</strong>:{event.message || event.tool}
</li>
))}
</ul>
</div>
);
}
最佳实践
1. 处理连接错误
import time
def stream_with_retry(task_id: str, api_key: str, max_retries: int = 3):
"""在连接失败时进行自动重试的流式传输。"""
for attempt in range(max_retries):
try:
for event in stream_task_events(task_id, api_key):
yield event
if event.get('type') == 'WORKFLOW_COMPLETED':
return
except Exception as e:
if attempt < max_retries - 1:
print(f"连接失败,{2 ** attempt}s 后重试...")
time.sleep(2 ** attempt)
else:
raise
2. 实施超时
import signal
def stream_with_timeout(task_id: str, api_key: str, timeout: int = 300):
"""具有超时的流式传输。"""
def timeout_handler(signum, frame):
raise TimeoutError("流式传输超时")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout)
try:
for event in stream_task_events(task_id, api_key):
yield event
if event.get('type') == 'WORKFLOW_COMPLETED':
break
finally:
signal.alarm(0) # 取消闹钟
3. 客户端过滤事件
def stream_specific_events(task_id: str, api_key: str, event_types: set):
"""客户端过滤事件。"""
for event in stream_task_events(task_id, api_key):
if event.get('type') in event_types:
yield event
if event.get('type') == 'WORKFLOW_COMPLETED':
yield event
break
# 仅接收代理和工具事件
for event in stream_specific_events(
"task_abc123",
"sk_test_123456",
{"AGENT_THINKING", "TOOL_INVOKED", "TOOL_OBSERVATION"}
):
print(event)
4. 从最后一个事件恢复
def stream_with_resume(task_id: str, api_key: str, last_event_id: str = None):
"""从特定事件恢复流式传输。"""
url = f"http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}"
if last_event_id:
url += f"&last_event_id={last_event_id}"
# 流式传输事件...
说明:last_event_id 支持 Redis Stream ID(如 1700000000000-0)或数字序号(如 42)。当为数字时,重放规则为 seq > last_event_id。
比较:SSE vs WebSocket vs 轮询
| 功能 | SSE | WebSocket | 轮询 |
|---|
| 方向 | 服务器 → 客户端 | 双向 | 客户端 → 服务器 |
| 协议 | HTTP | WebSocket | HTTP |
| 自动重连 | 是(浏览器) | 否(手动) | N/A |
| 开销 | 低 | 非常低 | 高 |
| 简洁性 | 高 | 中 | 高 |
| 使用场景 | 实时更新 | 交互式应用 | 简单状态 |
| Shannon 支持 | ✅ 推荐 | ✅ 可用 | ⚠️ 不理想 |
何时使用每个
- SSE:大多数用例、实时监控、进度显示
- WebSocket:交互式应用、需要双向通信
- 轮询(GET /api/v1/tasks/):传统系统、无流式传输支持
相关端点
事件保留:
- Redis:所有事件存储 24 小时(实时流式传输)
- PostgreSQL:关键事件存储 90 天(历史查询)
- 如果连接断开,使用
last_event_id 恢复流式传输
连接限制:
- 每个 API 密钥最多 100 个并发流式传输连接
- 5 分钟无活动超时(自动关闭连接)
- 每个连接 1MB 缓冲区大小限制
- 考虑在单个 WebSocket 上多路复用多个工作流