メインコンテンツへスキップ
エラーハンドリングのドキュメントが拡張されています。コアパターンは以下に示されています。

概要

Shannon Python SDKは、堅牢なアプリケーションを構築するための包括的なエラーハンドリングを提供します。すべてのSDK例外はShannonErrorから派生しています。

例外階層

ShannonError                    # 基本例外
├── ConnectionError             # ネットワーク/接続の問題(SDK特有)
├── AuthenticationError         # APIキー/認証の問題
├── PermissionDeniedError       # 禁止/認可の失敗
├── ValidationError             # 無効なパラメータ
├── RateLimitError              # リクエストが多すぎる/制限中
├── ServerError                 # 上流の5xxサーバーエラー
├── TaskNotFoundError           # タスクが存在しない
├── TaskTimeoutError            # タスクがタイムアウトを超えた
├── TaskCancelledError          # タスクがキャンセルされた
├── SessionNotFoundError        # セッションが存在しない
├── SessionExpiredError         # セッションが期限切れ
├── TemplateError               # テンプレート/ルートのエラー
└── TemplateNotFoundError       # テンプレートが見つからない
予算とタスクの失敗処理: 予算超過とタスクの失敗は例外ではありません。失敗を確認するにはstatus.statusをチェックしてください。トークン使用量とコストの合計を確認するには、list_tasks()を使用し、返されたタスクの要約からtotal_token_usageを読み取ります。

基本的なエラーハンドリング

Try-Catchパターン

from shannon import (
    ShannonClient,
    ShannonError,
    AuthenticationError,
    ConnectionError,
    TaskTimeoutError,
    TaskStatusEnum,
)

client = ShannonClient(base_url="http://localhost:8080")

try:
    handle = client.submit_task(query="このデータを分析する")
    status = client.wait(handle.task_id, timeout=120)

    if status.status == TaskStatusEnum.FAILED:
        print(f"タスクが失敗しました: {status.error_message}")
    else:
        print("結果:", status.result)

except ConnectionError:
    print("Shannonサーバーに接続できませんでした")
except AuthenticationError:
    print("無効なAPI認証情報")
except PermissionDeniedError:
    print("禁止: 権限が不足しています")
except RateLimitError:
    print("レート制限: スローダウンするか、バックオフを追加してください")
except ServerError:
    print("サーバーエラー(5xx): 後でもう一度試してください")
except TaskTimeoutError:
    print("タスクがタイムアウト制限を超えました")
except ShannonError as e:
    print(f"Shannonエラー: {e}")
except Exception as e:
    print(f"予期しないエラー: {e}")

特定のエラータイプ

接続エラー

ネットワークおよび接続の問題を処理します:
import time
from shannon import ShannonClient, ConnectionError

def connect_with_retry(max_retries=3):
    client = ShannonClient()

    for attempt in range(max_retries):
        try:
            # 簡単なタスクで接続をテスト
            handle = client.submit_task(query="ping")
            return client

        except ConnectionError as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 指数バックオフ
                print(f"接続に失敗しました。{wait_time}s後に再試行します...")
                time.sleep(wait_time)
            else:
                print(f"{max_retries}回の試行後に接続に失敗しました")
                raise

コストと失敗の確認

ステータスから失敗を監視し、list_tasks()を介して使用量の合計を取得します:
from shannon import ShannonClient, TaskStatusEnum

client = ShannonClient()
handle = client.submit_task(query="データを分析する")
status = client.wait(handle.task_id)

# 失敗の確認(TaskStatusは列挙型を返す)
if status.status == TaskStatusEnum.FAILED:
    print(f"エラーで失敗しました: {status.error_message}")

# 使用量とコストの合計(タスクリストから)
tasks, _ = client.list_tasks(limit=50)
summary = next((t for t in tasks if t.task_id == handle.task_id), None)
usage = summary.total_token_usage if summary else None
if usage:
    print(f"tokens={usage.total_tokens} prompt={usage.prompt_tokens} completion={usage.completion_tokens} cost=${usage.cost_usd:.6f}")

# 注意: TaskSummaryのstatusは文字列です
if summary and summary.status == "FAILED":
    print("要約は失敗を示しています")

タイムアウトエラー

長時間実行される操作を処理します:
import asyncio
from shannon import AsyncShannonClient, TaskTimeoutError

async def with_timeout_handling():
    async with AsyncShannonClient() as client:
        try:
            # asyncio.TimeoutErrorはwait_forラッパーから発生します(クライアント側)
            handle = await asyncio.wait_for(
                client.submit_task(query="複雑な分析"),
                timeout=10.0,
            )
            # TaskTimeoutErrorはタスクが自身のタイムアウトを超えた場合にShannonから発生します
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=60.0,
            )
            return result

        except asyncio.TimeoutError:
            print("操作がタイムアウトしました(クライアント側のasyncioタイムアウト)")
            return None
        except TaskTimeoutError:
            print("タスクがタイムアウトしました(Shannonが報告したタイムアウト)")
            return None

レート制限

APIのレート制限を適切に処理します:
from shannon import ShannonClient, ConnectionError
import time

def handle_rate_limits(queries):
    client = ShannonClient()
    results = []

    for query in queries:
        backoff = 1
        while True:
            try:
                handle = client.submit_task(query=query)
                result = client.wait(handle.task_id)
                results.append(result)
                break  # 成功、次へ進む
            except ConnectionError:
                # レート制限または一時的なネットワークエラー
                print(f"レート制限または一時的なエラー、{backoff}s後に再試行します...")
                time.sleep(backoff)
                backoff = min(backoff * 2, 30)

    return results
インポートの明確さ:from shannon import ConnectionErrorはSDKの例外を指します(Pythonの組み込みConnectionErrorではありません)。レート制限のバックオフやサーキットブレーカーなどの高度なパターンは参照実装です — 環境で検証してください。

バリデーションエラー

無効なパラメータを処理します:
from shannon import ShannonClient, ValidationError

def validate_and_submit(query, session_id=None):
    client = ShannonClient()

    try:
        return client.submit_task(query=query, session_id=session_id)
    except ValidationError as e:
        print(f"無効なパラメータ: {e}")
        return None

タスク失敗処理

タスク実行の失敗を処理します:
from shannon import ShannonClient, TaskTimeoutError, TaskStatusEnum

def handle_task_failure(query: str):
    client = ShannonClient()

    try:
        handle = client.submit_task(query=query)
        status = client.wait(handle.task_id, timeout=120)

        if status.status == TaskStatusEnum.FAILED:
            print(f"タスクが失敗しました: {status.error_message}")
            return None
        return status

    except TaskTimeoutError:
        print("タスクがタイムアウトしました; タイムアウトを増やすか、リクエストを簡素化することを検討してください。")
        return None

エラーロギング

包括的なエラーロギングを実装します:
import logging
from shannon import ShannonClient, ShannonError

# ロギングの設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('shannon')

def logged_task_submission(query):
    client = ShannonClient()

    try:
        logger.info(f"タスクを送信中: {query[:50]}...")
        handle = client.submit_task(query=query)

        logger.info(f"タスクが送信されました: task_id={handle.task_id}")
        result = client.wait(handle.task_id)

        logger.info("タスクが正常に完了しました")
        return result.result

    except ShannonError as e:
        logger.error(f"Shannonエラー: {e}", exc_info=True)
        raise

    except Exception as e:
        logger.critical(f"予期しないエラー: {e}", exc_info=True)
        raise

サーキットブレーカーパターン

レジリエンスのためにサーキットブレーカーを実装します:
class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
            else:
                raise Exception("サーキットブレーカーはオープンです")

        try:
            result = func(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
            return result

        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = "open"

            raise

# 使用例
breaker = CircuitBreaker()
client = ShannonClient()

try:
    result = breaker.call(
        client.submit_task,
        query="データを分析する"
    )
except Exception as e:
    print(f"サービスが利用できません: {e}")

ベストプラクティス

  1. 特定の例外を常にキャッチし、一般的なものの前にする
  2. 指数バックオフを伴う再試行ロジックを実装する
  3. エラーをログに記録してデバッグと監視を行う
  4. 重要な操作のためにフォールバックオプションを提供する
  5. ハングを避けるために合理的なタイムアウトを設定する
  6. 送信前に入力を検証する
  7. 外部依存関係に対してサーキットブレーカーを使用する

次のステップ

完全な再試行例

#!/usr/bin/env python3
"""再試行ロジックを伴うエラーハンドリングの例"""

import time
from shannon import ShannonClient, ConnectionError, TaskTimeoutError, ShannonError

def robust_task_submission(query: str, max_retries: int = 3):
    """
    再試行ロジックと包括的なエラーハンドリングを伴うタスクを送信します。

    Args:
        query: タスクのクエリ
        max_retries: 回復可能なエラーの最大再試行回数

    Returns:
        TaskStatusオブジェクトまたはすべての試行が失敗した場合はNone
    """
    client = ShannonClient()

    for attempt in range(max_retries):
        try:
            print(f"\n[試行 {attempt + 1}/{max_retries}] タスクを送信中...")
            handle = client.submit_task(query=query)
            print(f"✅ タスクが送信されました: {handle.task_id}")
            print("⏳ 結果を待っています(300秒のタイムアウト)...")
            result = client.wait(handle.task_id, timeout=300)
            print("✅ タスクが正常に完了しました")
            return result

        except ConnectionError as e:
            wait_time = 2 ** attempt  # 1s, 2s, 4s
            print(f"❌ 接続エラー: {e}")
            if attempt < max_retries - 1:
                print(f"⏳ {wait_time}秒後に再試行します...")
                time.sleep(wait_time)
            else:
                print(f"❌ 最大再試行回数({max_retries})に達しました。あきらめます。")
                raise

        except TaskTimeoutError as e:
            print(f"❌ タスクタイムアウトエラー: {e}")
            print("⚠️  タスクが300秒の制限を超えました。再試行しません。")
            raise

        except ShannonError as e:
            print(f"❌ Shannon APIエラー: {e}")
            print("⚠️  APIエラーが発生しました。再試行しません。")
            raise

        except Exception as e:
            print(f"❌ 予期しないエラー: {type(e).__name__}: {e}")
            raise

    return None


def main():
    print("=" * 60)
    print("エラーハンドリングの例")
    print("=" * 60)

    # 例1: 通常の実行
    print("\n📝 例1: 通常の実行")
    print("-" * 60)
    try:
        result = robust_task_submission("2+2は何ですか?")
        if result:
            print(f"\n最終結果: {result.result}")
    except Exception as e:
        print(f"\n⚠️  失敗: {e}")

    # 例2: タイムアウトシナリオ(早期失敗)
    print("\n\n📝 例2: タイムアウトシナリオ")
    print("-" * 60)
    print("注: これはタイムアウト処理を示しています")
    try:
        result = robust_task_submission(
            "この複雑なデータセットを分析します...(長いタスクをシミュレート)"
        )
        if result:
            print(f"\n最終結果: {result.result}")
    except TaskTimeoutError:
        print("\n⚠️  タスクタイムアウト - 再試行は行われません(期待される動作)")
    except Exception as e:
        print(f"\n⚠️  エラー: {e}")

    # 例3: シンプルなtry/catchパターン
    print("\n\n📝 例3: シンプルなtry-catchパターン")
    print("-" * 60)
    client = ShannonClient()
    try:
        handle = client.submit_task(query="フランスの首都は何ですか?")
        result = client.wait(handle.task_id)
        print(f"✅ 成功: {result.result}")
    except ConnectionError:
        print("❌ ネットワークの問題 - Shannonサービスを確認してください")
    except TaskTimeoutError:
        print("❌ タスクが長すぎました")
    except ShannonError as e:
        print(f"❌ APIエラー: {e}")
    except Exception as e:
        print(f"❌ 予期しないエラー: {e}")

    print("\n" + "=" * 60)
    print("✅ エラーハンドリングの例が完了しました!")
    print("=" * 60)


if __name__ == "__main__":
    main()
実行するには:
# Shannonが実行中であることを確認
make dev

# 例を実行
python3 error_handling.py
再試行ロジックを使用するタイミング:
  • ネットワークの不安定さ
  • 一時的なサービスの問題
  • レジリエンスが必要なプロダクションシステム
再試行しないべきタイミング:
  • タイムアウト(タスクが複雑すぎる)
  • APIエラー(無効なパラメータ)
  • 認証失敗