import asyncio

from shannon import AsyncShannonClient


async def main():
    """Submit three queries concurrently and print each result in order."""
    async with AsyncShannonClient() as client:
        # Submit multiple tasks concurrently. The calls are not awaited here,
        # so the list is handed to gather() below to run them together.
        tasks = [
            client.submit_task(query="What is 5+5?"),
            client.submit_task(query="What is 10*2?"),
            client.submit_task(query="What is 100/4?"),
        ]

        # Wait for all submissions
        handles = await asyncio.gather(*tasks)

        # Get all results. gather() returns values in the order its
        # awaitables were passed, so results line up with the queries above.
        results = await asyncio.gather(*[client.wait(h.task_id) for h in handles])

        for i, r in enumerate(results, 1):
            print(f"Task {i}: {r.result}")


asyncio.run(main())
Stream events without blocking. Tip: don’t await other client calls inside the `async for` loop — break out of the loop first, then await:
Copy
import asyncio

from shannon import AsyncShannonClient


async def main():
    """Stream a task's events, then fetch its final result after the loop."""
    async with AsyncShannonClient() as client:
        # Submit task
        h = await client.submit_task(query="What is the capital of France?")

        # Stream events
        async for e in client.stream(h.workflow_id):
            print(f"{e.type}: {e.message[:50]}")
            if e.type == "WORKFLOW_COMPLETED":
                break  # Exit loop cleanly first

        # Now safe to await: the stream loop has been left, so no other
        # client call is awaited while iterating.
        final = await client.wait(h.task_id)
        print(f"\nFinal result: {final.result}")


asyncio.run(main())
async def with_timeout():
    """Submit a task and wait for it, bounding each step with a timeout.

    Returns:
        The value produced by ``client.wait`` for the submitted task, or
        ``None`` if either the submission or the wait timed out.
    """
    async with AsyncShannonClient() as client:
        try:
            # Submit with timeout
            handle = await asyncio.wait_for(
                client.submit_task(
                    query=(
                        "Extract the top 3 insights from the paragraph and return a Markdown bullet list."
                    )
                ),
                timeout=30.0,
            )

            # Wait for completion with timeout
            result = await asyncio.wait_for(
                client.wait(handle.task_id),
                timeout=300.0,
            )
        except asyncio.TimeoutError:
            # Either step may have timed out; both are reported the same way.
            print("Operation timed out")
            return None

        # Hand the result back instead of discarding it.
        return result
import asyncio

from shannon import AsyncShannonClient


async def main():
    """Wait for a task in the background while doing other async work."""
    async with AsyncShannonClient() as client:
        # Submit task
        handle = await client.submit_task(
            query="Generate a ~500-word report on quarterly revenue drivers and risks. Output: Markdown."
        )

        # Start background wait (doesn't block)
        task = asyncio.create_task(client.wait(handle.task_id))

        # Do other work while task runs
        print("Processing in background...")
        await asyncio.sleep(2)  # Your other async work here

        # Check status
        if not task.done():
            print("Still processing...")

        # Get result when needed
        result = await task
        print(f"Result: {result.result}")


asyncio.run(main())
# Install FastAPI and uvicorn
pip install fastapi uvicorn

# Run server
uvicorn your_file:app --reload

# Test endpoint
curl -X POST "http://127.0.0.1:8000/analyze?query=What+is+AI?"