Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.shannon.run/llms.txt

Use this file to discover all available pages before exploring further.

This examples collection is growing. More use cases will be added regularly.

Basic Examples

Model Selection

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
  "Summarize the paragraph into three bullet points focusing on revenue trends. Output: Markdown list.",
  model_tier="small",
  # model_override="gpt-5-nano-2025-08-07",
  # provider_override="openai",
  mode="simple",
)
print(client.wait(handle.task_id).result)

Simple Question Answering

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Ask a simple question
handle = client.submit_task(query="What is the capital of France?")
final = client.wait(handle.task_id)
print(f"Answer: {final.result}")

Data Analysis

from shannon import ShannonClient

client = ShannonClient()

# Realistic data analysis
handle = client.submit_task(
    query=(
        "Given the sales data below, calculate:\n"
        "1. Total revenue per product\n"
        "2. Month-over-month growth percentage\n"
        "3. Return top 3 by total revenue\n\n"
        "Data:\n"
        "ProductA: Jan=$10000, Feb=$12000\n"
        "ProductB: Jan=$8000, Feb=$9500\n"
        "ProductC: Jan=$15000, Feb=$14000\n"
        "ProductD: Jan=$5000, Feb=$6000\n\n"
        "Format: JSON array [{product, revenue, mom_growth_pct}]"
    )
)

result = client.wait(handle.task_id)
print(result.result)

Advanced Examples

Multi-Step Workflow

"""Multi-step workflow example with Shannon SDK"""
from shannon import ShannonClient

def main():
    client = ShannonClient()
    session_id = "quarterly-analysis-demo"

    print("=" * 60)
    print("Multi-Step Workflow Example")
    print("=" * 60)

    # Step 1: Initial data query
    print("\n[Step 1] Loading Q4 data...")
    h1 = client.submit_task(
        query="What is 1000 + 500? This represents Q4 revenue (1000) and expenses (500).",
        session_id=session_id
    )
    result1 = client.wait(h1.task_id)
    print(f"Result: {result1.result}")

    # Step 2: Analysis based on previous context
    print("\n[Step 2] Analyzing trends...")
    h2 = client.submit_task(
        query="Based on the Q4 numbers you just calculated, what is the profit margin percentage?",
        session_id=session_id
    )
    result2 = client.wait(h2.task_id)
    print(f"Result: {result2.result}")

    # Step 3: Summary using all previous context
    print("\n[Step 3] Creating executive summary...")
    h3 = client.submit_task(
        query="Summarize the Q4 financial analysis in 2 sentences.",
        session_id=session_id
    )
    result3 = client.wait(h3.task_id)
    print(f"Result: {result3.result}")

    print("\n" + "=" * 60)
    print("✅ Multi-step workflow completed!")
    print(f"Session ID: {session_id}")
    print("=" * 60)

if __name__ == "__main__":
    main()

Parallel Processing

"""Parallel processing example with Shannon SDK"""
import asyncio
from shannon import AsyncShannonClient

async def main():
    print("=" * 60)
    print("Parallel Processing Example")
    print("=" * 60)

    async with AsyncShannonClient() as client:
        # Topics to process in parallel
        topics = ["AI", "Quantum Computing", "Biotechnology"]

        print(f"\n[Step 1] Submitting {len(topics)} tasks in parallel...")

        # Submit all tasks concurrently (non-blocking)
        tasks = [
            client.submit_task(
                query=f"Give a one-sentence summary of {topic} and its main application."
            )
            for topic in topics
        ]

        # Wait for all submissions to complete
        handles = await asyncio.gather(*tasks)
        print(f"✅ All {len(handles)} tasks submitted")

        print("\n[Step 2] Waiting for all results...")

        # Wait for all tasks to complete in parallel
        results = await asyncio.gather(
            *[client.wait(h.task_id) for h in handles]
        )
        print(f"✅ All {len(results)} tasks completed")

        print("\n[Step 3] Results:")
        print("-" * 60)

        # Display results
        for topic, result in zip(topics, results):
            print(f"\n📌 {topic}:")
            print(f"   {result.result}")

            # Safely access metadata if available
            metadata = []
            if hasattr(result, 'metadata') and result.metadata:
                model = result.metadata.get('model_used') or result.metadata.get('model')
                if model:
                    metadata.append(f"Model: {model}")
                tokens = result.metadata.get('total_tokens')
                if tokens:
                    metadata.append(f"Tokens: {tokens}")

            if metadata:
                print(f"   ({', '.join(metadata)})")

        print("\n" + "=" * 60)
        print("✅ Parallel processing completed!")
        print(f"Processed {len(topics)} topics concurrently")
        print("=" * 60)

if __name__ == "__main__":
    asyncio.run(main())

Streaming with filters

"""Streaming with event filters example"""
from shannon import ShannonClient, EventType

def main():
    print("=" * 60)
    print("Streaming with Filters Example")
    print("=" * 60)

    client = ShannonClient()

    print("\n[Step 1] Submitting task...")
    handle = client.submit_task(
        query=(
            "Compare AWS, Azure, and GCP pricing models in 3 sentences. "
            "Stream the output as you generate it."
        )
    )
    print(f"✅ Task submitted: {handle.task_id}")

    print("\n[Step 2] Streaming with filters [LLM_OUTPUT, WORKFLOW_COMPLETED]...")
    print("-" * 60)

    event_count = 0
    llm_outputs = []

    # Stream only LLM_OUTPUT and WORKFLOW_COMPLETED events
    for event in client.stream(
        handle.workflow_id,
        types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
    ):
        event_count += 1
        print(f"[{event_count}] {event.type}")

        if event.type == EventType.LLM_OUTPUT:
            print(f"    Content: {event.message[:80]}...")
            llm_outputs.append(event.message)

        if event.type == EventType.WORKFLOW_COMPLETED:
            print(f"    Status: {event.message}")
            break

    print("\n" + "-" * 60)
    print(f"✅ Received {event_count} filtered events")
    print(f"✅ Captured {len(llm_outputs)} LLM outputs")

    print("\n[Step 3] Full result:")
    print("-" * 60)
    final = client.wait(handle.task_id)
    print(final.result)

    print("\n" + "=" * 60)
    print("✅ Streaming with filters completed!")
    print("=" * 60)

if __name__ == "__main__":
    main()
See the full list of event types in the Event Types Catalog.
Available event types
  • LLM_OUTPUT — LLM responses
  • LLM_PARTIAL — streaming tokens
  • WORKFLOW_COMPLETED — task finished
  • AGENT_THINKING — agent reasoning
  • AGENT_STARTED — agent begins work
  • AGENT_COMPLETED — agent finishes
  • PROGRESS — progress updates
  • ERROR_OCCURRED — errors
Common filters
  • Only final outputs: types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
  • Real-time streaming: types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED]
  • Monitor agents: types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED]
  • Everything (no filter): types=None (or omit the parameter)

Task Control Examples

Pause and Resume Long-Running Tasks

"""Task control (pause/resume) example"""
from shannon import ShannonClient
import time

def main():
    print("=" * 60)
    print("Task Control (Pause/Resume) Example")
    print("=" * 60)

    client = ShannonClient(base_url="http://localhost:8080")

    # Submit a long-running research task
    print("\n[Step 1] Submitting long-running task...")
    handle = client.submit_task(
        query="Research the latest developments in quantum computing and AI integration. Provide detailed analysis."
    )
    print(f"✅ Task submitted: {handle.task_id}")

    # Let it run for a bit
    print("\n[Step 2] Letting task run for 5 seconds...")
    time.sleep(5)

    # Pause the task
    print("\n[Step 3] Pausing task...")
    success = client.pause_task(handle.task_id, reason="Review intermediate results")
    if success:
        print("✅ Pause signal sent")

    # Check control state
    print("\n[Step 4] Checking control state...")
    state = client.get_control_state(handle.task_id)
    print(f"Is paused: {state.is_paused}")
    print(f"Is cancelled: {state.is_cancelled}")
    if state.paused_at:
        print(f"Paused at: {state.paused_at}")
        print(f"Pause reason: {state.pause_reason}")
        print(f"Paused by: {state.paused_by}")

    # Wait a bit before resuming
    print("\n[Step 5] Waiting 3 seconds before resume...")
    time.sleep(3)

    # Resume the task
    print("\n[Step 6] Resuming task...")
    success = client.resume_task(handle.task_id, reason="Continue after review")
    if success:
        print("✅ Resume signal sent")

    # Wait for completion
    print("\n[Step 7] Waiting for task completion...")
    result = client.wait(handle.task_id)
    print(f"\n✅ Task completed!")
    print(f"Status: {result.status}")
    print(f"\nResult preview: {result.result[:200]}...")

    print("\n" + "=" * 60)
    print("✅ Task control example completed!")
    print("=" * 60)

if __name__ == "__main__":
    main()

Error Handling for Control Signals

"""Error handling for pause/resume operations"""
from shannon import ShannonClient
from shannon.errors import ShannonError

def main():
    client = ShannonClient()

    task_id = "task-123"

    # Attempt to pause
    try:
        client.pause_task(task_id, reason="Review needed")
        print("✅ Task paused successfully")
    except ShannonError as e:
        # Handle specific error cases
        if "cannot pause completed task" in str(e):
            print("⚠️  Task already completed, cannot pause")
        elif "task is already paused" in str(e):
            print("⚠️  Task is already paused")
        elif "Task not found" in str(e):
            print("❌ Task not found")
        else:
            print(f"❌ Error: {e}")

    # Attempt to resume
    try:
        client.resume_task(task_id, reason="Continue execution")
        print("✅ Task resumed successfully")
    except ShannonError as e:
        if "task is not paused" in str(e):
            print("⚠️  Task is not paused, cannot resume")
        elif "cannot resume completed task" in str(e):
            print("⚠️  Task already completed, cannot resume")
        else:
            print(f"❌ Error: {e}")

if __name__ == "__main__":
    main()

Swarm Mode

Force Multi-Agent Execution

"""Swarm mode example with Shannon SDK"""
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Submit with swarm mode to coordinate multiple agents
handle = client.submit_task(
    query="Research and compare AWS, Azure, and GCP for ML workloads. Cover pricing, GPU availability, and managed services.",
    force_swarm=True,
)

result = client.wait(handle.task_id)
print(result.result)

# Check model and usage info
if result.usage:
    print(f"Total tokens: {result.usage.get('total_tokens')}")
    print(f"Cost: ${result.usage.get('cost_usd', 0):.6f}")

Human-in-the-Loop Review Examples

Interactive Review Workflow

"""Human-in-the-loop review example"""
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Assume a workflow that requires review is already running
workflow_id = "workflow-123"

# Check current review state
state = client.get_review_state(workflow_id)
print(f"Review status: {state.status}")
print(f"Round: {state.round}, Version: {state.version}")

if state.current_plan:
    print(f"Current plan:\n{state.current_plan}")

# Display conversation history
for r in state.rounds:
    print(f"  [{r.role}] {r.message}")

# Provide feedback if still in review
if state.status == "reviewing":
    updated = client.submit_review_feedback(
        workflow_id,
        "Please add input validation and unit tests.",
        version=state.version,
    )
    print(f"Feedback submitted. Now at round {updated.round}")

    # Approve the updated plan
    result = client.approve_review(workflow_id, version=updated.version)
    print(f"Approved: {result['status']}")

Review with Optimistic Concurrency

"""Handle version conflicts during review"""
from shannon import ShannonClient, ValidationError

client = ShannonClient()

workflow_id = "workflow-456"

try:
    state = client.get_review_state(workflow_id)
    updated = client.submit_review_feedback(
        workflow_id,
        "Looks good, minor style changes needed.",
        version=state.version,
    )
    client.approve_review(workflow_id, version=updated.version)
except ValidationError as e:
    if "409" in str(e.code):
        print("Version conflict: another user modified the review. Refresh and retry.")
    else:
        raise

Skills Examples

Browse and Inspect Skills

"""Skills browsing example"""
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# List all available skills
skills = client.list_skills()
print(f"Found {len(skills)} skills:\n")
for s in skills:
    flag = " [DANGEROUS]" if s.dangerous else ""
    status = "enabled" if s.enabled else "disabled"
    print(f"  {s.name} v{s.version} ({s.category}) - {status}{flag}")
    print(f"    {s.description}")

# Filter by category
print("\n--- Coding Skills ---")
coding = client.list_skills(category="coding")
for s in coding:
    print(f"  {s.name}: {s.description}")

# Get detailed info
detail = client.get_skill("web_search")
print(f"\nSkill detail: {detail.name} v{detail.version}")
print(f"  Author: {detail.author}")
print(f"  Requires tools: {detail.requires_tools}")
print(f"  Requires role: {detail.requires_role}")
print(f"  Budget max: {detail.budget_max}")

# Get version history
versions = client.get_skill_versions("web_search")
print(f"\nVersions of web_search:")
for v in versions:
    print(f"  v{v.version} - {v.description}")

Schedule Management Examples

Create and Manage Scheduled Tasks

"""Schedule management example"""
from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Create a daily research task at 9am UTC on weekdays
result = client.create_schedule(
    name="Daily AI News",
    cron_expression="0 9 * * 1-5",
    task_query="Summarize the latest AI research news",
    task_context={"force_research": "true", "research_strategy": "quick"},
    timezone="UTC",
    max_budget_per_run_usd=0.50,
)
schedule_id = result["schedule_id"]
print(f"Schedule created: {schedule_id}")

# List all schedules
schedules, total = client.list_schedules()
print(f"\n{total} schedules:")
for s in schedules:
    print(f"  {s.name}: {s.cron_expression} ({s.status})")
    print(f"    Runs: {s.total_runs} total, {s.successful_runs} success, {s.failed_runs} failed")

# Get schedule details
sched = client.get_schedule(schedule_id)
print(f"\nSchedule: {sched.name}")
print(f"  Next run: {sched.next_run_at}")
print(f"  Last run: {sched.last_run_at}")

# View execution history
runs, _ = client.get_schedule_runs(schedule_id)
for run in runs:
    print(f"  {run.triggered_at}: {run.status} - ${run.total_cost_usd:.4f}")

# Pause schedule (e.g. for maintenance)
client.pause_schedule(schedule_id, reason="Maintenance window")

# Resume schedule
client.resume_schedule(schedule_id)

# Update schedule configuration
client.update_schedule(
    schedule_id,
    cron_expression="0 8 * * 1-5",  # Change to 8am
    max_budget_per_run_usd=0.75,
)

# Delete schedule when no longer needed
# client.delete_schedule(schedule_id)

Direct Tool Execution

List, Inspect, and Execute Tools

"""Direct tool execution example"""
import json
from shannon import ShannonClient

def main():
    client = ShannonClient(base_url="http://localhost:8080")
    try:
        # List available tools
        tools = client.list_tools()
        print(f"Found {len(tools)} tools")
        for tool in tools[:10]:
            print(f"  - {tool.name}: {tool.description}")

        if not tools:
            return

        # Get detailed info for a specific tool
        target = tools[0].name
        detail = client.get_tool(target)
        print(f"\nTool: {detail.name}")
        print(f"Category: {detail.category}")
        print(f"Description: {detail.description}")
        print(f"Parameters: {json.dumps(detail.parameters, indent=2)}")

        # Execute the tool
        result = client.execute_tool(target, arguments={"expression": "6 * 7"})
        print(f"\nSuccess: {result.success}")
        if result.text:
            print(f"Text: {result.text}")
        if result.output is not None:
            print(f"Output: {result.output}")
        if result.error:
            print(f"Error: {result.error}")
    finally:
        client.close()

if __name__ == "__main__":
    main()

Deterministic Agent Execution

Execute Agents and Send Swarm Follow-ups

"""Deterministic agent execution example"""
import json
from shannon import ShannonClient

def main():
    client = ShannonClient(base_url="http://localhost:8080")
    try:
        # List available agents
        agents = client.list_agents()
        print(f"Agents: {len(agents)}")
        for agent in agents[:10]:
            print(f"  - {agent.id}: {agent.name} ({agent.tool})")

        if not agents:
            return

        # Get agent details
        agent = client.get_agent(agents[0].id)
        print(f"\nAgent: {agent.id}")
        print(f"Name: {agent.name}")
        print(f"Description: {agent.description}")
        print(f"Input schema: {json.dumps(agent.input_schema, indent=2)}")

        # Execute the agent
        execution = client.execute_agent(
            agent.id,
            {"query": "Analyze recent trends"},
        )
        print(f"\nTask ID: {execution.task_id}")
        print(f"Workflow ID: {execution.workflow_id}")
        print(f"Status: {execution.status}")

        # Wait for result
        final = execution.wait(timeout=120)
        print(f"\nResult: {final.result}")

        # Optional: send follow-up message to a swarm workflow
        # result = client.send_swarm_message(execution.workflow_id, "Focus on cost analysis")
        # print(f"Swarm follow-up accepted: {result.success}")
    finally:
        client.close()

if __name__ == "__main__":
    main()

OpenAI-Compatible Chat Completions

Non-Streaming and Streaming

"""OpenAI-compatible chat completions example"""
from shannon import ShannonClient, OpenAIChatMessage

def main():
    client = ShannonClient(base_url="http://localhost:8080")
    try:
        # List available models
        models = client.list_openai_models()
        print(f"OpenAI-compatible models: {len(models)}")
        for entry in models[:10]:
            print(f"  - {entry.id}")

        messages = [OpenAIChatMessage(role="user", content="Summarize Shannon's strengths in two sentences.")]

        # Non-streaming completion
        completion = client.create_chat_completion(
            messages,
            model="shannon-chat",
        )
        print(f"\nResponse: {completion.choices[0].message.content}")
        if completion.usage:
            print(f"Tokens: {completion.usage.total_tokens}")

        # Streaming completion
        print("\nStreaming response:")
        for chunk in client.stream_chat_completion(
            [OpenAIChatMessage(role="user", content="Tell me a story in 3 sentences.")],
            model="shannon-chat",
            include_usage=True,
        ):
            if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
        print()
    finally:
        client.close()

if __name__ == "__main__":
    main()

Session Files and Memory

List and Download Workspace and Memory Files

"""Session files and memory example"""
from shannon import ShannonClient

def main():
    client = ShannonClient(base_url="http://localhost:8080")
    session_id = "my-session"

    try:
        # List memory files
        memory_files = client.list_memory_files()
        print(f"Memory files: {len(memory_files)}")
        for entry in memory_files[:10]:
            print(f"  - {entry.path} ({entry.size_bytes} bytes)")

        # Download a memory file
        if memory_files:
            downloaded = client.download_memory_file(memory_files[0].path)
            print(f"\nMemory file content preview:")
            print(downloaded.content[:400])

        # List session workspace files
        session_files = client.list_session_files(session_id)
        print(f"\nSession files for {session_id}: {len(session_files)}")
        for entry in session_files[:10]:
            print(f"  - {entry.path} ({entry.size_bytes} bytes)")

        # Download a session file
        if session_files:
            downloaded = client.download_session_file(session_id, session_files[0].path)
            print(f"\nSession file content preview:")
            print(downloaded.content[:400])
    finally:
        client.close()

if __name__ == "__main__":
    main()

Error Handling Examples

See the dedicated page for complete patterns and a full retry-logic example: Error Handling.

Next Steps

Async Usage

Async/await patterns

Error Handling

Handle errors properly