This collection of examples is growing; more use cases will be added regularly.
Basic Examples
Model Selection
Copy
from shannon import ShannonClient
# Submit a summarization prompt with an explicit model tier, then print the
# result of the finished task.
client = ShannonClient(base_url="http://localhost:8080")

prompt = "Summarize the paragraph into three bullet points focusing on revenue trends. Output: Markdown list."
handle = client.submit_task(
    prompt,
    model_tier="small",
    # Optional overrides, left disabled in this example:
    # model_override="gpt-5-nano-2025-08-07",
    # provider_override="openai",
    mode="simple",
)

final = client.wait(handle.task_id)
print(final.result)
Simple Question Answering
Copy
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")

# Submit one question, wait on the task, and print its result.
question = "What is the capital of France?"
handle = client.submit_task(query=question)
final = client.wait(handle.task_id)
print(f"Answer: {final.result}")
Data Analysis
Copy
from shannon import ShannonClient
client = ShannonClient()

# The prompt carries the instructions, the inline data set, and the
# requested output format in a single string.
analysis_prompt = (
    "Given the sales data below, calculate:\n"
    "1. Total revenue per product\n"
    "2. Month-over-month growth percentage\n"
    "3. Return top 3 by total revenue\n\n"
    "Data:\n"
    "ProductA: Jan=$10000, Feb=$12000\n"
    "ProductB: Jan=$8000, Feb=$9500\n"
    "ProductC: Jan=$15000, Feb=$14000\n"
    "ProductD: Jan=$5000, Feb=$6000\n\n"
    "Format: JSON array [{product, revenue, mom_growth_pct}]"
)

handle = client.submit_task(query=analysis_prompt)
result = client.wait(handle.task_id)
print(result.result)
Advanced Examples
Multi-Step Workflow
Copy
"""Multi-step workflow example with Shannon SDK"""
from shannon import ShannonClient
def main():
    """Run three queries in sequence under one shared session ID.

    Every task is submitted with the same ``session_id``; the second and
    third prompts refer back to what was asked earlier. Each task is waited
    on and its result printed before the next one is submitted.
    """
    client = ShannonClient()
    session_id = "quarterly-analysis-demo"
    rule = "=" * 60

    print(rule)
    print("Multi-Step Workflow Example")
    print(rule)

    # (progress banner, query) for each step, in execution order.
    steps = (
        # Step 1: Initial data query
        (
            "\n[Step 1] Loading Q4 data...",
            "What is 1000 + 500? This represents Q4 revenue (1000) and expenses (500).",
        ),
        # Step 2: Analysis based on previous context
        (
            "\n[Step 2] Analyzing trends...",
            "Based on the Q4 numbers you just calculated, what is the profit margin percentage?",
        ),
        # Step 3: Summary using all previous context
        (
            "\n[Step 3] Creating executive summary...",
            "Summarize the Q4 financial analysis in 2 sentences.",
        ),
    )

    for banner, query in steps:
        print(banner)
        handle = client.submit_task(query=query, session_id=session_id)
        outcome = client.wait(handle.task_id)
        print(f"Result: {outcome.result}")

    print("\n" + rule)
    print("✅ Multi-step workflow completed!")
    print(f"Session ID: {session_id}")
    print(rule)


if __name__ == "__main__":
    main()
Parallel Processing
Copy
"""Parallel processing example with Shannon SDK"""
import asyncio
from shannon import AsyncShannonClient
async def main():
    """Submit one task per topic concurrently and print each result.

    Submissions are awaited together with ``asyncio.gather``, and so are the
    waits on the returned handles. For every topic the result text is
    printed, followed by the model name and token count when the result
    carries them in its metadata.
    """
    divider = "=" * 60
    print(divider)
    print("Parallel Processing Example")
    print(divider)

    # Topics to process in parallel
    topics = ["AI", "Quantum Computing", "Biotechnology"]

    async with AsyncShannonClient() as client:
        print(f"\n[Step 1] Submitting {len(topics)} tasks in parallel...")

        # Create every submission coroutine, then await them as one batch.
        handles = await asyncio.gather(
            *(
                client.submit_task(
                    query=f"Give a one-sentence summary of {topic} and its main application."
                )
                for topic in topics
            )
        )
        print(f"✅ All {len(handles)} tasks submitted")

        print("\n[Step 2] Waiting for all results...")

        # Await completion of all tasks together.
        pending = [client.wait(handle.task_id) for handle in handles]
        results = await asyncio.gather(*pending)
        print(f"✅ All {len(results)} tasks completed")

        print("\n[Step 3] Results:")
        print("-" * 60)

        for topic, result in zip(topics, results):
            print(f"\n📌 {topic}:")
            print(f" {result.result}")

            # Metadata is optional: tolerate a missing attribute or an
            # empty value.
            info = getattr(result, "metadata", None)
            details = []
            if info:
                model = info.get("model_used") or info.get("model")
                if model:
                    details.append(f"Model: {model}")
                tokens = info.get("total_tokens")
                if tokens:
                    details.append(f"Tokens: {tokens}")
            if details:
                print(f" ({', '.join(details)})")

    print("\n" + divider)
    print("✅ Parallel processing completed!")
    print(f"Processed {len(topics)} topics concurrently")
    print(divider)


if __name__ == "__main__":
    asyncio.run(main())
Streaming with filters
Copy
"""Streaming with event filters example"""
from shannon import ShannonClient, EventType
def main():
    """Stream a task's events restricted to two event types.

    Submits a task, then iterates ``client.stream`` with a ``types`` filter
    of ``LLM_OUTPUT`` and ``WORKFLOW_COMPLETED``. LLM output messages are
    collected, the loop ends on the first ``WORKFLOW_COMPLETED`` event, and
    the final result is fetched with ``client.wait`` and printed.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    print(heavy_rule)
    print("Streaming with Filters Example")
    print(heavy_rule)

    client = ShannonClient()

    print("\n[Step 1] Submitting task...")
    prompt = (
        "Compare AWS, Azure, and GCP pricing models in 3 sentences. "
        "Stream the output as you generate it."
    )
    handle = client.submit_task(query=prompt)
    print(f"✅ Task submitted: {handle.task_id}")

    print("\n[Step 2] Streaming with filters [LLM_OUTPUT, WORKFLOW_COMPLETED]...")
    print(light_rule)

    wanted = [EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
    llm_outputs = []
    # Pre-set so the summary below still works if the stream yields nothing.
    event_count = 0

    # Note: the stream is keyed by the handle's workflow_id, not its task_id.
    events = client.stream(handle.workflow_id, types=wanted)
    for event_count, event in enumerate(events, start=1):
        print(f"[{event_count}] {event.type}")

        if event.type == EventType.LLM_OUTPUT:
            # Print only the first 80 characters; store the full message.
            print(f" Content: {event.message[:80]}...")
            llm_outputs.append(event.message)

        if event.type == EventType.WORKFLOW_COMPLETED:
            print(f" Status: {event.message}")
            break

    print("\n" + light_rule)
    print(f"✅ Received {event_count} filtered events")
    print(f"✅ Captured {len(llm_outputs)} LLM outputs")

    print("\n[Step 3] Full result:")
    print(light_rule)
    final = client.wait(handle.task_id)
    print(final.result)

    print("\n" + heavy_rule)
    print("✅ Streaming with filters completed!")
    print(heavy_rule)


if __name__ == "__main__":
    main()
Available event types
- LLM_OUTPUT — LLM responses
- LLM_PARTIAL — streaming tokens
- WORKFLOW_COMPLETED — task finished
- AGENT_THINKING — agent reasoning
- AGENT_STARTED — agent begins work
- AGENT_COMPLETED — agent finishes
- PROGRESS — progress updates
- ERROR_OCCURRED — errors
- Only final outputs: types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
- Real-time streaming: types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED]
- Monitor agents: types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED]
- Everything (no filter): types=None (or omit the parameter)
Task Control Examples
Pause and Resume Long-Running Tasks
Copy
"""Task control (pause/resume) example"""
from shannon import ShannonClient
import time
def main():
    """Pause and resume a running task, then print a preview of its result.

    Steps, in order: submit a research query, sleep 5 seconds, send a pause
    signal, print the task's control state, sleep 3 seconds, send a resume
    signal, wait for the task, and print its status plus the first 200
    characters of its result.

    The preview treats a ``None`` result as an empty string, so a task that
    ends without output does not make the example fail with ``TypeError``
    on the slice.
    """
    banner = "=" * 60
    print(banner)
    print("Task Control (Pause/Resume) Example")
    print(banner)

    client = ShannonClient(base_url="http://localhost:8080")

    # Submit a long-running research task
    print("\n[Step 1] Submitting long-running task...")
    handle = client.submit_task(
        query="Research the latest developments in quantum computing and AI integration. Provide detailed analysis."
    )
    print(f"✅ Task submitted: {handle.task_id}")

    # Let it run for a bit
    print("\n[Step 2] Letting task run for 5 seconds...")
    time.sleep(5)

    # Pause the task
    print("\n[Step 3] Pausing task...")
    success = client.pause_task(handle.task_id, reason="Review intermediate results")
    if success:
        print("✅ Pause signal sent")

    # Check control state
    print("\n[Step 4] Checking control state...")
    state = client.get_control_state(handle.task_id)
    print(f"Is paused: {state.is_paused}")
    print(f"Is cancelled: {state.is_cancelled}")
    # Pause details are printed only when a pause timestamp is present.
    if state.paused_at:
        print(f"Paused at: {state.paused_at}")
        print(f"Pause reason: {state.pause_reason}")
        print(f"Paused by: {state.paused_by}")

    # Wait a bit before resuming
    print("\n[Step 5] Waiting 3 seconds before resume...")
    time.sleep(3)

    # Resume the task
    print("\n[Step 6] Resuming task...")
    success = client.resume_task(handle.task_id, reason="Continue after review")
    if success:
        print("✅ Resume signal sent")

    # Wait for completion
    print("\n[Step 7] Waiting for task completion...")
    result = client.wait(handle.task_id)
    print("\n✅ Task completed!")
    print(f"Status: {result.status}")
    # Guard the slice: fall back to "" when the task produced no result.
    preview = (result.result or "")[:200]
    print(f"\nResult preview: {preview}...")

    print("\n" + banner)
    print("✅ Task control example completed!")
    print(banner)


if __name__ == "__main__":
    main()
Error Handling for Control Signals
Copy
"""Error handling for pause/resume operations"""
from shannon import ShannonClient
from shannon.errors import APIError
def main():
    """Show how to handle ``APIError`` raised by pause and resume calls.

    Each call is attempted once. On ``APIError`` the error text is checked
    against a list of known substrings and the first matching notice is
    printed; if none match, the raw error is printed instead.
    """
    client = ShannonClient()
    task_id = "task-123"

    # (substring looked for in the error text, notice to print).
    # Order matters: the first matching entry wins.
    pause_notices = (
        ("cannot pause completed task", "⚠️ Task already completed, cannot pause"),
        ("task is already paused", "⚠️ Task is already paused"),
        ("Task not found", "❌ Task not found"),
    )
    resume_notices = (
        ("task is not paused", "⚠️ Task is not paused, cannot resume"),
        ("cannot resume completed task", "⚠️ Task already completed, cannot resume"),
    )

    # Attempt to pause
    try:
        client.pause_task(task_id, reason="Review needed")
    except APIError as err:
        detail = str(err)
        notice = next(
            (text for needle, text in pause_notices if needle in detail),
            f"❌ Error: {err}",
        )
        print(notice)
    else:
        print("✅ Task paused successfully")

    # Attempt to resume
    try:
        client.resume_task(task_id, reason="Continue execution")
    except APIError as err:
        detail = str(err)
        notice = next(
            (text for needle, text in resume_notices if needle in detail),
            f"❌ Error: {err}",
        )
        print(notice)
    else:
        print("✅ Task resumed successfully")


if __name__ == "__main__":
    main()