完全な非同期ドキュメントは近日公開予定です。基本的なパターンは以下に示されています。
概要
Shannon Python SDKは、同期および非同期クライアントの両方を提供します。非同期操作を使用する理由:- 同時タスクの送信
- 非ブロッキングイベントストリーミング
- 並列API呼び出し
- 高スループットアプリケーション
AsyncShannonClient
基本的な使い方
コピー
import asyncio
from shannon import AsyncShannonClient
async def main():
async with AsyncShannonClient(base_url="http://localhost:8080") as client:
handle = await client.submit_task(
query=(
"収益トレンドに焦点を当てて段落を3つの箇条書きに要約してください。"
"出力: Markdownリスト。"
),
model_tier="small",
mode="standard",
)
final = await client.wait(handle.task_id)
print(final.result)
asyncio.run(main())
同時タスク
複数のタスクを同時に送信します:コピー
import asyncio
from shannon import AsyncShannonClient
async def main():
async with AsyncShannonClient() as client:
# 複数のタスクを同時に送信
tasks = [
client.submit_task(query="5+5は何ですか?"),
client.submit_task(query="10*2は何ですか?"),
client.submit_task(query="100/4は何ですか?"),
]
# すべての送信を待機
handles = await asyncio.gather(*tasks)
# すべての結果を取得
results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])
for i, r in enumerate(results, 1):
print(f"タスク {i}: {r.result}")
asyncio.run(main())
非同期ストリーミング
ブロックせずにイベントをストリームします。ヒント:async for ループ内で他のクライアント呼び出しを待機しないでください — まず抜け出してから待機します:
コピー
import asyncio
from shannon import AsyncShannonClient
async def main():
async with AsyncShannonClient() as client:
# タスクを送信
h = await client.submit_task(query="フランスの首都はどこですか?")
# イベントをストリーム
async for e in client.stream(h.workflow_id):
print(f"{e.type}: {e.message[:50]}")
if e.type == "WORKFLOW_COMPLETED":
break # ループをクリーンに終了
# 今は待機しても安全
final = await client.wait(h.task_id)
print(f"\n最終結果: {final.result}")
asyncio.run(main())
タイムアウト処理
コピー
async def with_timeout():
async with AsyncShannonClient() as client:
try:
# タイムアウト付きで送信
handle = await asyncio.wait_for(
client.submit_task(
query=(
"段落からトップ3のインサイトを抽出し、Markdownの箇条書きで返してください。"
)
),
timeout=30.0
)
# タイムアウト付きで完了を待機
result = await asyncio.wait_for(
client.wait(handle.task_id),
timeout=300.0
)
except asyncio.TimeoutError:
print("操作がタイムアウトしました")
バックグラウンドタスク
他の作業をしながらバックグラウンドでタスクを実行します:コピー
import asyncio
from shannon import AsyncShannonClient
async def main():
async with AsyncShannonClient() as client:
# タスクを送信
handle = await client.submit_task(query="四半期の収益ドライバーとリスクについての約500語のレポートを生成してください。出力: Markdown。")
# バックグラウンドで待機を開始(ブロックしない)
task = asyncio.create_task(client.wait(handle.task_id))
# タスクが実行されている間に他の作業を行う
print("バックグラウンドで処理中...")
await asyncio.sleep(2) # 他の非同期作業
# ステータスを確認
if not task.done():
print("まだ処理中...")
# 必要なときに結果を取得
result = await task
print(f"結果: {result.result}")
asyncio.run(main())
エラーハンドリング
コピー
import asyncio
from shannon import AsyncShannonClient, ConnectionError, TaskTimeoutError
async def main():
async with AsyncShannonClient() as client:
try:
handle = await client.submit_task(query="AIとは何ですか?")
result = await client.wait(handle.task_id)
print(f"結果: {result.result}")
except ConnectionError:
print("Shannonへの接続に失敗しました")
except TaskTimeoutError:
print("タスクがタイムアウトしました")
except Exception as e:
print(f"予期しないエラー: {e}")
asyncio.run(main())
Webフレームワークとの統合
FastAPIの例
コピー
from contextlib import asynccontextmanager
from fastapi import FastAPI
from shannon import AsyncShannonClient
@asynccontextmanager
async def lifespan(app: FastAPI):
# スタートアップ: クライアントを作成
app.state.client = AsyncShannonClient()
yield
# シャットダウン: クライアントを閉じる
await app.state.client.close()
app = FastAPI(lifespan=lifespan)
@app.post("/analyze")
async def analyze(query: str):
client = app.state.client
handle = await client.submit_task(query=query)
result = await client.wait(handle.task_id)
return {"result": result.result}
コピー
# FastAPIとuvicornをインストール
pip install fastapi uvicorn
# サーバーを実行
uvicorn your_file:app --reload
# エンドポイントをテスト
curl -X POST "http://127.0.0.1:8000/analyze?query=AIとは何ですか?"
ベストプラクティス
- コンテキストマネージャ(
async with)を使用して適切にクリーンアップする - タイムアウトを処理して長時間実行される操作に対応する
- リトライロジックを実装してネットワーク障害に対処する
- gather()を使用して同時操作を行う
- イベントをストリームしてリアルタイム更新を行う