Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.shannon.run/llms.txt

Use this file to discover all available pages before exploring further.

Full async documentation coming soon. Basic patterns are shown below.

Overview

The Shannon Python SDK provides both synchronous and asynchronous clients. Use async operations for:
  • Concurrent task submission
  • Non-blocking event streaming
  • Parallel API calls
  • High-throughput applications

AsyncShannonClient

Basic Usage

import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient(base_url="http://localhost:8080") as client:
        handle = await client.submit_task(
            query=(
                "Summarize the paragraph into 3 bullet points focusing on revenue trends. "
                "Output: Markdown list."
            ),
            model_tier="small",
            mode="standard",
        )
        final = await client.wait(handle.task_id)
        print(final.result)

asyncio.run(main())

Concurrent Tasks

Submit multiple tasks simultaneously:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit multiple tasks concurrently
        tasks = [
            client.submit_task(query="What is 5+5?"),
            client.submit_task(query="What is 10*2?"),
            client.submit_task(query="What is 100/4?"),
        ]

        # Wait for all submissions
        handles = await asyncio.gather(*tasks)

        # Get all results
        results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])

        for i, r in enumerate(results, 1):
            print(f"Task {i}: {r.result}")

asyncio.run(main())

Async Streaming

Stream events without blocking. Tip: don’t await other client calls inside the async for loop — break out first, then await:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit task
        h = await client.submit_task(query="What is the capital of France?")

        # Stream events
        async for e in client.stream(h.workflow_id):
            print(f"{e.type}: {e.message[:50]}")
            if e.type == "WORKFLOW_COMPLETED":
                break  # Exit loop cleanly first

        # Now safe to await
        final = await client.wait(h.task_id)
        print(f"\nFinal result: {final.result}")

asyncio.run(main())

Timeout Handling

async def with_timeout():
    async with AsyncShannonClient() as client:
        try:
            # Submit with timeout
            handle = await asyncio.wait_for(
                client.submit_task(
                    query=(
                        "Extract the top 3 insights from the paragraph and return a Markdown bullet list."
                    )
                ),
                timeout=30.0
            )

            # Wait for completion with timeout
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=300.0
            )

        except asyncio.TimeoutError:
            print("Operation timed out")

Background Tasks

Run tasks in background while doing other work:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit task
        handle = await client.submit_task(query="Generate a ~500-word report on quarterly revenue drivers and risks. Output: Markdown.")

        # Start background wait (doesn't block)
        task = asyncio.create_task(client.wait(handle.task_id))

        # Do other work while task runs
        print("Processing in background...")
        await asyncio.sleep(2)  # Your other async work here

        # Check status
        if not task.done():
            print("Still processing...")

        # Get result when needed
        result = await task
        print(f"Result: {result.result}")

asyncio.run(main())

Error Handling

import asyncio
from shannon import AsyncShannonClient, ConnectionError, TaskTimeoutError

async def main():
    async with AsyncShannonClient() as client:
        try:
            handle = await client.submit_task(query="What is AI?")
            result = await client.wait(handle.task_id)
            print(f"Result: {result.result}")

        except ConnectionError:
            print("Failed to connect to Shannon")
        except TaskTimeoutError:
            print("Task timed out")
        except Exception as e:
            print(f"Unexpected error: {e}")

asyncio.run(main())

Integration with Web Frameworks

FastAPI Example

from contextlib import asynccontextmanager
from fastapi import FastAPI
from shannon import AsyncShannonClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create client
    app.state.client = AsyncShannonClient()
    yield
    # Shutdown: close client
    await app.state.client.close()

app = FastAPI(lifespan=lifespan)

@app.post("/analyze")
async def analyze(query: str):
    client = app.state.client
    handle = await client.submit_task(query=query)
    result = await client.wait(handle.task_id)
    return {"result": result.result}
To test:
# Install FastAPI and uvicorn
pip install fastapi uvicorn

# Run server
uvicorn your_file:app --reload

# Test endpoint
curl -X POST "http://127.0.0.1:8000/analyze?query=What+is+AI?"

Best Practices

  1. Use context managers (async with) for proper cleanup
  2. Handle timeouts for long-running operations
  3. Implement retry logic for network failures
  4. Use gather() for concurrent operations
  5. Stream events for real-time updates

Next Steps

Examples

More code examples

Error Handling

Robust error handling