エラーハンドリングのドキュメントが拡張されています。コアパターンは以下に示されています。
概要
Shannon Python SDKは、堅牢なアプリケーションを構築するための包括的なエラーハンドリングを提供します。すべてのSDK例外はShannonErrorから派生しています。
例外階層
コピー
ShannonError # 基本例外
├── ConnectionError # ネットワーク/接続の問題(SDK特有)
├── AuthenticationError # APIキー/認証の問題
├── PermissionDeniedError # 禁止/認可の失敗
├── ValidationError # 無効なパラメータ
├── RateLimitError # リクエストが多すぎる/制限中
├── ServerError # 上流の5xxサーバーエラー
├── TaskNotFoundError # タスクが存在しない
├── TaskTimeoutError # タスクがタイムアウトを超えた
├── TaskCancelledError # タスクがキャンセルされた
├── SessionNotFoundError # セッションが存在しない
├── SessionExpiredError # セッションが期限切れ
├── TemplateError # テンプレート/ルートのエラー
└── TemplateNotFoundError # テンプレートが見つからない
予算とタスクの失敗処理: 予算超過とタスクの失敗は例外ではありません。失敗を確認するには
status.statusをチェックしてください。トークン使用量とコストの合計を確認するには、list_tasks()を使用し、返されたタスクの要約からtotal_token_usageを読み取ります。基本的なエラーハンドリング
Try-Catchパターン
コピー
from shannon import (
ShannonClient,
ShannonError,
AuthenticationError,
ConnectionError,
TaskTimeoutError,
TaskStatusEnum,
)
client = ShannonClient(base_url="http://localhost:8080")
try:
handle = client.submit_task(query="このデータを分析する")
status = client.wait(handle.task_id, timeout=120)
if status.status == TaskStatusEnum.FAILED:
print(f"タスクが失敗しました: {status.error_message}")
else:
print("結果:", status.result)
except ConnectionError:
print("Shannonサーバーに接続できませんでした")
except AuthenticationError:
print("無効なAPI認証情報")
except PermissionDeniedError:
print("禁止: 権限が不足しています")
except RateLimitError:
print("レート制限: スローダウンするか、バックオフを追加してください")
except ServerError:
print("サーバーエラー(5xx): 後でもう一度試してください")
except TaskTimeoutError:
print("タスクがタイムアウト制限を超えました")
except ShannonError as e:
print(f"Shannonエラー: {e}")
except Exception as e:
print(f"予期しないエラー: {e}")
特定のエラータイプ
接続エラー
ネットワークおよび接続の問題を処理します:コピー
import time
from shannon import ShannonClient, ConnectionError
def connect_with_retry(max_retries=3):
client = ShannonClient()
for attempt in range(max_retries):
try:
# 簡単なタスクで接続をテスト
handle = client.submit_task(query="ping")
return client
except ConnectionError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 指数バックオフ
print(f"接続に失敗しました。{wait_time}s後に再試行します...")
time.sleep(wait_time)
else:
print(f"{max_retries}回の試行後に接続に失敗しました")
raise
コストと失敗の確認
ステータスから失敗を監視し、list_tasks()を介して使用量の合計を取得します:
コピー
from shannon import ShannonClient, TaskStatusEnum
client = ShannonClient()
handle = client.submit_task(query="データを分析する")
status = client.wait(handle.task_id)
# 失敗の確認(TaskStatusは列挙型を返す)
if status.status == TaskStatusEnum.FAILED:
print(f"エラーで失敗しました: {status.error_message}")
# 使用量とコストの合計(タスクリストから)
tasks, _ = client.list_tasks(limit=50)
summary = next((t for t in tasks if t.task_id == handle.task_id), None)
usage = summary.total_token_usage if summary else None
if usage:
print(f"tokens={usage.total_tokens} prompt={usage.prompt_tokens} completion={usage.completion_tokens} cost=${usage.cost_usd:.6f}")
# 注意: TaskSummaryのstatusは文字列です
if summary and summary.status == "FAILED":
print("要約は失敗を示しています")
タイムアウトエラー
長時間実行される操作を処理します:コピー
import asyncio
from shannon import AsyncShannonClient, TaskTimeoutError
async def with_timeout_handling():
async with AsyncShannonClient() as client:
try:
# asyncio.TimeoutErrorはwait_forラッパーから発生します(クライアント側)
handle = await asyncio.wait_for(
client.submit_task(query="複雑な分析"),
timeout=10.0,
)
# TaskTimeoutErrorはタスクが自身のタイムアウトを超えた場合にShannonから発生します
result = await asyncio.wait_for(
client.wait(handle.task_id),
timeout=60.0,
)
return result
except asyncio.TimeoutError:
print("操作がタイムアウトしました(クライアント側のasyncioタイムアウト)")
return None
except TaskTimeoutError:
print("タスクがタイムアウトしました(Shannonが報告したタイムアウト)")
return None
レート制限
APIのレート制限を適切に処理します:コピー
from shannon import ShannonClient, ConnectionError
import time
def handle_rate_limits(queries):
client = ShannonClient()
results = []
for query in queries:
backoff = 1
while True:
try:
handle = client.submit_task(query=query)
result = client.wait(handle.task_id)
results.append(result)
break # 成功、次へ進む
except ConnectionError:
# レート制限または一時的なネットワークエラー
print(f"レート制限または一時的なエラー、{backoff}s後に再試行します...")
time.sleep(backoff)
backoff = min(backoff * 2, 30)
return results
インポートの明確さ:
from shannon import ConnectionErrorはSDKの例外を指します(Pythonの組み込みConnectionErrorではありません)。レート制限のバックオフやサーキットブレーカーなどの高度なパターンは参照実装です — 環境で検証してください。バリデーションエラー
無効なパラメータを処理します:コピー
from shannon import ShannonClient, ValidationError
def validate_and_submit(query, session_id=None):
client = ShannonClient()
try:
return client.submit_task(query=query, session_id=session_id)
except ValidationError as e:
print(f"無効なパラメータ: {e}")
return None
タスク失敗処理
タスク実行の失敗を処理します:コピー
from shannon import ShannonClient, TaskTimeoutError, TaskStatusEnum
def handle_task_failure(query: str):
client = ShannonClient()
try:
handle = client.submit_task(query=query)
status = client.wait(handle.task_id, timeout=120)
if status.status == TaskStatusEnum.FAILED:
print(f"タスクが失敗しました: {status.error_message}")
return None
return status
except TaskTimeoutError:
print("タスクがタイムアウトしました; タイムアウトを増やすか、リクエストを簡素化することを検討してください。")
return None
エラーロギング
包括的なエラーロギングを実装します:コピー
import logging
from shannon import ShannonClient, ShannonError
# ロギングの設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('shannon')
def logged_task_submission(query):
client = ShannonClient()
try:
logger.info(f"タスクを送信中: {query[:50]}...")
handle = client.submit_task(query=query)
logger.info(f"タスクが送信されました: task_id={handle.task_id}")
result = client.wait(handle.task_id)
logger.info("タスクが正常に完了しました")
return result.result
except ShannonError as e:
logger.error(f"Shannonエラー: {e}", exc_info=True)
raise
except Exception as e:
logger.critical(f"予期しないエラー: {e}", exc_info=True)
raise
サーキットブレーカーパターン
レジリエンスのためにサーキットブレーカーを実装します:コピー
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.last_failure_time = None
self.state = "closed" # closed, open, half-open
def call(self, func, *args, **kwargs):
if self.state == "open":
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = "half-open"
else:
raise Exception("サーキットブレーカーはオープンです")
try:
result = func(*args, **kwargs)
if self.state == "half-open":
self.state = "closed"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise
# 使用例
breaker = CircuitBreaker()
client = ShannonClient()
try:
result = breaker.call(
client.submit_task,
query="データを分析する"
)
except Exception as e:
print(f"サービスが利用できません: {e}")
ベストプラクティス
- 特定の例外を常にキャッチし、一般的なものの前にする
- 指数バックオフを伴う再試行ロジックを実装する
- エラーをログに記録してデバッグと監視を行う
- 重要な操作のためにフォールバックオプションを提供する
- ハングを避けるために合理的なタイムアウトを設定する
- 送信前に入力を検証する
- 外部依存関係に対してサーキットブレーカーを使用する
次のステップ
完全な再試行例
コピー
#!/usr/bin/env python3
"""再試行ロジックを伴うエラーハンドリングの例"""
import time
from shannon import ShannonClient, ConnectionError, TaskTimeoutError, ShannonError
def robust_task_submission(query: str, max_retries: int = 3):
"""
再試行ロジックと包括的なエラーハンドリングを伴うタスクを送信します。
Args:
query: タスクのクエリ
max_retries: 回復可能なエラーの最大再試行回数
Returns:
TaskStatusオブジェクトまたはすべての試行が失敗した場合はNone
"""
client = ShannonClient()
for attempt in range(max_retries):
try:
print(f"\n[試行 {attempt + 1}/{max_retries}] タスクを送信中...")
handle = client.submit_task(query=query)
print(f"✅ タスクが送信されました: {handle.task_id}")
print("⏳ 結果を待っています(300秒のタイムアウト)...")
result = client.wait(handle.task_id, timeout=300)
print("✅ タスクが正常に完了しました")
return result
except ConnectionError as e:
wait_time = 2 ** attempt # 1s, 2s, 4s
print(f"❌ 接続エラー: {e}")
if attempt < max_retries - 1:
print(f"⏳ {wait_time}秒後に再試行します...")
time.sleep(wait_time)
else:
print(f"❌ 最大再試行回数({max_retries})に達しました。あきらめます。")
raise
except TaskTimeoutError as e:
print(f"❌ タスクタイムアウトエラー: {e}")
print("⚠️ タスクが300秒の制限を超えました。再試行しません。")
raise
except ShannonError as e:
print(f"❌ Shannon APIエラー: {e}")
print("⚠️ APIエラーが発生しました。再試行しません。")
raise
except Exception as e:
print(f"❌ 予期しないエラー: {type(e).__name__}: {e}")
raise
return None
def main():
print("=" * 60)
print("エラーハンドリングの例")
print("=" * 60)
# 例1: 通常の実行
print("\n📝 例1: 通常の実行")
print("-" * 60)
try:
result = robust_task_submission("2+2は何ですか?")
if result:
print(f"\n最終結果: {result.result}")
except Exception as e:
print(f"\n⚠️ 失敗: {e}")
# 例2: タイムアウトシナリオ(早期失敗)
print("\n\n📝 例2: タイムアウトシナリオ")
print("-" * 60)
print("注: これはタイムアウト処理を示しています")
try:
result = robust_task_submission(
"この複雑なデータセットを分析します...(長いタスクをシミュレート)"
)
if result:
print(f"\n最終結果: {result.result}")
except TaskTimeoutError:
print("\n⚠️ タスクタイムアウト - 再試行は行われません(期待される動作)")
except Exception as e:
print(f"\n⚠️ エラー: {e}")
# 例3: シンプルなtry/catchパターン
print("\n\n📝 例3: シンプルなtry-catchパターン")
print("-" * 60)
client = ShannonClient()
try:
handle = client.submit_task(query="フランスの首都は何ですか?")
result = client.wait(handle.task_id)
print(f"✅ 成功: {result.result}")
except ConnectionError:
print("❌ ネットワークの問題 - Shannonサービスを確認してください")
except TaskTimeoutError:
print("❌ タスクが長すぎました")
except ShannonError as e:
print(f"❌ APIエラー: {e}")
except Exception as e:
print(f"❌ 予期しないエラー: {e}")
print("\n" + "=" * 60)
print("✅ エラーハンドリングの例が完了しました!")
print("=" * 60)
if __name__ == "__main__":
main()
コピー
# Shannonが実行中であることを確認
make dev
# 例を実行
python3 error_handling.py
- ネットワークの不安定さ
- 一時的なサービスの問題
- レジリエンスが必要なプロダクションシステム
- タイムアウト(タスクが複雑すぎる)
- APIエラー(無効なパラメータ)
- 認証失敗