Documentation Index
Fetch the complete documentation index at: https://docs.shannon.run/llms.txt
Use this file to discover all available pages before exploring further.
This examples collection is growing. More use cases will be added regularly.
Basic Examples
Model Selection
from shannon import ShannonClient

# Connect to a locally running Shannon gateway.
client = ShannonClient(base_url="http://localhost:8080")

# Route the request to the small model tier; the commented-out overrides
# show how to pin an exact model/provider instead of a tier.
task = client.submit_task(
    "Summarize the paragraph into three bullet points focusing on revenue trends. Output: Markdown list.",
    model_tier="small",
    # model_override="gpt-5-nano-2025-08-07",
    # provider_override="openai",
    mode="simple",
)

# Block until the task finishes, then print its final result.
final = client.wait(task.task_id)
print(final.result)
Simple Question Answering
from shannon import ShannonClient

# Point the client at a local Shannon gateway.
client = ShannonClient(base_url="http://localhost:8080")

# Submit a one-shot question and wait synchronously for the answer.
task = client.submit_task(query="What is the capital of France?")
answer = client.wait(task.task_id)
print(f"Answer: {answer.result}")
Data Analysis
from shannon import ShannonClient

client = ShannonClient()

# Realistic data analysis: the prompt carries the data inline and pins
# the expected output format so the result is machine-parseable.
analysis_prompt = (
    "Given the sales data below, calculate:\n"
    "1. Total revenue per product\n"
    "2. Month-over-month growth percentage\n"
    "3. Return top 3 by total revenue\n\n"
    "Data:\n"
    "ProductA: Jan=$10000, Feb=$12000\n"
    "ProductB: Jan=$8000, Feb=$9500\n"
    "ProductC: Jan=$15000, Feb=$14000\n"
    "ProductD: Jan=$5000, Feb=$6000\n\n"
    "Format: JSON array [{product, revenue, mom_growth_pct}]"
)

task = client.submit_task(query=analysis_prompt)
final = client.wait(task.task_id)
print(final.result)
Advanced Examples
Multi-Step Workflow
"""Multi-step workflow example with Shannon SDK"""
from shannon import ShannonClient
def main():
client = ShannonClient()
session_id = "quarterly-analysis-demo"
print("=" * 60)
print("Multi-Step Workflow Example")
print("=" * 60)
# Step 1: Initial data query
print("\n[Step 1] Loading Q4 data...")
h1 = client.submit_task(
query="What is 1000 + 500? This represents Q4 revenue (1000) and expenses (500).",
session_id=session_id
)
result1 = client.wait(h1.task_id)
print(f"Result: {result1.result}")
# Step 2: Analysis based on previous context
print("\n[Step 2] Analyzing trends...")
h2 = client.submit_task(
query="Based on the Q4 numbers you just calculated, what is the profit margin percentage?",
session_id=session_id
)
result2 = client.wait(h2.task_id)
print(f"Result: {result2.result}")
# Step 3: Summary using all previous context
print("\n[Step 3] Creating executive summary...")
h3 = client.submit_task(
query="Summarize the Q4 financial analysis in 2 sentences.",
session_id=session_id
)
result3 = client.wait(h3.task_id)
print(f"Result: {result3.result}")
print("\n" + "=" * 60)
print("✅ Multi-step workflow completed!")
print(f"Session ID: {session_id}")
print("=" * 60)
if __name__ == "__main__":
main()
Parallel Processing
"""Parallel processing example with Shannon SDK"""
import asyncio
from shannon import AsyncShannonClient
async def main():
print("=" * 60)
print("Parallel Processing Example")
print("=" * 60)
async with AsyncShannonClient() as client:
# Topics to process in parallel
topics = ["AI", "Quantum Computing", "Biotechnology"]
print(f"\n[Step 1] Submitting {len(topics)} tasks in parallel...")
# Submit all tasks concurrently (non-blocking)
tasks = [
client.submit_task(
query=f"Give a one-sentence summary of {topic} and its main application."
)
for topic in topics
]
# Wait for all submissions to complete
handles = await asyncio.gather(*tasks)
print(f"✅ All {len(handles)} tasks submitted")
print("\n[Step 2] Waiting for all results...")
# Wait for all tasks to complete in parallel
results = await asyncio.gather(
*[client.wait(h.task_id) for h in handles]
)
print(f"✅ All {len(results)} tasks completed")
print("\n[Step 3] Results:")
print("-" * 60)
# Display results
for topic, result in zip(topics, results):
print(f"\n📌 {topic}:")
print(f" {result.result}")
# Safely access metadata if available
metadata = []
if hasattr(result, 'metadata') and result.metadata:
model = result.metadata.get('model_used') or result.metadata.get('model')
if model:
metadata.append(f"Model: {model}")
tokens = result.metadata.get('total_tokens')
if tokens:
metadata.append(f"Tokens: {tokens}")
if metadata:
print(f" ({', '.join(metadata)})")
print("\n" + "=" * 60)
print("✅ Parallel processing completed!")
print(f"Processed {len(topics)} topics concurrently")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
Streaming with filters
"""Streaming with event filters example"""
from shannon import ShannonClient, EventType
def main():
print("=" * 60)
print("Streaming with Filters Example")
print("=" * 60)
client = ShannonClient()
print("\n[Step 1] Submitting task...")
handle = client.submit_task(
query=(
"Compare AWS, Azure, and GCP pricing models in 3 sentences. "
"Stream the output as you generate it."
)
)
print(f"✅ Task submitted: {handle.task_id}")
print("\n[Step 2] Streaming with filters [LLM_OUTPUT, WORKFLOW_COMPLETED]...")
print("-" * 60)
event_count = 0
llm_outputs = []
# Stream only LLM_OUTPUT and WORKFLOW_COMPLETED events
for event in client.stream(
handle.workflow_id,
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
):
event_count += 1
print(f"[{event_count}] {event.type}")
if event.type == EventType.LLM_OUTPUT:
print(f" Content: {event.message[:80]}...")
llm_outputs.append(event.message)
if event.type == EventType.WORKFLOW_COMPLETED:
print(f" Status: {event.message}")
break
print("\n" + "-" * 60)
print(f"✅ Received {event_count} filtered events")
print(f"✅ Captured {len(llm_outputs)} LLM outputs")
print("\n[Step 3] Full result:")
print("-" * 60)
final = client.wait(handle.task_id)
print(final.result)
print("\n" + "=" * 60)
print("✅ Streaming with filters completed!")
print("=" * 60)
if __name__ == "__main__":
main()
Available event types
- LLM_OUTPUT — LLM responses
- LLM_PARTIAL — streaming tokens
- WORKFLOW_COMPLETED — task finished
- AGENT_THINKING — agent reasoning
- AGENT_STARTED — agent begins work
- AGENT_COMPLETED — agent finishes
- PROGRESS — progress updates
- ERROR_OCCURRED — errors
- Only final outputs: types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
- Real-time streaming: types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED]
- Monitor agents: types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED]
- Everything (no filter): types=None (or omit the parameter)
Task Control Examples
Pause and Resume Long-Running Tasks
"""Task control (pause/resume) example"""
from shannon import ShannonClient
import time
def main():
print("=" * 60)
print("Task Control (Pause/Resume) Example")
print("=" * 60)
client = ShannonClient(base_url="http://localhost:8080")
# Submit a long-running research task
print("\n[Step 1] Submitting long-running task...")
handle = client.submit_task(
query="Research the latest developments in quantum computing and AI integration. Provide detailed analysis."
)
print(f"✅ Task submitted: {handle.task_id}")
# Let it run for a bit
print("\n[Step 2] Letting task run for 5 seconds...")
time.sleep(5)
# Pause the task
print("\n[Step 3] Pausing task...")
success = client.pause_task(handle.task_id, reason="Review intermediate results")
if success:
print("✅ Pause signal sent")
# Check control state
print("\n[Step 4] Checking control state...")
state = client.get_control_state(handle.task_id)
print(f"Is paused: {state.is_paused}")
print(f"Is cancelled: {state.is_cancelled}")
if state.paused_at:
print(f"Paused at: {state.paused_at}")
print(f"Pause reason: {state.pause_reason}")
print(f"Paused by: {state.paused_by}")
# Wait a bit before resuming
print("\n[Step 5] Waiting 3 seconds before resume...")
time.sleep(3)
# Resume the task
print("\n[Step 6] Resuming task...")
success = client.resume_task(handle.task_id, reason="Continue after review")
if success:
print("✅ Resume signal sent")
# Wait for completion
print("\n[Step 7] Waiting for task completion...")
result = client.wait(handle.task_id)
print(f"\n✅ Task completed!")
print(f"Status: {result.status}")
print(f"\nResult preview: {result.result[:200]}...")
print("\n" + "=" * 60)
print("✅ Task control example completed!")
print("=" * 60)
if __name__ == "__main__":
main()
Error Handling for Control Signals
"""Error handling for pause/resume operations"""
from shannon import ShannonClient
from shannon.errors import ShannonError
def main():
client = ShannonClient()
task_id = "task-123"
# Attempt to pause
try:
client.pause_task(task_id, reason="Review needed")
print("✅ Task paused successfully")
except ShannonError as e:
# Handle specific error cases
if "cannot pause completed task" in str(e):
print("⚠️ Task already completed, cannot pause")
elif "task is already paused" in str(e):
print("⚠️ Task is already paused")
elif "Task not found" in str(e):
print("❌ Task not found")
else:
print(f"❌ Error: {e}")
# Attempt to resume
try:
client.resume_task(task_id, reason="Continue execution")
print("✅ Task resumed successfully")
except ShannonError as e:
if "task is not paused" in str(e):
print("⚠️ Task is not paused, cannot resume")
elif "cannot resume completed task" in str(e):
print("⚠️ Task already completed, cannot resume")
else:
print(f"❌ Error: {e}")
if __name__ == "__main__":
main()
Swarm Mode
Force Multi-Agent Execution
"""Swarm mode example with Shannon SDK"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# Submit with swarm mode to coordinate multiple agents
handle = client.submit_task(
query="Research and compare AWS, Azure, and GCP for ML workloads. Cover pricing, GPU availability, and managed services.",
force_swarm=True,
)
result = client.wait(handle.task_id)
print(result.result)
# Check model and usage info
if result.usage:
print(f"Total tokens: {result.usage.get('total_tokens')}")
print(f"Cost: ${result.usage.get('cost_usd', 0):.6f}")
Human-in-the-Loop Review Examples
Interactive Review Workflow
"""Human-in-the-loop review example"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# Assume a workflow that requires review is already running
workflow_id = "workflow-123"
# Check current review state
state = client.get_review_state(workflow_id)
print(f"Review status: {state.status}")
print(f"Round: {state.round}, Version: {state.version}")
if state.current_plan:
print(f"Current plan:\n{state.current_plan}")
# Display conversation history
for r in state.rounds:
print(f" [{r.role}] {r.message}")
# Provide feedback if still in review
if state.status == "reviewing":
updated = client.submit_review_feedback(
workflow_id,
"Please add input validation and unit tests.",
version=state.version,
)
print(f"Feedback submitted. Now at round {updated.round}")
# Approve the updated plan
result = client.approve_review(workflow_id, version=updated.version)
print(f"Approved: {result['status']}")
Review with Optimistic Concurrency
"""Handle version conflicts during review"""
from shannon import ShannonClient, ValidationError
client = ShannonClient()
workflow_id = "workflow-456"
try:
state = client.get_review_state(workflow_id)
updated = client.submit_review_feedback(
workflow_id,
"Looks good, minor style changes needed.",
version=state.version,
)
client.approve_review(workflow_id, version=updated.version)
except ValidationError as e:
if "409" in str(e.code):
print("Version conflict: another user modified the review. Refresh and retry.")
else:
raise
Skills Examples
Browse and Inspect Skills
"""Skills browsing example"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# List all available skills
skills = client.list_skills()
print(f"Found {len(skills)} skills:\n")
for s in skills:
flag = " [DANGEROUS]" if s.dangerous else ""
status = "enabled" if s.enabled else "disabled"
print(f" {s.name} v{s.version} ({s.category}) - {status}{flag}")
print(f" {s.description}")
# Filter by category
print("\n--- Coding Skills ---")
coding = client.list_skills(category="coding")
for s in coding:
print(f" {s.name}: {s.description}")
# Get detailed info
detail = client.get_skill("web_search")
print(f"\nSkill detail: {detail.name} v{detail.version}")
print(f" Author: {detail.author}")
print(f" Requires tools: {detail.requires_tools}")
print(f" Requires role: {detail.requires_role}")
print(f" Budget max: {detail.budget_max}")
# Get version history
versions = client.get_skill_versions("web_search")
print(f"\nVersions of web_search:")
for v in versions:
print(f" v{v.version} - {v.description}")
Schedule Management Examples
Create and Manage Scheduled Tasks
"""Schedule management example"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# Create a daily research task at 9am UTC on weekdays
result = client.create_schedule(
name="Daily AI News",
cron_expression="0 9 * * 1-5",
task_query="Summarize the latest AI research news",
task_context={"force_research": "true", "research_strategy": "quick"},
timezone="UTC",
max_budget_per_run_usd=0.50,
)
schedule_id = result["schedule_id"]
print(f"Schedule created: {schedule_id}")
# List all schedules
schedules, total = client.list_schedules()
print(f"\n{total} schedules:")
for s in schedules:
print(f" {s.name}: {s.cron_expression} ({s.status})")
print(f" Runs: {s.total_runs} total, {s.successful_runs} success, {s.failed_runs} failed")
# Get schedule details
sched = client.get_schedule(schedule_id)
print(f"\nSchedule: {sched.name}")
print(f" Next run: {sched.next_run_at}")
print(f" Last run: {sched.last_run_at}")
# View execution history
runs, _ = client.get_schedule_runs(schedule_id)
for run in runs:
print(f" {run.triggered_at}: {run.status} - ${run.total_cost_usd:.4f}")
# Pause schedule (e.g. for maintenance)
client.pause_schedule(schedule_id, reason="Maintenance window")
# Resume schedule
client.resume_schedule(schedule_id)
# Update schedule configuration
client.update_schedule(
schedule_id,
cron_expression="0 8 * * 1-5", # Change to 8am
max_budget_per_run_usd=0.75,
)
# Delete schedule when no longer needed
# client.delete_schedule(schedule_id)
Direct Tool Execution
List, Inspect, and Execute Tools
"""Direct tool execution example"""
import json
from shannon import ShannonClient
def main():
client = ShannonClient(base_url="http://localhost:8080")
try:
# List available tools
tools = client.list_tools()
print(f"Found {len(tools)} tools")
for tool in tools[:10]:
print(f" - {tool.name}: {tool.description}")
if not tools:
return
# Get detailed info for a specific tool
target = tools[0].name
detail = client.get_tool(target)
print(f"\nTool: {detail.name}")
print(f"Category: {detail.category}")
print(f"Description: {detail.description}")
print(f"Parameters: {json.dumps(detail.parameters, indent=2)}")
# Execute the tool
result = client.execute_tool(target, arguments={"expression": "6 * 7"})
print(f"\nSuccess: {result.success}")
if result.text:
print(f"Text: {result.text}")
if result.output is not None:
print(f"Output: {result.output}")
if result.error:
print(f"Error: {result.error}")
finally:
client.close()
if __name__ == "__main__":
main()
Deterministic Agent Execution
Execute Agents and Send Swarm Follow-ups
"""Deterministic agent execution example"""
import json
from shannon import ShannonClient
def main():
client = ShannonClient(base_url="http://localhost:8080")
try:
# List available agents
agents = client.list_agents()
print(f"Agents: {len(agents)}")
for agent in agents[:10]:
print(f" - {agent.id}: {agent.name} ({agent.tool})")
if not agents:
return
# Get agent details
agent = client.get_agent(agents[0].id)
print(f"\nAgent: {agent.id}")
print(f"Name: {agent.name}")
print(f"Description: {agent.description}")
print(f"Input schema: {json.dumps(agent.input_schema, indent=2)}")
# Execute the agent
execution = client.execute_agent(
agent.id,
{"query": "Analyze recent trends"},
)
print(f"\nTask ID: {execution.task_id}")
print(f"Workflow ID: {execution.workflow_id}")
print(f"Status: {execution.status}")
# Wait for result
final = execution.wait(timeout=120)
print(f"\nResult: {final.result}")
# Optional: send follow-up message to a swarm workflow
# result = client.send_swarm_message(execution.workflow_id, "Focus on cost analysis")
# print(f"Swarm follow-up accepted: {result.success}")
finally:
client.close()
if __name__ == "__main__":
main()
OpenAI-Compatible Chat Completions
Non-Streaming and Streaming
"""OpenAI-compatible chat completions example"""
from shannon import ShannonClient, OpenAIChatMessage
def main():
client = ShannonClient(base_url="http://localhost:8080")
try:
# List available models
models = client.list_openai_models()
print(f"OpenAI-compatible models: {len(models)}")
for entry in models[:10]:
print(f" - {entry.id}")
messages = [OpenAIChatMessage(role="user", content="Summarize Shannon's strengths in two sentences.")]
# Non-streaming completion
completion = client.create_chat_completion(
messages,
model="shannon-chat",
)
print(f"\nResponse: {completion.choices[0].message.content}")
if completion.usage:
print(f"Tokens: {completion.usage.total_tokens}")
# Streaming completion
print("\nStreaming response:")
for chunk in client.stream_chat_completion(
[OpenAIChatMessage(role="user", content="Tell me a story in 3 sentences.")],
model="shannon-chat",
include_usage=True,
):
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print()
finally:
client.close()
if __name__ == "__main__":
main()
Session Files and Memory
List and Download Workspace and Memory Files
"""Session files and memory example"""
from shannon import ShannonClient
def main():
client = ShannonClient(base_url="http://localhost:8080")
session_id = "my-session"
try:
# List memory files
memory_files = client.list_memory_files()
print(f"Memory files: {len(memory_files)}")
for entry in memory_files[:10]:
print(f" - {entry.path} ({entry.size_bytes} bytes)")
# Download a memory file
if memory_files:
downloaded = client.download_memory_file(memory_files[0].path)
print(f"\nMemory file content preview:")
print(downloaded.content[:400])
# List session workspace files
session_files = client.list_session_files(session_id)
print(f"\nSession files for {session_id}: {len(session_files)}")
for entry in session_files[:10]:
print(f" - {entry.path} ({entry.size_bytes} bytes)")
# Download a session file
if session_files:
downloaded = client.download_session_file(session_id, session_files[0].path)
print(f"\nSession file content preview:")
print(downloaded.content[:400])
finally:
client.close()
if __name__ == "__main__":
main()
Error Handling Examples
See the dedicated page for complete patterns and a full retry-logic example: Error Handling.

Next Steps
Async Usage
Async/await patterns
Error Handling
Handle errors properly