Skip to main content
Error handling documentation is being expanded. Core patterns are shown below.

Overview

The Shannon Python SDK provides comprehensive error handling to help you build robust applications. All SDK exceptions inherit from ShannonError.

Exception Hierarchy

ShannonError                    # Base exception
├── ConnectionError             # Network/connection issues (SDK-specific)
├── AuthenticationError         # API key/auth problems
├── PermissionDeniedError       # Forbidden / authorization failure
├── ValidationError             # Invalid parameters
├── RateLimitError              # Too many requests / throttled
├── ServerError                 # Upstream 5xx server error
├── TaskNotFoundError           # Task doesn't exist
├── TaskTimeoutError            # Task exceeded timeout
├── TaskCancelledError          # Task was cancelled
├── SessionNotFoundError        # Session doesn't exist
├── SessionExpiredError         # Session expired
├── TemplateError               # Template/route errors
└── TemplateNotFoundError       # Template not found
Budget and Task Failure Handling: Exceeding a budget and task execution failures do not raise exceptions. To detect a failed task, check status.status on the returned task status (the failure reason is in status.error_message). For token usage and cost totals, call list_tasks() and read total_token_usage from the returned task summaries.

Basic Error Handling

Try-Except Pattern

from shannon import (
    ShannonClient,
    ShannonError,
    AuthenticationError,
    ConnectionError,
    PermissionDeniedError,
    RateLimitError,
    ServerError,
    TaskTimeoutError,
    TaskStatusEnum,
)

client = ShannonClient(base_url="http://localhost:8080")

try:
    handle = client.submit_task(query="Analyze this data")
    status = client.wait(handle.task_id, timeout=120)

    # A failed task is reported through the status, not raised as an exception
    if status.status == TaskStatusEnum.FAILED:
        print(f"Task failed: {status.error_message}")
    else:
        print("Result:", status.result)

# Every class named in an except clause must be imported: Python evaluates the
# clauses in order at raise time, so an undefined name raises NameError there.
except ConnectionError:
    print("Could not connect to Shannon server")
except AuthenticationError:
    print("Invalid API credentials")
except PermissionDeniedError:
    print("Forbidden: insufficient permissions")
except RateLimitError:
    print("Rate limited: slow down or add backoff")
except ServerError:
    print("Server error (5xx): try again later")
except TaskTimeoutError:
    print("Task exceeded timeout limit")
# Specific SDK errors come first; the SDK base class and Exception are the fallbacks
except ShannonError as e:
    print(f"Shannon error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Specific Error Types

Connection Errors

Handle network and connection issues:
import time
from shannon import ShannonClient, ConnectionError

def connect_with_retry(max_retries=3):
    """Create a client and probe the connection, retrying on ConnectionError.

    A "ping" task is submitted as the connectivity probe. After each failed
    attempt except the last, the function sleeps 2 ** attempt seconds.

    Args:
        max_retries: Number of attempts to make before giving up.

    Returns:
        The ShannonClient once a probe submission succeeds.

    Raises:
        ConnectionError: Re-raised when the final attempt also fails.
    """
    client = ShannonClient()
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            # Test connection with a simple task; only success/failure matters
            client.submit_task(query="ping")
        except ConnectionError:
            if attempt >= final_attempt:
                print(f"Failed to connect after {max_retries} attempts")
                raise
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Connection failed, retrying in {wait_time}s...")
            time.sleep(wait_time)
        else:
            return client

Check Cost and Failures

Monitor failures from status and get usage totals via list_tasks():
from shannon import ShannonClient, TaskStatusEnum

client = ShannonClient()
handle = client.submit_task(query="Analyze data")
status = client.wait(handle.task_id)

# The waited-on status exposes its state as a TaskStatusEnum member
if status.status == TaskStatusEnum.FAILED:
    print(f"Failed with error: {status.error_message}")

# Token usage and cost live on the task summaries returned by list_tasks()
tasks, _ = client.list_tasks(limit=50)

# Pick out the summary that belongs to the task submitted above, if listed
summary = None
for candidate in tasks:
    if candidate.task_id == handle.task_id:
        summary = candidate
        break

usage = None
if summary:
    usage = summary.total_token_usage
if usage:
    print(f"tokens={usage.total_tokens} prompt={usage.prompt_tokens} completion={usage.completion_tokens} cost=${usage.cost_usd:.6f}")

# Unlike the waited-on status, the summary's status is compared as a plain string
if summary and summary.status == "FAILED":
    print("Summary indicates failure")

Timeout Errors

Handle long-running operations:
import asyncio
from shannon import AsyncShannonClient, TaskTimeoutError

async def with_timeout_handling():
    """Submit a task and wait for it, handling both kinds of timeout.

    Submission is bounded by a 10 second asyncio timeout and the wait by a
    60 second one.

    Returns:
        The value returned by client.wait(), or None when either the
        client-side asyncio timeout or a TaskTimeoutError occurs.
    """
    async with AsyncShannonClient() as client:
        try:
            # Client-side limits: asyncio.wait_for raises asyncio.TimeoutError
            submission = client.submit_task(query="Complex analysis")
            handle = await asyncio.wait_for(submission, timeout=10.0)

            # TaskTimeoutError is the SDK's own signal that the task timed out
            pending = client.wait(handle.task_id)
            result = await asyncio.wait_for(pending, timeout=60.0)
        except asyncio.TimeoutError:
            print("Operation timed out (client-side asyncio timeout)")
            return None
        except TaskTimeoutError:
            print("Task timed out (Shannon reported timeout)")
            return None
        else:
            return result

Rate Limiting

Handle API rate limits gracefully:
from shannon import ShannonClient, ConnectionError, RateLimitError
import time

def handle_rate_limits(queries, max_retries=5):
    """Run each query, backing off and retrying when rate limited.

    Both RateLimitError and the SDK's ConnectionError are treated as
    retryable. The delay starts at 1 second and doubles after every failed
    attempt, capped at 30 seconds.

    Args:
        queries: Iterable of query strings to submit one at a time.
        max_retries: Maximum attempts per query; bounds the loop so a
            persistent outage surfaces as an exception instead of retrying
            forever.

    Returns:
        List of the values returned by client.wait(), in query order.

    Raises:
        ValueError: If max_retries is less than 1.
        RateLimitError, ConnectionError: Re-raised once a query has used up
            all of its attempts.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    client = ShannonClient()
    results = []

    for query in queries:
        backoff = 1
        for attempt in range(1, max_retries + 1):
            try:
                handle = client.submit_task(query=query)
                results.append(client.wait(handle.task_id))
                break  # Success, move to next
            except (RateLimitError, ConnectionError):
                if attempt == max_retries:
                    raise  # Out of attempts: let the caller decide what to do
                # Rate limited or transient network error
                print(f"Rate limited or transient error, retrying in {backoff}s...")
                time.sleep(backoff)
                backoff = min(backoff * 2, 30)

    return results
Import clarity: from shannon import ConnectionError refers to the SDK’s exception (not Python’s built‑in ConnectionError). Advanced patterns such as rate limiting backoff and circuit breakers are reference implementations — validate in your environment.

Validation Errors

Handle invalid parameters:
from shannon import ShannonClient, ValidationError

def validate_and_submit(query, session_id=None):
    """Submit a task, reporting invalid parameters instead of raising.

    Args:
        query: The task query passed to submit_task.
        session_id: Optional session identifier passed to submit_task.

    Returns:
        The value returned by submit_task, or None when the SDK raises
        ValidationError.
    """
    client = ShannonClient()

    try:
        handle = client.submit_task(query=query, session_id=session_id)
    except ValidationError as err:
        print(f"Invalid parameters: {err}")
        return None
    return handle

Task Failure Handling

Handle task execution failures:
from shannon import ShannonClient, TaskTimeoutError, TaskStatusEnum

def handle_task_failure(query: str):
    """Submit a task and wait up to 120 seconds, reporting failures.

    Args:
        query: The task query to submit.

    Returns:
        The status returned by client.wait(), or None when the task status
        is FAILED or the wait raises TaskTimeoutError.
    """
    client = ShannonClient()

    try:
        handle = client.submit_task(query=query)
        status = client.wait(handle.task_id, timeout=120)

        # Failure is reported through the status, so it is checked explicitly
        failed = status.status == TaskStatusEnum.FAILED
        if not failed:
            return status
        print(f"Task failed: {status.error_message}")
        return None

    except TaskTimeoutError:
        print("Task timed out; consider increasing timeout or simplifying the request.")
        return None

Logging Errors

Implement comprehensive error logging:
import logging
from shannon import ShannonClient, ShannonError

# Root logging setup: INFO and above, with timestamp, logger name and level
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

# Logger used by the examples on this page
logger = logging.getLogger('shannon')

def logged_task_submission(query):
    """Submit a task and wait for it, logging each step and any error.

    Args:
        query: The task query; only its first 50 characters are logged.

    Returns:
        The `result` attribute of the status returned by client.wait().

    Raises:
        ShannonError: Logged at ERROR level with traceback, then re-raised.
        Exception: Any other error is logged at CRITICAL level with
            traceback, then re-raised.
    """
    client = ShannonClient()

    try:
        logger.info(f"Submitting task: {query[:50]}...")
        handle = client.submit_task(query=query)
        logger.info(f"Task submitted: task_id={handle.task_id}")

        status = client.wait(handle.task_id)
        logger.info("Task completed successfully")
        return status.result

    except ShannonError as e:
        # logger.exception logs at ERROR level and attaches the traceback
        logger.exception(f"Shannon error: {e}")
        raise

    except Exception as e:
        logger.critical(f"Unexpected error: {e}", exc_info=True)
        raise

Circuit Breaker Pattern

Implement circuit breaker for resilience:
import time


class CircuitOpenError(Exception):
    """Raised by CircuitBreaker.call when the circuit is open and the call is rejected."""


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    States:
        closed:    calls pass through; consecutive failures are counted.
        open:      calls are rejected with CircuitOpenError until
                   reset_timeout seconds have passed since the last failure.
        half-open: the next call is a trial; success closes the circuit,
                   failure re-opens it.
    """

    def __init__(self, failure_threshold=5, reset_timeout=60):
        """
        Args:
            failure_threshold: Consecutive failures that open the circuit.
            reset_timeout: Seconds to stay open before allowing a trial call.
        """
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = None  # time.monotonic() of the latest failure
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Invoke func(*args, **kwargs) under circuit breaker protection.

        Returns:
            Whatever func returns.

        Raises:
            CircuitOpenError: The circuit is open and the reset timeout has
                not elapsed; func is not called.
            Exception: Any exception raised by func is recorded as a failure
                and re-raised unchanged.
        """
        if self.state == "open":
            # Monotonic clock: the interval is immune to wall-clock adjustments
            if time.monotonic() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
            else:
                raise CircuitOpenError("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()

            # A failed trial call re-opens immediately; otherwise open at the threshold
            if self.state == "half-open" or self.failure_count >= self.failure_threshold:
                self.state = "open"

            raise

        # Any success clears the failure streak, so only consecutive failures
        # can open the circuit, and a successful trial call closes it again.
        self.state = "closed"
        self.failure_count = 0
        return result

# Usage
from shannon import ShannonClient  # the snippet used ShannonClient without importing it

breaker = CircuitBreaker()
client = ShannonClient()

try:
    # The breaker forwards the keyword arguments to client.submit_task
    result = breaker.call(
        client.submit_task,
        query="Analyze data"
    )
except Exception as e:
    # Reached both when the breaker rejects the call and when submit_task itself raises
    print(f"Service unavailable: {e}")

Best Practices

  1. Always catch specific exceptions before generic ones
  2. Implement retry logic with exponential backoff
  3. Log errors for debugging and monitoring
  4. Provide fallback options for critical operations
  5. Set reasonable timeouts to avoid hanging
  6. Validate inputs before submission
  7. Use circuit breakers for external dependencies

Next Steps

Complete Retry Example

#!/usr/bin/env python3
"""Error handling with retry logic example"""

import time
from shannon import ShannonClient, ConnectionError, TaskTimeoutError, ShannonError

def robust_task_submission(query: str, max_retries: int = 3, timeout: int = 300):
    """
    Submit a task and wait for it, retrying only on connection errors.

    Args:
        query: The task query
        max_retries: Maximum attempts when a ConnectionError occurs
        timeout: Seconds to wait for the task result on each attempt

    Returns:
        The TaskStatus object returned by client.wait(). A task that ran but
        failed is still returned; inspect its status and error_message.
        None is returned only when max_retries is less than 1, because no
        attempt is made.

    Raises:
        ConnectionError: Every attempt failed with a connection error
        TaskTimeoutError: The wait timed out (never retried)
        ShannonError: Any other SDK error (never retried)
    """
    # Local import keeps this function self-contained: the enum is needed to
    # tell a failed task apart from a successful one.
    from shannon import TaskStatusEnum

    client = ShannonClient()

    for attempt in range(max_retries):
        try:
            print(f"\n[Attempt {attempt + 1}/{max_retries}] Submitting task...")
            handle = client.submit_task(query=query)
            print(f"✅ Task submitted: {handle.task_id}")
            print(f"⏳ Waiting for result ({timeout}s timeout)...")
            result = client.wait(handle.task_id, timeout=timeout)

            # Task failures are reported via the status, not raised, so a
            # returned status does not by itself mean success.
            if result.status == TaskStatusEnum.FAILED:
                print(f"❌ Task failed: {result.error_message}")
            else:
                print("✅ Task completed successfully")
            return result

        except ConnectionError as e:
            print(f"❌ Connection error: {e}")
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 1s, 2s, 4s
                print(f"⏳ Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"❌ Max retries ({max_retries}) reached. Giving up.")
                raise

        except TaskTimeoutError as e:
            print(f"❌ Task timeout error: {e}")
            print(f"⚠️  Task exceeded {timeout} second limit. Not retrying.")
            raise

        except ShannonError as e:
            print(f"❌ Shannon API error: {e}")
            print("⚠️  API error encountered. Not retrying.")
            raise

        except Exception as e:
            print(f"❌ Unexpected error: {type(e).__name__}: {e}")
            raise

    return None


def main():
    """Run the three error-handling demonstrations and print their outcomes."""
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    print(heavy_rule)
    print("Error Handling Examples")
    print(heavy_rule)

    # Example 1: the happy path through the retrying helper
    print("\n📝 Example 1: Normal execution")
    print(light_rule)
    try:
        outcome = robust_task_submission("What is 2+2?")
        if outcome:
            print(f"\nFinal result: {outcome.result}")
    except Exception as err:
        print(f"\n⚠️  Failed: {err}")

    # Example 2: a timeout is reported, never retried
    print("\n\n📝 Example 2: Timeout scenario")
    print(light_rule)
    print("Note: This demonstrates timeout handling")
    try:
        outcome = robust_task_submission(
            "Analyze this complex dataset... (simulated long task)"
        )
        if outcome:
            print(f"\nFinal result: {outcome.result}")
    except TaskTimeoutError:
        print("\n⚠️  Task timeout - no retry attempted (expected behavior)")
    except Exception as err:
        print(f"\n⚠️  Error: {err}")

    # Example 3: direct client calls with one handler per error category
    print("\n\n📝 Example 3: Simple try-catch pattern")
    print(light_rule)
    client = ShannonClient()
    try:
        handle = client.submit_task(query="What is the capital of France?")
        answer = client.wait(handle.task_id)
        print(f"✅ Success: {answer.result}")
    except ConnectionError:
        print("❌ Network issue - check Shannon service")
    except TaskTimeoutError:
        print("❌ Task took too long")
    except ShannonError as err:
        print(f"❌ API error: {err}")
    except Exception as err:
        print(f"❌ Unexpected error: {err}")

    print("\n" + heavy_rule)
    print("✅ Error handling examples completed!")
    print(heavy_rule)


# Run the examples only when this file is executed as a script, not when imported
if __name__ == "__main__":
    main()
To run:
# Ensure Shannon is running
make dev

# Run the example
python3 error_handling.py
When to use retry logic:
  • Network instability
  • Transient service issues
  • Production systems requiring resilience
When NOT to retry:
  • Timeouts (task is too complex)
  • API errors (invalid parameters)
  • Authentication failures