Full async documentation coming soon. Basic patterns are shown below.

Overview

The Shannon Python SDK provides both synchronous and asynchronous clients. Use async operations for:
  • Concurrent task submission
  • Non-blocking event streaming
  • Parallel API calls
  • High-throughput applications

AsyncShannonClient

Basic Usage

import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient(base_url="http://localhost:8080") as client:
        handle = await client.submit_task(
            query=(
                "Summarize the paragraph into 3 bullet points focusing on revenue trends. "
                "Output: Markdown list."
            ),
            model_tier="small",
            mode="standard",
        )
        final = await client.wait(handle.task_id)
        print(final.result)

asyncio.run(main())

Concurrent Tasks

Submit multiple tasks simultaneously:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit multiple tasks concurrently
        tasks = [
            client.submit_task(query="What is 5+5?"),
            client.submit_task(query="What is 10*2?"),
            client.submit_task(query="What is 100/4?"),
        ]

        # Wait for all submissions
        handles = await asyncio.gather(*tasks)

        # Get all results
        results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])

        for i, r in enumerate(results, 1):
            print(f"Task {i}: {r.result}")

asyncio.run(main())
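
For high-throughput workloads it can help to cap how many calls are in flight at once, rather than launching every submission together. The sketch below is one way to do that with `asyncio.Semaphore`. `gather_limited` is a hypothetical helper, not part of the Shannon SDK, and it takes zero-argument callables so that each coroutine is only created when a slot is free.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def gather_limited(
    factories: List[Callable[[], Awaitable[T]]],
    limit: int,
) -> List[T]:
    """Run zero-argument coroutine factories with at most `limit` in flight.

    Hypothetical helper for illustration; not part of the Shannon SDK.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory: Callable[[], Awaitable[T]]) -> T:
        # Only `limit` coroutines can hold the semaphore at the same time
        async with semaphore:
            return await factory()

    # asyncio.gather returns results in the same order as its inputs
    return await asyncio.gather(*(run_one(f) for f in factories))
```

With the client from the example above, each factory could be written as `lambda q=q: client.submit_task(query=q)`, and the returned handles passed to `client.wait` in the same way.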

Async Streaming

Stream events without blocking. Tip: don't await other client calls inside the async for loop. Break out of the loop first, then await:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit task
        h = await client.submit_task(query="What is the capital of France?")

        # Stream events
        async for e in client.stream(h.workflow_id):
            # Guard against events that carry no message text
            print(f"{e.type}: {(e.message or '')[:50]}")
            if e.type == "WORKFLOW_COMPLETED":
                break  # Exit loop cleanly first

        # Now safe to await
        final = await client.wait(h.task_id)
        print(f"\nFinal result: {final.result}")

asyncio.run(main())

Timeout Handling

import asyncio
from shannon import AsyncShannonClient

async def with_timeout():
    async with AsyncShannonClient() as client:
        try:
            # Submit with a 30-second timeout
            handle = await asyncio.wait_for(
                client.submit_task(
                    query=(
                        "Extract the top 3 insights from the paragraph and return a Markdown bullet list."
                    )
                ),
                timeout=30.0,
            )

            # Wait for completion with a 5-minute timeout
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=300.0,
            )
            print(result.result)

        except asyncio.TimeoutError:
            # wait_for cancels the awaited call before raising
            print("Operation timed out")

asyncio.run(with_timeout())

Background Tasks

Run tasks in the background while doing other work:
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit task
        handle = await client.submit_task(
            query=(
                "Generate a ~500-word report on quarterly revenue drivers and risks. "
                "Output: Markdown."
            )
        )

        # Start background wait (doesn't block)
        task = asyncio.create_task(client.wait(handle.task_id))

        # Do other work while task runs
        print("Processing in background...")
        await asyncio.sleep(2)  # Your other async work here

        # Check status
        if not task.done():
            print("Still processing...")

        # Get result when needed
        result = await task
        print(f"Result: {result.result}")

asyncio.run(main())
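
If the result of a background wait is no longer needed, the `asyncio.Task` can be cancelled instead of awaited to completion. The sketch below shows one way to cancel a task and wait for the cancellation to take effect. `cancel_and_wait` is a hypothetical helper, not part of the Shannon SDK. It only stops the local coroutine; whether the remote task keeps running on the server is not something this page states.

```python
import asyncio


async def cancel_and_wait(task: "asyncio.Task") -> bool:
    """Cancel `task` and wait for it to finish.

    Returns True if the task ended by cancellation, False if it had
    already finished or completed normally despite the cancel request.
    Hypothetical helper for illustration; not part of the Shannon SDK.
    """
    if task.done():
        return False

    # Request cancellation; it is delivered at the task's next await point
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return True
    return False
```

In the example above, this could be called as `await cancel_and_wait(task)` in place of `await task` when the report is no longer wanted.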

Error Handling

import asyncio
# Note: this import shadows Python's built-in ConnectionError within this module
from shannon import AsyncShannonClient, ConnectionError, TaskTimeoutError

async def main():
    async with AsyncShannonClient() as client:
        try:
            handle = await client.submit_task(query="What is AI?")
            result = await client.wait(handle.task_id)
            print(f"Result: {result.result}")

        except ConnectionError:
            print("Failed to connect to Shannon")
        except TaskTimeoutError:
            print("Task timed out")
        except Exception as e:
            print(f"Unexpected error: {e}")

asyncio.run(main())
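
When several calls run under `asyncio.gather`, the default behavior is to raise the first exception to the caller, which hides the results of the calls that succeeded. Passing `return_exceptions=True` returns exceptions as ordinary values instead. The sketch below separates the two groups. `gather_partitioned` is a hypothetical helper, not part of the Shannon SDK.

```python
import asyncio
from typing import Any, Awaitable, Iterable, List, Tuple


async def gather_partitioned(
    awaitables: Iterable[Awaitable[Any]],
) -> Tuple[List[Any], List[BaseException]]:
    """Await everything, then split outcomes into results and errors.

    Hypothetical helper for illustration; not part of the Shannon SDK.
    """
    # With return_exceptions=True, failures appear in the list as exception objects
    outcomes = await asyncio.gather(*awaitables, return_exceptions=True)

    results = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [o for o in outcomes if isinstance(o, BaseException)]
    return results, errors
```

For example, `results, errors = await gather_partitioned(client.wait(h.task_id) for h in handles)` would collect every finished task while keeping the failures available for logging or retry.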

Integration with Web Frameworks

FastAPI Example

from contextlib import asynccontextmanager
from fastapi import FastAPI
from shannon import AsyncShannonClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create client
    app.state.client = AsyncShannonClient()
    yield
    # Shutdown: close client
    await app.state.client.close()

app = FastAPI(lifespan=lifespan)

@app.post("/analyze")
async def analyze(query: str):
    client = app.state.client
    handle = await client.submit_task(query=query)
    result = await client.wait(handle.task_id)
    return {"result": result.result}
To test:
# Install FastAPI and uvicorn
pip install fastapi uvicorn

# Run server
uvicorn your_file:app --reload

# Test endpoint
curl -X POST "http://127.0.0.1:8000/analyze?query=What+is+AI?"

Best Practices

  1. Use context managers (async with) for proper cleanup
  2. Handle timeouts for long-running operations
  3. Implement retry logic for network failures
  4. Use asyncio.gather() for concurrent operations
  5. Stream events for real-time updates
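
The retry advice in item 3 can be sketched as a small wrapper with exponential backoff. `retry_async` is a hypothetical helper, not part of the Shannon SDK. The default `retry_on` tuple uses Python's built-in exception classes as an assumption; with the SDK you would likely pass the exception classes it exports, such as the ones imported in the Error Handling example.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, asyncio.TimeoutError),
) -> T:
    """Call `operation` until it succeeds or `attempts` is exhausted.

    Hypothetical helper for illustration; not part of the Shannon SDK.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    for attempt in range(1, attempts + 1):
        try:
            # A fresh coroutine is created on every attempt
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error
            # Delay doubles each attempt: base, 2*base, 4*base, ...
            delay = base_delay * 2 ** (attempt - 1)
            # Small random jitter so parallel callers do not retry in lockstep
            await asyncio.sleep(delay + random.uniform(0, delay / 10))

    raise RuntimeError("unreachable")
```

A call such as `await retry_async(lambda: client.submit_task(query="What is AI?"))` would retry only the listed exception types and let everything else propagate immediately.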
