メインコンテンツへスキップ
この例のコレクションは増加しています。今後も定期的に使用例が追加されます。

基本的な例

モデル選択

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
  "収益トレンドに焦点を当てて段落を3つの箇条書きに要約してください。出力形式: Markdownリスト。",
  model_tier="small",
  # model_override="gpt-5-nano-2025-08-07",
  # provider_override="openai",
  mode="simple",
)
print(client.wait(handle.task_id).result)

簡単な質問応答

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 簡単な質問をする
handle = client.submit_task(query="フランスの首都はどこですか?")
final = client.wait(handle.task_id)
print(f"回答: {final.result}")

データ分析

from shannon import ShannonClient

client = ShannonClient()

# 現実的なデータ分析
handle = client.submit_task(
    query=(
        "以下の売上データに基づいて計算してください:\n"
        "1. 製品ごとの総収益\n"
        "2. 月ごとの成長率\n"
        "3. 総収益が最も高い上位3つを返す\n\n"
        "データ:\n"
        "ProductA: 1月=$10000, 2月=$12000\n"
        "ProductB: 1月=$8000, 2月=$9500\n"
        "ProductC: 1月=$15000, 2月=$14000\n"
        "ProductD: 1月=$5000, 2月=$6000\n\n"
        "フォーマット: JSON配列 [{product, revenue, mom_growth_pct}]"
    )
)

result = client.wait(handle.task_id)
print(result.result)

高度な例

マルチステップワークフロー

"""Shannon SDKを使用したマルチステップワークフローの例"""
from shannon import ShannonClient

def main():
    client = ShannonClient()
    session_id = "quarterly-analysis-demo"

    print("=" * 60)
    print("マルチステップワークフローの例")
    print("=" * 60)

    # ステップ1: 初期データクエリ
    print("\n[ステップ 1] Q4データを読み込み中...")
    h1 = client.submit_task(
        query="1000 + 500 はいくつですか?これはQ4の収益(1000)と費用(500)を表します。",
        session_id=session_id
    )
    result1 = client.wait(h1.task_id)
    print(f"結果: {result1.result}")

    # ステップ2: 前のコンテキストに基づく分析
    print("\n[ステップ 2] トレンドを分析中...")
    h2 = client.submit_task(
        query="あなたが計算したQ4の数字に基づいて、利益率は何パーセントですか?",
        session_id=session_id
    )
    result2 = client.wait(h2.task_id)
    print(f"結果: {result2.result}")

    # ステップ3: すべての前のコンテキストを使用した要約
    print("\n[ステップ 3] エグゼクティブサマリーを作成中...")
    h3 = client.submit_task(
        query="Q4の財務分析を2文で要約してください。",
        session_id=session_id
    )
    result3 = client.wait(h3.task_id)
    print(f"結果: {result3.result}")

    print("\n" + "=" * 60)
    print("✅ マルチステップワークフローが完了しました!")
    print(f"セッションID: {session_id}")
    print("=" * 60)

if __name__ == "__main__":
    main()

並列処理

"""Shannon SDKを使用した並列処理の例"""
import asyncio
from shannon import AsyncShannonClient

async def main():
    print("=" * 60)
    print("並列処理の例")
    print("=" * 60)

    async with AsyncShannonClient() as client:
        # 並列処理するトピック
        topics = ["AI", "量子コンピューティング", "バイオテクノロジー"]

        print(f"\n[ステップ 1] {len(topics)} タスクを並列で提出中...")

        # すべてのタスクを同時に提出(ノンブロッキング)
        tasks = [
            client.submit_task(
                query=f"{topic}の1文要約とその主な応用を教えてください。"
            )
            for topic in topics
        ]

        # すべての提出が完了するのを待つ
        handles = await asyncio.gather(*tasks)
        print(f"✅ {len(handles)} タスクがすべて提出されました")

        print("\n[ステップ 2] すべての結果を待機中...")

        # すべてのタスクが並列で完了するのを待つ
        results = await asyncio.gather(
            *[client.wait(h.task_id) for h in handles]
        )
        print(f"✅ {len(results)} タスクがすべて完了しました")

        print("\n[ステップ 3] 結果:")
        print("-" * 60)

        # 結果を表示
        for topic, result in zip(topics, results):
            print(f"\n📌 {topic}:")
            print(f"   {result.result}")

            # メタデータが利用可能な場合は安全にアクセス
            metadata = []
            if hasattr(result, 'metadata') and result.metadata:
                model = result.metadata.get('model_used') or result.metadata.get('model')
                if model:
                    metadata.append(f"モデル: {model}")
                tokens = result.metadata.get('total_tokens')
                if tokens:
                    metadata.append(f"トークン: {tokens}")

            if metadata:
                print(f"   ({', '.join(metadata)})")

        print("\n" + "=" * 60)
        print("✅ 並列処理が完了しました!")
        print(f"{len(topics)} トピックを同時に処理しました")
        print("=" * 60)

if __name__ == "__main__":
    asyncio.run(main())

フィルターを使用したストリーミング

"""フィルター付きストリーミングの例"""
from shannon import ShannonClient, EventType

def main():
    print("=" * 60)
    print("フィルターを使用したストリーミングの例")
    print("=" * 60)

    client = ShannonClient()

    print("\n[ステップ 1] タスクを送信中...")
    handle = client.submit_task(
        query=(
            "AWS、Azure、GCPの価格モデルを3文で比較してください。 "
            "生成される出力をストリームします。"
        )
    )
    print(f"✅ タスクが送信されました: {handle.task_id}")

    print("\n[ステップ 2] フィルターを使用したストリーミング [LLM_OUTPUT, WORKFLOW_COMPLETED]...")
    print("-" * 60)

    event_count = 0
    llm_outputs = []

    # LLM_OUTPUT と WORKFLOW_COMPLETED イベントのみをストリーム
    for event in client.stream(
        handle.workflow_id,
        types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
    ):
        event_count += 1
        print(f"[{event_count}] {event.type}")

        if event.type == EventType.LLM_OUTPUT:
            print(f"    コンテンツ: {event.message[:80]}...")
            llm_outputs.append(event.message)

        if event.type == EventType.WORKFLOW_COMPLETED:
            print(f"    ステータス: {event.message}")
            break

    print("\n" + "-" * 60)
    print(f"✅ {event_count} 件のフィルタリングされたイベントを受信")
    print(f"✅ {len(llm_outputs)} 件の LLM 出力をキャプチャ")

    print("\n[ステップ 3] 完全な結果:")
    print("-" * 60)
    final = client.wait(handle.task_id)
    print(final.result)

    print("\n" + "=" * 60)
    print("✅ フィルターを使用したストリーミングが完了しました!")
    print("=" * 60)

if __name__ == "__main__":
    main()
イベントタイプの完全なリストは、イベントタイプカタログを参照してください。
利用可能なイベントタイプ
  • LLM_OUTPUT — LLM の応答
  • LLM_PARTIAL — ストリーミングトークン
  • WORKFLOW_COMPLETED — タスク完了
  • AGENT_THINKING — エージェントの推論
  • AGENT_STARTED — エージェントが作業を開始
  • AGENT_COMPLETED — エージェントが完了
  • PROGRESS — 進捗更新
  • ERROR_OCCURRED — エラー
一般的なフィルター
  • 最終出力のみ: types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
  • リアルタイムストリーミング: types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED]
  • エージェントの監視: types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED]
  • すべて (フィルターなし): types=None (またはパラメータを省略)

タスク制御の例

長時間実行されるタスクの一時停止と再開

"""タスク制御 (一時停止/再開) の例"""
from shannon import ShannonClient
import time

def main():
    print("=" * 60)
    print("タスク制御 (一時停止/再開) の例")
    print("=" * 60)

    client = ShannonClient(base_url="http://localhost:8080")

    # 長時間実行される研究タスクを送信
    print("\n[ステップ 1] 長時間実行されるタスクを送信中...")
    handle = client.submit_task(
        query="量子コンピューティングとAI統合の最新の進展を調査し、詳細な分析を提供してください。"
    )
    print(f"✅ タスクが送信されました: {handle.task_id}")

    # 少し実行させる
    print("\n[ステップ 2] タスクを5秒間実行させる...")
    time.sleep(5)

    # タスクを一時停止
    print("\n[ステップ 3] タスクを一時停止中...")
    success = client.pause_task(handle.task_id, reason="中間結果のレビュー")
    if success:
        print("✅ 一時停止信号が送信されました")

    # 制御状態を確認
    print("\n[ステップ 4] 制御状態を確認中...")
    state = client.get_control_state(handle.task_id)
    print(f"一時停止中: {state.is_paused}")
    print(f"キャンセル済み: {state.is_cancelled}")
    if state.paused_at:
        print(f"一時停止時刻: {state.paused_at}")
        print(f"一時停止理由: {state.pause_reason}")
        print(f"一時停止した人: {state.paused_by}")

    # 再開前に少し待つ
    print("\n[ステップ 5] 再開前に3秒待機中...")
    time.sleep(3)

    # タスクを再開
    print("\n[ステップ 6] タスクを再開中...")
    success = client.resume_task(handle.task_id, reason="レビュー後に続行")
    if success:
        print("✅ 再開信号が送信されました")

    # 完了を待つ
    print("\n[ステップ 7] タスクの完了を待機中...")
    result = client.wait(handle.task_id)
    print(f"\n✅ タスクが完了しました!")
    print(f"ステータス: {result.status}")
    print(f"\n結果プレビュー: {result.result[:200]}...")

    print("\n" + "=" * 60)
    print("✅ タスク制御の例が完了しました!")
    print("=" * 60)

if __name__ == "__main__":
    main()

制御信号のエラーハンドリング

"""一時停止/再開操作のエラーハンドリング"""
from shannon import ShannonClient
from shannon.errors import ShannonError

def main():
    client = ShannonClient()

    task_id = "task-123"

    # 一時停止を試みる
    try:
        client.pause_task(task_id, reason="レビューが必要")
        print("✅ タスクが正常に一時停止されました")
    except ShannonError as e:
        # 特定のエラーケースを処理
        if "完了したタスクを一時停止できません" in str(e):
            print("⚠️  タスクはすでに完了しているため、一時停止できません")
        elif "タスクはすでに一時停止しています" in str(e):
            print("⚠️  タスクはすでに一時停止しています")
        elif "タスクが見つかりません" in str(e):
            print("❌ タスクが見つかりません")
        else:
            print(f"❌ エラー: {e}")

    # 再開を試みる
    try:
        client.resume_task(task_id, reason="実行を続ける")
        print("✅ タスクが正常に再開されました")
    except ShannonError as e:
        if "タスクは一時停止されていません" in str(e):
            print("⚠️  タスクは一時停止されていないため、再開できません")
        elif "完了したタスクを再開できません" in str(e):
            print("⚠️  タスクはすでに完了しているため、再開できません")
        else:
            print(f"❌ エラー: {e}")

if __name__ == "__main__":
    main()

スウォームモード

マルチエージェント実行の強制

"""スウォームモードの例(Shannon SDK)"""
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# スウォームモードで送信し、複数のエージェントを連携させる
handle = client.submit_task(
    query="MLワークロード向けのAWS、Azure、GCPを調査・比較してください。料金、GPU可用性、マネージドサービスをカバーしてください。",
    force_swarm=True,
)

result = client.wait(handle.task_id)
print(result.result)

# モデルと使用量情報を確認
if result.usage:
    print(f"合計トークン: {result.usage.get('total_tokens')}")
    print(f"コスト: ${result.usage.get('cost_usd', 0):.6f}")

ヒューマン・イン・ザ・ループ レビューの例

インタラクティブなレビューワークフロー

"""ヒューマン・イン・ザ・ループ レビューの例"""
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# レビューが必要なワークフローがすでに実行中であると仮定
workflow_id = "workflow-123"

# 現在のレビュー状態を確認
state = client.get_review_state(workflow_id)
print(f"レビューステータス: {state.status}")
print(f"ラウンド: {state.round}, バージョン: {state.version}")

if state.current_plan:
    print(f"現在のプラン:\n{state.current_plan}")

# 会話履歴を表示
for r in state.rounds:
    print(f"  [{r.role}] {r.message}")

# まだレビュー中の場合、フィードバックを提供
if state.status == "reviewing":
    updated = client.submit_review_feedback(
        workflow_id,
        "入力バリデーションとユニットテストを追加してください。",
        version=state.version,
    )
    print(f"フィードバックが送信されました。現在のラウンド: {updated.round}")

    # 更新されたプランを承認
    result = client.approve_review(workflow_id, version=updated.version)
    print(f"承認済み: {result['status']}")

楽観的同時実行制御を使用したレビュー

"""レビュー中のバージョン競合の処理"""
from shannon import ShannonClient, ValidationError

client = ShannonClient()

workflow_id = "workflow-456"

try:
    state = client.get_review_state(workflow_id)
    updated = client.submit_review_feedback(
        workflow_id,
        "全体的に良いですが、些細なスタイルの変更が必要です。",
        version=state.version,
    )
    client.approve_review(workflow_id, version=updated.version)
except ValidationError as e:
    if "409" in str(e.code):
        print("バージョン競合: 別のユーザーがレビューを変更しました。更新して再試行してください。")
    else:
        raise

スキルの例

スキルの閲覧と確認

"""スキル閲覧の例"""
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 利用可能なすべてのスキルを一覧表示
skills = client.list_skills()
print(f"{len(skills)} 個のスキルが見つかりました:\n")
for s in skills:
    flag = " [危険]" if s.dangerous else ""
    status = "有効" if s.enabled else "無効"
    print(f"  {s.name} v{s.version} ({s.category}) - {status}{flag}")
    print(f"    {s.description}")

# カテゴリでフィルタリング
print("\n--- コーディングスキル ---")
coding = client.list_skills(category="coding")
for s in coding:
    print(f"  {s.name}: {s.description}")

# 詳細情報を取得
detail = client.get_skill("web_search")
print(f"\nスキル詳細: {detail.name} v{detail.version}")
print(f"  著者: {detail.author}")
print(f"  必要なツール: {detail.requires_tools}")
print(f"  必要なロール: {detail.requires_role}")
print(f"  最大バジェット: {detail.budget_max}")

# バージョン履歴を取得
versions = client.get_skill_versions("web_search")
print(f"\nweb_search のバージョン:")
for v in versions:
    print(f"  v{v.version} - {v.description}")

スケジュール管理の例

スケジュールタスクの作成と管理

"""スケジュール管理の例"""
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# 平日の毎朝9時(UTC)に実行される調査タスクを作成
result = client.create_schedule(
    name="デイリーAIニュース",
    cron_expression="0 9 * * 1-5",
    task_query="最新のAI研究ニュースを要約してください",
    task_context={"force_research": "true", "research_strategy": "quick"},
    timezone="UTC",
    max_budget_per_run_usd=0.50,
)
schedule_id = result["schedule_id"]
print(f"スケジュールが作成されました: {schedule_id}")

# すべてのスケジュールを一覧表示
schedules, total = client.list_schedules()
print(f"\n{total} 個のスケジュール:")
for s in schedules:
    print(f"  {s.name}: {s.cron_expression} ({s.status})")
    print(f"    実行回数: 合計 {s.total_runs}, 成功 {s.successful_runs}, 失敗 {s.failed_runs}")

# スケジュールの詳細を取得
sched = client.get_schedule(schedule_id)
print(f"\nスケジュール: {sched.name}")
print(f"  次回実行: {sched.next_run_at}")
print(f"  前回実行: {sched.last_run_at}")

# 実行履歴を表示
runs, _ = client.get_schedule_runs(schedule_id)
for run in runs:
    print(f"  {run.triggered_at}: {run.status} - ${run.total_cost_usd:.4f}")

# スケジュールを一時停止(例: メンテナンス中)
client.pause_schedule(schedule_id, reason="メンテナンスウィンドウ")

# スケジュールを再開
client.resume_schedule(schedule_id)

# スケジュール設定を更新
client.update_schedule(
    schedule_id,
    cron_expression="0 8 * * 1-5",  # 8時に変更
    max_budget_per_run_usd=0.75,
)

# 不要になったら削除
# client.delete_schedule(schedule_id)

エラーハンドリングの例

完全なパターンとフルリトライロジックの例については、専用ページをご覧ください: エラーハンドリング

次のステップ