メインコンテンツへスキップ
完全な非同期ドキュメントは近日公開予定です。基本的なパターンは以下に示されています。

概要

Shannon Python SDKは、同期および非同期クライアントの両方を提供します。非同期操作を使用する理由:
  • 同時タスクの送信
  • 非ブロッキングイベントストリーミング
  • 並列API呼び出し
  • 高スループットアプリケーション

AsyncShannonClient

基本的な使い方

import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient(base_url="http://localhost:8080") as client:
        handle = await client.submit_task(
            query=(
                "収益トレンドに焦点を当てて段落を3つの箇条書きに要約してください。"
                "出力: Markdownリスト。"
            ),
            model_tier="small",
            mode="standard",
        )
        final = await client.wait(handle.task_id)
        print(final.result)

asyncio.run(main())

同時タスク

複数のタスクを同時に送信します:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # 複数のタスクを同時に送信
        tasks = [
            client.submit_task(query="5+5は何ですか?"),
            client.submit_task(query="10*2は何ですか?"),
            client.submit_task(query="100/4は何ですか?"),
        ]

        # すべての送信を待機
        handles = await asyncio.gather(*tasks)

        # すべての結果を取得
        results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])

        for i, r in enumerate(results, 1):
            print(f"タスク {i}: {r.result}")

asyncio.run(main())

非同期ストリーミング

ブロックせずにイベントをストリームします。ヒント: async for ループ内で他のクライアント呼び出しを待機しないでください — まず抜け出してから待機します:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # タスクを送信
        h = await client.submit_task(query="フランスの首都はどこですか?")

        # イベントをストリーム
        async for e in client.stream(h.workflow_id):
            print(f"{e.type}: {e.message[:50]}")
            if e.type == "WORKFLOW_COMPLETED":
                break  # ループをクリーンに終了

        # 今は待機しても安全
        final = await client.wait(h.task_id)
        print(f"\n最終結果: {final.result}")

asyncio.run(main())

タイムアウト処理

async def with_timeout():
    async with AsyncShannonClient() as client:
        try:
            # タイムアウト付きで送信
            handle = await asyncio.wait_for(
                client.submit_task(
                    query=(
                        "段落からトップ3のインサイトを抽出し、Markdownの箇条書きで返してください。"
                    )
                ),
                timeout=30.0
            )

            # タイムアウト付きで完了を待機
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=300.0
            )

        except asyncio.TimeoutError:
            print("操作がタイムアウトしました")

バックグラウンドタスク

他の作業をしながらバックグラウンドでタスクを実行します:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # タスクを送信
        handle = await client.submit_task(query="四半期の収益ドライバーとリスクについての約500語のレポートを生成してください。出力: Markdown。")

        # バックグラウンドで待機を開始(ブロックしない)
        task = asyncio.create_task(client.wait(handle.task_id))

        # タスクが実行されている間に他の作業を行う
        print("バックグラウンドで処理中...")
        await asyncio.sleep(2)  # 他の非同期作業

        # ステータスを確認
        if not task.done():
            print("まだ処理中...")

        # 必要なときに結果を取得
        result = await task
        print(f"結果: {result.result}")

asyncio.run(main())

エラーハンドリング

import asyncio
from shannon import AsyncShannonClient, ConnectionError, TaskTimeoutError

async def main():
    async with AsyncShannonClient() as client:
        try:
            handle = await client.submit_task(query="AIとは何ですか?")
            result = await client.wait(handle.task_id)
            print(f"結果: {result.result}")

        except ConnectionError:
            print("Shannonへの接続に失敗しました")
        except TaskTimeoutError:
            print("タスクがタイムアウトしました")
        except Exception as e:
            print(f"予期しないエラー: {e}")

asyncio.run(main())

Webフレームワークとの統合

FastAPIの例

from contextlib import asynccontextmanager
from fastapi import FastAPI
from shannon import AsyncShannonClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    # スタートアップ: クライアントを作成
    app.state.client = AsyncShannonClient()
    yield
    # シャットダウン: クライアントを閉じる
    await app.state.client.close()

app = FastAPI(lifespan=lifespan)

@app.post("/analyze")
async def analyze(query: str):
    client = app.state.client
    handle = await client.submit_task(query=query)
    result = await client.wait(handle.task_id)
    return {"result": result.result}
テストするには:
# FastAPIとuvicornをインストール
pip install fastapi uvicorn

# サーバーを実行
uvicorn your_file:app --reload

# エンドポイントをテスト
curl -X POST "http://127.0.0.1:8000/analyze?query=AIとは何ですか?"

ベストプラクティス

  1. コンテキストマネージャasync with)を使用して適切にクリーンアップする
  2. タイムアウトを処理して長時間実行される操作に対応する
  3. リトライロジックを実装してネットワーク障害に対処する
  4. gather()を使用して同時操作を行う
  5. イベントをストリームしてリアルタイム更新を行う

次のステップ