Full async documentation coming soon. Basic patterns are shown below.
Overview
The Shannon Python SDK provides both synchronous and asynchronous clients. Use async operations for:
- Concurrent task submission
- Non-blocking event streaming
- Parallel API calls
- High-throughput applications
AsyncShannonClient
Basic Usage
```python
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient(base_url="http://localhost:8080") as client:
        handle = await client.submit_task(
            query=(
                "Summarize the paragraph into 3 bullet points focusing on revenue trends. "
                "Output: Markdown list."
            ),
            model_tier="small",
            mode="standard",
        )
        final = await client.wait(handle.task_id)
        print(final.result)

asyncio.run(main())
```
Concurrent Tasks
Submit multiple tasks simultaneously:
```python
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit multiple tasks concurrently
        tasks = [
            client.submit_task(query="What is 5+5?"),
            client.submit_task(query="What is 10*2?"),
            client.submit_task(query="What is 100/4?"),
        ]

        # Wait for all submissions
        handles = await asyncio.gather(*tasks)

        # Get all results
        results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])

        for i, r in enumerate(results, 1):
            print(f"Task {i}: {r.result}")

asyncio.run(main())
```
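If you fan out many tasks at once, you may also want to cap how many are in flight. Below is a minimal sketch that wraps the same `submit_task`/`wait` calls in an `asyncio.Semaphore`; the `run_one` helper and the limit of 3 are illustrative, not part of the SDK:

```python
import asyncio
from shannon import AsyncShannonClient

async def run_one(client, sem, query):
    # Cap how many submissions are in flight at once (illustrative helper)
    async with sem:
        handle = await client.submit_task(query=query)
        return await client.wait(handle.task_id)

async def main():
    queries = [f"What is {n} squared?" for n in range(1, 11)]
    sem = asyncio.Semaphore(3)  # at most 3 tasks in flight; tune for your deployment

    async with AsyncShannonClient() as client:
        results = await asyncio.gather(*(run_one(client, sem, q) for q in queries))
        for q, r in zip(queries, results):
            print(f"{q} -> {r.result}")

asyncio.run(main())
```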
Async Streaming
Stream events without blocking. Tip: don't await other client calls inside the `async for` loop; break out of the loop first, then await:
```python
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit task
        h = await client.submit_task(query="What is the capital of France?")

        # Stream events
        async for e in client.stream(h.workflow_id):
            print(f"{e.type}: {e.message[:50]}")
            if e.type == "WORKFLOW_COMPLETED":
                break  # Exit the loop cleanly first

        # Now it is safe to await other client calls
        final = await client.wait(h.task_id)
        print(f"\nFinal result: {final.result}")

asyncio.run(main())
```
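Because each stream is an async iterator, you can watch several tasks side by side. A minimal sketch that reuses only the calls shown above (`submit_task`, `stream`, `wait`); the `watch` helper and its labels are illustrative:

```python
import asyncio
from shannon import AsyncShannonClient

async def watch(client, label, query):
    # Submit one task and print its events as they arrive (illustrative helper)
    handle = await client.submit_task(query=query)
    async for event in client.stream(handle.workflow_id):
        print(f"[{label}] {event.type}")
        if event.type == "WORKFLOW_COMPLETED":
            break
    final = await client.wait(handle.task_id)
    print(f"[{label}] result: {final.result}")

async def main():
    async with AsyncShannonClient() as client:
        # Stream two tasks side by side; neither blocks the other
        await asyncio.gather(
            watch(client, "A", "What is the capital of France?"),
            watch(client, "B", "What is the capital of Japan?"),
        )

asyncio.run(main())
```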
Timeout Handling
```python
import asyncio
from shannon import AsyncShannonClient

async def with_timeout():
    async with AsyncShannonClient() as client:
        try:
            # Submit with timeout
            handle = await asyncio.wait_for(
                client.submit_task(
                    query=(
                        "Extract the top 3 insights from the paragraph and return a Markdown bullet list."
                    )
                ),
                timeout=30.0,
            )

            # Wait for completion with timeout
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=300.0,
            )
            print(result.result)
        except asyncio.TimeoutError:
            print("Operation timed out")

asyncio.run(with_timeout())
```
Background Tasks
Run tasks in the background while doing other work:
```python
import asyncio
from shannon import AsyncShannonClient

async def main():
    async with AsyncShannonClient() as client:
        # Submit task
        handle = await client.submit_task(
            query=(
                "Generate a ~500-word report on quarterly revenue drivers and risks. "
                "Output: Markdown."
            )
        )

        # Start background wait (doesn't block)
        task = asyncio.create_task(client.wait(handle.task_id))

        # Do other work while the task runs
        print("Processing in background...")
        await asyncio.sleep(2)  # Your other async work here

        # Check status
        if not task.done():
            print("Still processing...")

        # Get the result when needed
        result = await task
        print(f"Result: {result.result}")

asyncio.run(main())
```
Error Handling
```python
import asyncio
from shannon import AsyncShannonClient, ConnectionError, TaskTimeoutError

async def main():
    async with AsyncShannonClient() as client:
        try:
            handle = await client.submit_task(query="What is AI?")
            result = await client.wait(handle.task_id)
            print(f"Result: {result.result}")
        except ConnectionError:
            print("Failed to connect to Shannon")
        except TaskTimeoutError:
            print("Task timed out")
        except Exception as e:
            print(f"Unexpected error: {e}")

asyncio.run(main())
```
Integration with Web Frameworks
FastAPI Example
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI
from shannon import AsyncShannonClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create client
    app.state.client = AsyncShannonClient()
    yield
    # Shutdown: close client
    await app.state.client.close()

app = FastAPI(lifespan=lifespan)

@app.post("/analyze")
async def analyze(query: str):
    client = app.state.client
    handle = await client.submit_task(query=query)
    result = await client.wait(handle.task_id)
    return {"result": result.result}
```
To test:
```bash
# Install FastAPI and uvicorn
pip install fastapi uvicorn

# Run the server
uvicorn your_file:app --reload

# Test the endpoint
curl -X POST "http://127.0.0.1:8000/analyze?query=What+is+AI?"
```
Best Practices
- Use context managers (`async with`) for proper cleanup
- Handle timeouts for long-running operations
- Implement retry logic for network failures (see the sketch below)
- Use `asyncio.gather()` for concurrent operations
- Stream events for real-time updates
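For retry logic, here is a minimal sketch using exponential backoff around the calls shown earlier. The `submit_with_retry` helper, the attempt count, and the delays are illustrative; `ConnectionError` is the same exception imported in the error-handling example:

```python
import asyncio
from shannon import AsyncShannonClient, ConnectionError

async def submit_with_retry(client, query, retries=3, base_delay=1.0):
    # Retry transient connection failures with exponential backoff (illustrative policy)
    for attempt in range(retries):
        try:
            return await client.submit_task(query=query)
        except ConnectionError:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)

async def main():
    async with AsyncShannonClient() as client:
        handle = await submit_with_retry(client, "What is AI?")
        result = await client.wait(handle.task_id)
        print(f"Result: {result.result}")

asyncio.run(main())
```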
Next Steps