Shannon 提供实时事件流式传输,因此您可以在任务执行时进行监控。这对于以下场景至关重要:
向用户提供实时反馈
调试工作流执行
构建交互式 UI
监控长时间运行的任务
流式传输技术
Shannon 支持两种流式传输协议:
协议 用例 特性 SSE (服务器发送事件)单向服务器→客户端 简单、基于 HTTP、自动重连 WebSocket 双向 全双工、较低延迟
由于 SSE 的简单性和内置重连处理,建议大多数用例使用 SSE。
服务器发送事件(SSE)
使用 cURL
curl -N http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
输出:
event: WORKFLOW_STARTED
data: {"workflow_id":"task-123","agent_id":"orchestrator","message":"任务处理已开始"}
event: DATA_PROCESSING
data: {"workflow_id":"task-123","message":"准备上下文"}
event: PROGRESS
data: {"workflow_id":"task-123","agent_id":"planner","message":"创建了包含 3 个步骤的计划"}
event: AGENT_STARTED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"正在处理查询"}
event: LLM_PROMPT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"正在搜索信息..."}
event: TOOL_INVOKED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"调用 web_search,查询='主题 A'"}
event: TOOL_OBSERVATION
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"找到 5 个结果"}
event: LLM_OUTPUT
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"研究完成:找到相关信息"}
event: AGENT_COMPLETED
data: {"workflow_id":"task-123","agent_id":"research-agent","message":"任务完成"}
event: WORKFLOW_COMPLETED
data: {"workflow_id":"task-123","message":"最终综合结果已准备就绪"}
使用 Python SDK
from shannon import ShannonClient
client = ShannonClient( base_url = "http://localhost:8080" )
# 提交任务
handle = client.submit_task(
query = "研究 AI 趋势并创建摘要"
)
# 流式传输事件
for event in client.stream(handle.workflow_id):
print ( f "[ { event.type } ] { event.message } " )
if event.type == "WORKFLOW_COMPLETED" :
print ( f "任务已完成: { event.message } " )
break
事件类型
Shannon 在不同类别中发出 33 种事件类型。以下是最常用的事件类型:
核心工作流事件
事件类型 描述 示例消息 WORKFLOW_STARTED任务处理开始 "任务处理已开始"WORKFLOW_COMPLETED工作流成功完成 "全部完成"AGENT_STARTED智能体开始处理 "正在处理查询"AGENT_COMPLETED智能体完成 "任务完成"STATUS_UPDATE状态更新/进度 "正在规划下一步"ERROR_OCCURRED执行期间发生错误 "连接失败:超时"
LLM 和工具事件
事件类型 描述 示例消息 LLM_PROMPT发送给 LLM 的提示 "5 + 5 等于多少?"LLM_OUTPUT完整的 LLM 响应 "5 + 5 等于 10"LLM_PARTIAL流式传输块(通常已过滤) "5 + 5"TOOL_INVOKED工具执行开始 "调用 database_query"TOOL_OBSERVATION工具结果/输出 "查询返回 42 行"
进度和状态事件
事件类型 描述 示例消息 PROGRESS步骤完成更新 "创建了包含 3 个步骤的计划"DATA_PROCESSING处理/分析数据 "准备上下文"DELEGATION任务委托给另一个智能体 "移交给简单任务"
多智能体事件(需要功能门控)
事件类型 描述 示例消息 MESSAGE_SENT智能体发送消息(需要 p2p_v1) "请分析第 3 节"MESSAGE_RECEIVED智能体接收消息(需要 p2p_v1) "收到任务"TEAM_RECRUITED招募智能体(需要 dynamic_team_v1) "总结第 3 节"TEAM_RETIRED智能体退役(需要 dynamic_team_v1) "任务已完成"
WebSocket 流式传输
通过 WebSocket 连接
import asyncio
import websockets
import json
async def stream_task ():
uri = f "ws://localhost:8080/api/v1/stream/ws?workflow_id= { workflow_id } "
# 通过标头传递 API 密钥(网关需要基于标头的身份验证)
async with websockets.connect(
uri, extra_headers = { 'X-API-Key' : 'sk_test_123456' }
) as websocket:
while True :
message = await websocket.recv()
event = json.loads(message)
print ( f "事件: { event[ 'type' ] } " )
if event[ 'type' ] == 'WORKFLOW_COMPLETED' :
break
asyncio.run(stream_task())
WebSocket 流式传输目前仅支持服务器到客户端。使用 REST API /api/v1/tasks/{id}/cancel 取消任务。
过滤事件
按类型过滤事件以减少噪音:
# 仅显示重要事件
for event in client.stream(workflow_id):
if event.type in [ 'PROGRESS' , 'AGENT_COMPLETED' , 'WORKFLOW_COMPLETED' ]:
print ( f " { event.type } : { event.message } " )
进度跟踪
从事件计算任务进度:
def track_progress ( workflow_id ):
agents_started = 0
agents_completed = 0
for event in client.stream(workflow_id):
if event.type == 'PROGRESS' :
# 跟踪规划器的进度更新
print ( f "进度: { event.message } " )
elif event.type == 'AGENT_STARTED' :
agents_started += 1
print ( f "智能体已启动: { event.agent_id } " )
elif event.type == 'AGENT_COMPLETED' :
agents_completed += 1
if agents_started > 0 :
progress = (agents_completed / agents_started) * 100
print ( f "进度: { progress :.1f} % ( { agents_completed } / { agents_started } )" )
elif event.type == 'WORKFLOW_COMPLETED' :
print ( "✅ 任务完成!" )
break
track_progress(handle.workflow_id)
输出:
进度:创建了包含 3 个步骤的计划
智能体已启动:research-agent
智能体已启动:analysis-agent
智能体已启动:writer-agent
进度:33.3% (1/3)
进度:66.7% (2/3)
进度:100.0% (3/3)
✅ 任务完成!
实时 UI 示例
React 组件
import { useEffect , useState } from 'react' ;
function TaskMonitor ({ taskId , apiKey }) {
const [ events , setEvents ] = useState ([]);
const [ progress , setProgress ] = useState ( 0 );
useEffect (() => {
const params = new URLSearchParams ({ workflow_id: taskId });
// 注意:浏览器 EventSource 不支持自定义请求头。
// 生产环境请由后端发起 SSE 并注入认证请求头;
// 开发环境可使用 GATEWAY_SKIP_AUTH=1 关闭认证。
const eventSource = new EventSource (
`http://localhost:8080/api/v1/stream/sse? ${ params } `
);
const handleEvent = ( e ) => {
const event = JSON . parse ( e . data );
setEvents ( prev => [ ... prev , event ]);
// 根据工作流生命周期更新进度
if ( event . type === 'WORKFLOW_STARTED' ) {
setProgress ( 10 );
} else if ( event . type === 'PROGRESS' ) {
setProgress ( prev => Math . min ( prev + 20 , 80 ));
} else if ( event . type === 'WORKFLOW_COMPLETED' ) {
setProgress ( 100 );
}
};
// 监听特定事件类型(Shannon 使用命名的 SSE 事件)
const eventTypes = [
'WORKFLOW_STARTED' ,
'WORKFLOW_COMPLETED' ,
'AGENT_STARTED' ,
'AGENT_COMPLETED' ,
'STATUS_UPDATE' ,
'PROGRESS' ,
'ERROR_OCCURRED' ,
'LLM_OUTPUT' ,
'TOOL_INVOKED' ,
'TOOL_OBSERVATION'
];
eventTypes . forEach ( type => {
eventSource . addEventListener ( type , handleEvent );
});
// 未命名事件的备用处理器
eventSource . onmessage = handleEvent ;
eventSource . onerror = () => {
console . error ( 'SSE 连接错误' );
};
return () => {
eventTypes . forEach ( type => {
eventSource . removeEventListener ( type , handleEvent );
});
eventSource . close ();
};
}, [ taskId , apiKey ]);
return (
< div >
< progress value = { progress } max = { 100 } />
< ul >
{ events . map (( e , i ) => (
< li key = { i } >
< strong > [ { e . type } ] </ strong > { ' ' }
{ e . agent_id && < span > ( { e . agent_id } ) </ span > }{ ' ' }
{ e . message }
</ li >
)) }
</ ul >
</ div >
);
}
关键实现细节:
Shannon 发出命名的 SSE 事件 (例如,event: AGENT_STARTED),因此您必须为每种事件类型使用 addEventListener()
浏览器的 EventSource API 不支持自定义请求头。不要通过查询参数传递 API 密钥;应由后端代理注入请求头(或在开发环境关闭认证)
始终在清理函数中清除事件监听器,以防止内存泄漏
错误处理
优雅地处理连接失败:
from shannon import ShannonClient
from shannon.errors import ShannonError, ConnectionError
import time
def robust_streaming ( workflow_id , max_retries = 3 ):
client = ShannonClient( base_url = "http://localhost:8080" )
for attempt in range (max_retries):
try :
for event in client.stream(workflow_id):
print ( f "事件: { event.type } " )
if event.type == 'WORKFLOW_COMPLETED' :
# 获取最终状态以检索结果
status = client.get_status(workflow_id.replace( 'wf-' , 'task-' ))
return status.result
except (ShannonError, ConnectionError ) as e:
print ( f "流错误(尝试 { attempt + 1 } ): { e } " )
if attempt < max_retries - 1 :
time.sleep( 2 ** attempt) # 指数退避
else :
raise
流式传输身份验证
启用身份验证时,提供 API 密钥:
带 API 密钥的 SSE
curl -N \
-H "X-API-Key: sk_test_123456" \
http://localhost:8080/api/v1/stream/sse?workflow_id={task_id}
带 API 密钥的 Python SDK
client = ShannonClient(
base_url = "http://localhost:8080" ,
api_key = "sk_test_123456"
)
# API 密钥自动包含在所有请求中
for event in client.stream(workflow_id):
print (event)
性能考虑
事件缓冲
Shannon 缓冲事件以防止压垮客户端:
# config/shannon.yaml
streaming :
buffer_size : 100 # 最大排队事件数
flush_interval_ms : 100 # 每 100 毫秒发送批次
保持连接
SSE 每 15 秒发送保持连接注释以防止超时:
: keepalive
: keepalive
event: LLM_PROMPT
data: {"message":"..."}
资源清理
完成后始终关闭流:
# 遍历事件(自动处理连接)
for event in client.stream(workflow_id):
if event.type == 'WORKFLOW_COMPLETED' :
break
# 无需显式关闭
多任务监控
同时监控多个任务:
import asyncio
async def monitor_task ( client , workflow_id ):
# 在异步上下文中使用同步 stream()
for event in client.stream(workflow_id):
print ( f "[ { workflow_id } ] { event.type } " )
if event.type == 'WORKFLOW_COMPLETED' :
# 从状态获取最终结果
task_id = workflow_id.replace( 'wf-' , 'task-' )
status = client.get_status(task_id)
return status.result
async def monitor_all ( task_ids ):
client = AsyncShannonClient( base_url = "http://localhost:8080" )
# 并行监控所有任务
results = await asyncio.gather( * [
monitor_task(client, tid) for tid in task_ids
])
return results
# 运行
task_ids = [ "task-1" , "task-2" , "task-3" ]
results = asyncio.run(monitor_all(task_ids))
桌面应用程序
Shannon 提供基于 Tauri 构建的原生桌面应用程序,用于实时任务监控和管理。桌面应用提供:
活动任务的实时事件流
任务提交和管理界面
实时指标和可视化
会话历史浏览
桌面应用程序已取代之前的 Web 仪表板。请参阅桌面应用指南 了解安装和使用说明。
最佳实践
1. 使用 SSE 进行简单监控
# ✅ 好:简单的 SSE 流式传输
for event in client.stream(workflow_id):
print (event.type)
2. 处理断开连接
# ✅ 好:重试逻辑
for attempt in range ( 3 ):
try :
for event in client.stream(workflow_id):
process_event(event)
break
except (ShannonError, ConnectionError ):
if attempt == 2 :
raise
time.sleep( 2 )
3. 过滤不必要的事件
# ✅ 好:仅关键事件
critical_events = [ 'PROGRESS' , 'WORKFLOW_COMPLETED' , 'ERROR_OCCURRED' ]
for event in client.stream(workflow_id):
if event.type in critical_events:
handle_event(event)
4. 设置超时
# ✅ 好:超时保护
import signal
def timeout_handler ( signum , frame ):
raise TimeoutError ( "流超时" )
signal.signal(signal. SIGALRM , timeout_handler)
signal.alarm( 300 ) # 5 分钟超时
try :
for event in client.stream(workflow_id):
print (event)
finally :
signal.alarm( 0 ) # 取消警报
下一步
Python SDK 使用 SDK 进行异步流式传输
WebSocket API WebSocket 文档