Error handling documentation is being expanded. Core patterns are shown below.
Overview
The Shannon Python SDK provides comprehensive error handling to help you build robust applications. All SDK exceptions inherit from ShannonError.
Exception Hierarchy
ShannonError # Base exception
├── ConnectionError # Network/connection issues (SDK-specific)
├── AuthenticationError # API key/auth problems
├── PermissionDeniedError # Forbidden / authorization failure
├── ValidationError # Invalid parameters
├── RateLimitError # Too many requests / throttled
├── ServerError # Upstream 5xx server error
├── TaskNotFoundError # Task doesn't exist
├── TaskTimeoutError # Task exceeded timeout
├── TaskCancelledError # Task was cancelled
├── SessionNotFoundError # Session doesn't exist
├── SessionExpiredError # Session expired
├── TemplateError # Template/route errors
└── TemplateNotFoundError # Template not found
Budget and Task Failure Handling: Budget exceeded and task failures are not exceptions. For failures, check status.status. For token usage and cost totals, use list_tasks() and read total_token_usage from the returned task summaries.
Basic Error Handling
Try-Catch Pattern
from shannon import (
ShannonClient,
ShannonError,
AuthenticationError,
ConnectionError,
TaskTimeoutError,
TaskStatusEnum,
)
client = ShannonClient(base_url="http://localhost:8080")
try:
handle = client.submit_task(query="Analyze this data")
status = client.wait(handle.task_id, timeout=120)
if status.status == TaskStatusEnum.FAILED:
print(f"Task failed: {status.error_message}")
else:
print("Result:", status.result)
except ConnectionError:
print("Could not connect to Shannon server")
except AuthenticationError:
print("Invalid API credentials")
except PermissionDeniedError:
print("Forbidden: insufficient permissions")
except RateLimitError:
print("Rate limited: slow down or add backoff")
except ServerError:
print("Server error (5xx): try again later")
except TaskTimeoutError:
print("Task exceeded timeout limit")
except ShannonError as e:
print(f"Shannon error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Specific Error Types
Connection Errors
Handle network and connection issues:
import time
from shannon import ShannonClient, ConnectionError
def connect_with_retry(max_retries=3):
client = ShannonClient()
for attempt in range(max_retries):
try:
# Test connection with a simple task
handle = client.submit_task(query="ping")
return client
except ConnectionError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
print(f"Connection failed, retrying in {wait_time}s...")
time.sleep(wait_time)
else:
print(f"Failed to connect after {max_retries} attempts")
raise
Check Cost and Failures
Monitor failures from status and get usage totals via list_tasks():
from shannon import ShannonClient, TaskStatusEnum
client = ShannonClient()
handle = client.submit_task(query="Analyze data")
status = client.wait(handle.task_id)
# Failure check (TaskStatus returns enum)
if status.status == TaskStatusEnum.FAILED:
print(f"Failed with error: {status.error_message}")
# Usage and cost totals (from task listings)
tasks, _ = client.list_tasks(limit=50)
summary = next((t for t in tasks if t.task_id == handle.task_id), None)
usage = summary.total_token_usage if summary else None
if usage:
print(f"tokens={usage.total_tokens} prompt={usage.prompt_tokens} completion={usage.completion_tokens} cost=${usage.cost_usd:.6f}")
# Note: status on TaskSummary is a string
if summary and summary.status == "FAILED":
print("Summary indicates failure")
Timeout Errors
Handle long-running operations:
import asyncio
from shannon import AsyncShannonClient, TaskTimeoutError
async def with_timeout_handling():
async with AsyncShannonClient() as client:
try:
# asyncio.TimeoutError comes from wait_for wrappers (client-side)
handle = await asyncio.wait_for(
client.submit_task(query="Complex analysis"),
timeout=10.0,
)
# TaskTimeoutError comes from Shannon if the task exceeded its own timeout
result = await asyncio.wait_for(
client.wait(handle.task_id),
timeout=60.0,
)
return result
except asyncio.TimeoutError:
print("Operation timed out (client-side asyncio timeout)")
return None
except TaskTimeoutError:
print("Task timed out (Shannon reported timeout)")
return None
Rate Limiting
Handle API rate limits gracefully:
from shannon import ShannonClient, ConnectionError
import time
def handle_rate_limits(queries):
client = ShannonClient()
results = []
for query in queries:
backoff = 1
while True:
try:
handle = client.submit_task(query=query)
result = client.wait(handle.task_id)
results.append(result)
break # Success, move to next
except ConnectionError:
# Rate limited or transient network error
print(f"Rate limited or transient error, retrying in {backoff}s...")
time.sleep(backoff)
backoff = min(backoff * 2, 30)
return results
Import clarity: from shannon import ConnectionError refers to the SDK’s exception (not Python’s built‑in ConnectionError). Advanced patterns such as rate limiting backoff and circuit breakers are reference implementations — validate in your environment.
Validation Errors
Handle invalid parameters:
from shannon import ShannonClient, ValidationError
def validate_and_submit(query, session_id=None):
client = ShannonClient()
try:
return client.submit_task(query=query, session_id=session_id)
except ValidationError as e:
print(f"Invalid parameters: {e}")
return None
Task Failure Handling
Handle task execution failures:
from shannon import ShannonClient, TaskTimeoutError, TaskStatusEnum
def handle_task_failure(query: str):
client = ShannonClient()
try:
handle = client.submit_task(query=query)
status = client.wait(handle.task_id, timeout=120)
if status.status == TaskStatusEnum.FAILED:
print(f"Task failed: {status.error_message}")
return None
return status
except TaskTimeoutError:
print("Task timed out; consider increasing timeout or simplifying the request.")
return None
Logging Errors
Implement comprehensive error logging:
import logging
from shannon import ShannonClient, ShannonError
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('shannon')
def logged_task_submission(query):
client = ShannonClient()
try:
logger.info(f"Submitting task: {query[:50]}...")
handle = client.submit_task(query=query)
logger.info(f"Task submitted: task_id={handle.task_id}")
result = client.wait(handle.task_id)
logger.info("Task completed successfully")
return result.result
except ShannonError as e:
logger.error(f"Shannon error: {e}", exc_info=True)
raise
except Exception as e:
logger.critical(f"Unexpected error: {e}", exc_info=True)
raise
Circuit Breaker Pattern
Implement circuit breaker for resilience:
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.last_failure_time = None
self.state = "closed" # closed, open, half-open
def call(self, func, *args, **kwargs):
if self.state == "open":
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = "half-open"
else:
raise Exception("Circuit breaker is open")
try:
result = func(*args, **kwargs)
if self.state == "half-open":
self.state = "closed"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise
# Usage
breaker = CircuitBreaker()
client = ShannonClient()
try:
result = breaker.call(
client.submit_task,
query="Analyze data"
)
except Exception as e:
print(f"Service unavailable: {e}")
Best Practices
- Always catch specific exceptions before generic ones
- Implement retry logic with exponential backoff
- Log errors for debugging and monitoring
- Provide fallback options for critical operations
- Set reasonable timeouts to avoid hanging
- Validate inputs before submission
- Use circuit breakers for external dependencies
Next Steps
Complete Retry Example
#!/usr/bin/env python3
"""Error handling with retry logic example"""
import time
from shannon import ShannonClient, ConnectionError, TaskTimeoutError, ShannonError
def robust_task_submission(query: str, max_retries: int = 3):
"""
Submit a task with retry logic and comprehensive error handling.
Args:
query: The task query
max_retries: Maximum retry attempts for recoverable errors
Returns:
TaskStatus object or None if all attempts fail
"""
client = ShannonClient()
for attempt in range(max_retries):
try:
print(f"\n[Attempt {attempt + 1}/{max_retries}] Submitting task...")
handle = client.submit_task(query=query)
print(f"✅ Task submitted: {handle.task_id}")
print("⏳ Waiting for result (300s timeout)...")
result = client.wait(handle.task_id, timeout=300)
print("✅ Task completed successfully")
return result
except ConnectionError as e:
wait_time = 2 ** attempt # 1s, 2s, 4s
print(f"❌ Connection error: {e}")
if attempt < max_retries - 1:
print(f"⏳ Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print(f"❌ Max retries ({max_retries}) reached. Giving up.")
raise
except TaskTimeoutError as e:
print(f"❌ Task timeout error: {e}")
print("⚠️ Task exceeded 300 second limit. Not retrying.")
raise
except ShannonError as e:
print(f"❌ Shannon API error: {e}")
print("⚠️ API error encountered. Not retrying.")
raise
except Exception as e:
print(f"❌ Unexpected error: {type(e).__name__}: {e}")
raise
return None
def main():
print("=" * 60)
print("Error Handling Examples")
print("=" * 60)
# Example 1: Normal execution
print("\n📝 Example 1: Normal execution")
print("-" * 60)
try:
result = robust_task_submission("What is 2+2?")
if result:
print(f"\nFinal result: {result.result}")
except Exception as e:
print(f"\n⚠️ Failed: {e}")
# Example 2: Timeout scenario (fail fast)
print("\n\n📝 Example 2: Timeout scenario")
print("-" * 60)
print("Note: This demonstrates timeout handling")
try:
result = robust_task_submission(
"Analyze this complex dataset... (simulated long task)"
)
if result:
print(f"\nFinal result: {result.result}")
except TaskTimeoutError:
print("\n⚠️ Task timeout - no retry attempted (expected behavior)")
except Exception as e:
print(f"\n⚠️ Error: {e}")
# Example 3: Simple try/catch pattern
print("\n\n📝 Example 3: Simple try-catch pattern")
print("-" * 60)
client = ShannonClient()
try:
handle = client.submit_task(query="What is the capital of France?")
result = client.wait(handle.task_id)
print(f"✅ Success: {result.result}")
except ConnectionError:
print("❌ Network issue - check Shannon service")
except TaskTimeoutError:
print("❌ Task took too long")
except ShannonError as e:
print(f"❌ API error: {e}")
except Exception as e:
print(f"❌ Unexpected error: {e}")
print("\n" + "=" * 60)
print("✅ Error handling examples completed!")
print("=" * 60)
if __name__ == "__main__":
main()
To run:
# Ensure Shannon is running
make dev
# Run the example
python3 error_handling.py
When to use retry logic:
- Network instability
- Transient service issues
- Production systems requiring resilience
When NOT to retry:
- Timeouts (task is too complex)
- API errors (invalid parameters)
- Authentication failures