メインコンテンツへスキップ

概要

SSEを使用して、シンプルで信頼性の高いリアルタイム更新を行います。WSが必要な環境では、オプションのWebSocketを利用できます。

SSE(推奨)

from shannon import ShannonClient, EventType
client = ShannonClient()

h = client.submit_task(
  "Q4の収益ドライバーとリスクについての約120語のエグゼクティブサマリーを作成します。部分的な出力をストリーミングし、最後の行を'Conclusion:'で終わらせます。"
)
for e in client.stream(h.workflow_id, types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]):
  print(f"{e.type}: {e.message}")
  if e.type == EventType.WORKFLOW_COMPLETED:
    break
最後のイベントから再開:
last_id = None
for e in client.stream(h.workflow_id, last_event_id=last_id):
  # ...
  last_id = e.id
注意:
  • last_event_idは、RedisストリームID(推奨)または数値シーケンスのいずれかを受け入れます。SDKはevent.idを公開しており、これによりstream_idが存在する場合はそれを選択し、存在しない場合はseqにフォールバックします。

WebSocket(オプション)

pip install websocketsが必要です。
from shannon import ShannonClient
client = ShannonClient()

h = client.submit_task(
  "最近の再生可能エネルギー政策の変更とそのコストへの影響についての約120語のブリーフを作成します。部分的な出力をストリーミングします。"
)
for e in client.stream_ws(h.workflow_id):
  print(e.type, e.message)
  if e.type == "WORKFLOW_COMPLETED":
    break

一般的なイベントタイプ

  • WORKFLOW_STARTED / WORKFLOW_COMPLETED
  • LLM_PROMPT / LLM_PARTIAL / LLM_OUTPUT
  • TOOL_INVOKED / TOOL_OBSERVATION
  • APPROVAL_REQUESTED / APPROVAL_DECISION
  • ERROR_OCCURRED
ヒント:EventTypeの列挙型や生の文字列を使用してフィルタリングします。