メインコンテンツへスキップ
この例のコレクションは増加しています。今後も定期的に使用例が追加されます。

基本的な例

モデル選択

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
  "収益トレンドに焦点を当てて段落を3つの箇条書きに要約してください。出力形式: Markdownリスト。",
  model_tier="small",
  # model_override="gpt-5-nano-2025-08-07",
  # provider_override="openai",
  mode="simple",
)
print(client.wait(handle.task_id).result)

簡単な質問応答

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 簡単な質問をする
handle = client.submit_task(query="フランスの首都はどこですか?")
final = client.wait(handle.task_id)
print(f"回答: {final.result}")

データ分析

from shannon import ShannonClient

client = ShannonClient()

# 現実的なデータ分析
handle = client.submit_task(
    query=(
        "以下の売上データに基づいて計算してください:\n"
        "1. 製品ごとの総収益\n"
        "2. 月ごとの成長率\n"
        "3. 総収益が最も高い上位3つを返す\n\n"
        "データ:\n"
        "ProductA: 1月=$10000, 2月=$12000\n"
        "ProductB: 1月=$8000, 2月=$9500\n"
        "ProductC: 1月=$15000, 2月=$14000\n"
        "ProductD: 1月=$5000, 2月=$6000\n\n"
        "フォーマット: JSON配列 [{product, revenue, mom_growth_pct}]"
    )
)

result = client.wait(handle.task_id)
print(result.result)

高度な例

マルチステップワークフロー

"""Shannon SDKを使用したマルチステップワークフローの例"""
from shannon import ShannonClient

def main():
    client = ShannonClient()
    session_id = "quarterly-analysis-demo"

    print("=" * 60)
    print("マルチステップワークフローの例")
    print("=" * 60)

    # ステップ1: 初期データクエリ
    print("\n[ステップ 1] Q4データを読み込み中...")
    h1 = client.submit_task(
        query="1000 + 500 はいくつですか?これはQ4の収益(1000)と費用(500)を表します。",
        session_id=session_id
    )
    result1 = client.wait(h1.task_id)
    print(f"結果: {result1.result}")

    # ステップ2: 前のコンテキストに基づく分析
    print("\n[ステップ 2] トレンドを分析中...")
    h2 = client.submit_task(
        query="あなたが計算したQ4の数字に基づいて、利益率は何パーセントですか?",
        session_id=session_id
    )
    result2 = client.wait(h2.task_id)
    print(f"結果: {result2.result}")

    # ステップ3: すべての前のコンテキストを使用した要約
    print("\n[ステップ 3] エグゼクティブサマリーを作成中...")
    h3 = client.submit_task(
        query="Q4の財務分析を2文で要約してください。",
        session_id=session_id
    )
    result3 = client.wait(h3.task_id)
    print(f"結果: {result3.result}")

    print("\n" + "=" * 60)
    print("✅ マルチステップワークフローが完了しました!")
    print(f"セッションID: {session_id}")
    print("=" * 60)

if __name__ == "__main__":
    main()

並列処理

"""Shannon SDKを使用した並列処理の例"""
import asyncio
from shannon import AsyncShannonClient

async def main():
    print("=" * 60)
    print("並列処理の例")
    print("=" * 60)

    async with AsyncShannonClient() as client:
        # 並列処理するトピック
        topics = ["AI", "量子コンピューティング", "バイオテクノロジー"]

        print(f"\n[ステップ 1] {len(topics)} タスクを並列で提出中...")

        # すべてのタスクを同時に提出(ノンブロッキング)
        tasks = [
            client.submit_task(
                query=f"{topic}の1文要約とその主な応用を教えてください。"
            )
            for topic in topics
        ]

        # すべての提出が完了するのを待つ
        handles = await asyncio.gather(*tasks)
        print(f"✅ {len(handles)} タスクがすべて提出されました")

        print("\n[ステップ 2] すべての結果を待機中...")

        # すべてのタスクが並列で完了するのを待つ
        results = await asyncio.gather(
            *[client.wait(h.task_id) for h in handles]
        )
        print(f"✅ {len(results)} タスクがすべて完了しました")

        print("\n[ステップ 3] 結果:")
        print("-" * 60)

        # 結果を表示
        for topic, result in zip(topics, results):
            print(f"\n📌 {topic}:")
            print(f"   {result.result}")

            # メタデータが利用可能な場合は安全にアクセス
            metadata = []
            if hasattr(result, 'metadata') and result.metadata:
                model = result.metadata.get('model_used') or result.metadata.get('model')
                if model:
                    metadata.append(f"モデル: {model}")
                tokens = result.metadata.get('total_tokens')
                if tokens:
                    metadata.append(f"トークン: {tokens}")

            if metadata:
                print(f"   ({', '.join(metadata)})")

        print("\n" + "=" * 60)
        print("✅ 並列処理が完了しました!")
        print(f"{len(topics)} トピックを同時に処理しました")
        print("=" * 60)

if __name__ == "__main__":
    asyncio.run(main())

フィルターを使用したストリーミング

"""フィルター付きストリーミングの例"""
from shannon import ShannonClient, EventType

def main():
    print("=" * 60)
    print("フィルターを使用したストリーミングの例")
    print("=" * 60)

    client = ShannonClient()

    print("\n[ステップ 1] タスクを送信中...")
    handle = client.submit_task(
        query=(
            "AWS、Azure、GCPの価格モデルを3文で比較してください。 "
            "生成される出力をストリームします。"
        )
    )
    print(f"✅ タスクが送信されました: {handle.task_id}")

    print("\n[ステップ 2] フィルターを使用したストリーミング [LLM_OUTPUT, WORKFLOW_COMPLETED]...")
    print("-" * 60)

    event_count = 0
    llm_outputs = []

    # LLM_OUTPUT と WORKFLOW_COMPLETED イベントのみをストリーム
    for event in client.stream(
        handle.workflow_id,
        types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
    ):
        event_count += 1
        print(f"[{event_count}] {event.type}")

        if event.type == EventType.LLM_OUTPUT:
            print(f"    コンテンツ: {event.message[:80]}...")
            llm_outputs.append(event.message)

        if event.type == EventType.WORKFLOW_COMPLETED:
            print(f"    ステータス: {event.message}")
            break

    print("\n" + "-" * 60)
    print(f"✅ {event_count} 件のフィルタリングされたイベントを受信")
    print(f"✅ {len(llm_outputs)} 件の LLM 出力をキャプチャ")

    print("\n[ステップ 3] 完全な結果:")
    print("-" * 60)
    final = client.wait(handle.task_id)
    print(final.result)

    print("\n" + "=" * 60)
    print("✅ フィルターを使用したストリーミングが完了しました!")
    print("=" * 60)

if __name__ == "__main__":
    main()
イベントタイプの完全なリストは、イベントタイプカタログを参照してください。
利用可能なイベントタイプ
  • LLM_OUTPUT — LLM の応答
  • LLM_PARTIAL — ストリーミングトークン
  • WORKFLOW_COMPLETED — タスク完了
  • AGENT_THINKING — エージェントの推論
  • AGENT_STARTED — エージェントが作業を開始
  • AGENT_COMPLETED — エージェントが完了
  • PROGRESS — 進捗更新
  • ERROR_OCCURRED — エラー
一般的なフィルター
  • 最終出力のみ: types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
  • リアルタイムストリーミング: types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED]
  • エージェントの監視: types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED]
  • すべて (フィルターなし): types=None (またはパラメータを省略)

タスク制御の例

長時間実行されるタスクの一時停止と再開

"""タスク制御 (一時停止/再開) の例"""
from shannon import ShannonClient
import time

def main():
    print("=" * 60)
    print("タスク制御 (一時停止/再開) の例")
    print("=" * 60)

    client = ShannonClient(base_url="http://localhost:8080")

    # 長時間実行される研究タスクを送信
    print("\n[ステップ 1] 長時間実行されるタスクを送信中...")
    handle = client.submit_task(
        query="量子コンピューティングとAI統合の最新の進展を調査し、詳細な分析を提供してください。"
    )
    print(f"✅ タスクが送信されました: {handle.task_id}")

    # 少し実行させる
    print("\n[ステップ 2] タスクを5秒間実行させる...")
    time.sleep(5)

    # タスクを一時停止
    print("\n[ステップ 3] タスクを一時停止中...")
    success = client.pause_task(handle.task_id, reason="中間結果のレビュー")
    if success:
        print("✅ 一時停止信号が送信されました")

    # 制御状態を確認
    print("\n[ステップ 4] 制御状態を確認中...")
    state = client.get_control_state(handle.task_id)
    print(f"一時停止中: {state.is_paused}")
    print(f"キャンセル済み: {state.is_cancelled}")
    if state.paused_at:
        print(f"一時停止時刻: {state.paused_at}")
        print(f"一時停止理由: {state.pause_reason}")
        print(f"一時停止した人: {state.paused_by}")

    # 再開前に少し待つ
    print("\n[ステップ 5] 再開前に3秒待機中...")
    time.sleep(3)

    # タスクを再開
    print("\n[ステップ 6] タスクを再開中...")
    success = client.resume_task(handle.task_id, reason="レビュー後に続行")
    if success:
        print("✅ 再開信号が送信されました")

    # 完了を待つ
    print("\n[ステップ 7] タスクの完了を待機中...")
    result = client.wait(handle.task_id)
    print(f"\n✅ タスクが完了しました!")
    print(f"ステータス: {result.status}")
    print(f"\n結果プレビュー: {result.result[:200]}...")

    print("\n" + "=" * 60)
    print("✅ タスク制御の例が完了しました!")
    print("=" * 60)

if __name__ == "__main__":
    main()

制御信号のエラーハンドリング

"""一時停止/再開操作のエラーハンドリング"""
from shannon import ShannonClient
from shannon.errors import APIError

def main():
    client = ShannonClient()

    task_id = "task-123"

    # 一時停止を試みる
    try:
        client.pause_task(task_id, reason="レビューが必要")
        print("✅ タスクが正常に一時停止されました")
    except APIError as e:
        # 特定のエラーケースを処理
        if "完了したタスクを一時停止できません" in str(e):
            print("⚠️  タスクはすでに完了しているため、一時停止できません")
        elif "タスクはすでに一時停止しています" in str(e):
            print("⚠️  タスクはすでに一時停止しています")
        elif "タスクが見つかりません" in str(e):
            print("❌ タスクが見つかりません")
        else:
            print(f"❌ エラー: {e}")

    # 再開を試みる
    try:
        client.resume_task(task_id, reason="実行を続ける")
        print("✅ タスクが正常に再開されました")
    except APIError as e:
        if "タスクは一時停止されていません" in str(e):
            print("⚠️  タスクは一時停止されていないため、再開できません")
        elif "完了したタスクを再開できません" in str(e):
            print("⚠️  タスクはすでに完了しているため、再開できません")
        else:
            print(f"❌ エラー: {e}")

if __name__ == "__main__":
    main()

エラーハンドリングの例

完全なパターンとフルリトライロジックの例については、専用ページをご覧ください: Error Handling

次のステップ