Skip to main content
This collection of examples is growing; more use cases will be added regularly.

Basic Examples

Model Selection

from shannon import ShannonClient

# Point the client at a local Shannon endpoint.
client = ShannonClient(base_url="http://localhost:8080")

prompt = "Summarize the paragraph into three bullet points focusing on revenue trends. Output: Markdown list."

# Request the "small" model tier in "simple" mode. The two commented-out
# keyword arguments show where a specific model/provider could be named
# instead (presumably they take precedence over the tier; confirm in the SDK
# reference).
handle = client.submit_task(
    prompt,
    model_tier="small",
    # model_override="gpt-5-nano-2025-08-07",
    # provider_override="openai",
    mode="simple",
)

# Wait for the task to finish, then print its final text.
summary = client.wait(handle.task_id)
print(summary.result)

Simple Question Answering

from shannon import ShannonClient

client = ShannonClient(base_url="http://localhost:8080")

# Submit the question, wait for the task to finish, then print the answer.
question = "What is the capital of France?"
task = client.submit_task(query=question)
outcome = client.wait(task.task_id)
print(f"Answer: {outcome.result}")

Data Analysis

from shannon import ShannonClient

client = ShannonClient()

# The instructions, the inline data set, and the required output format are
# all sent together as one prompt string.
sales_prompt = (
    "Given the sales data below, calculate:\n"
    "1. Total revenue per product\n"
    "2. Month-over-month growth percentage\n"
    "3. Return top 3 by total revenue\n\n"
    "Data:\n"
    "ProductA: Jan=$10000, Feb=$12000\n"
    "ProductB: Jan=$8000, Feb=$9500\n"
    "ProductC: Jan=$15000, Feb=$14000\n"
    "ProductD: Jan=$5000, Feb=$6000\n\n"
    "Format: JSON array [{product, revenue, mom_growth_pct}]"
)

task = client.submit_task(query=sales_prompt)

# Wait for the task to finish and print whatever text it produced.
analysis = client.wait(task.task_id)
print(analysis.result)

Advanced Examples

Multi-Step Workflow

"""Multi-step workflow example with Shannon SDK"""
from shannon import ShannonClient

def main():
    """Run three tasks in one session and print each result in turn.

    Every task is submitted with the same ``session_id``; the second and
    third prompts refer back to what the earlier ones produced, so the steps
    are executed strictly in order and each is awaited before the next one
    is submitted.
    """
    client = ShannonClient()
    session_id = "quarterly-analysis-demo"
    rule = "=" * 60

    print(rule)
    print("Multi-Step Workflow Example")
    print(rule)

    # (progress banner, prompt) pairs; order matters because later prompts
    # build on the answers to earlier ones.
    steps = (
        (
            "\n[Step 1] Loading Q4 data...",
            "What is 1000 + 500? This represents Q4 revenue (1000) and expenses (500).",
        ),
        (
            "\n[Step 2] Analyzing trends...",
            "Based on the Q4 numbers you just calculated, what is the profit margin percentage?",
        ),
        (
            "\n[Step 3] Creating executive summary...",
            "Summarize the Q4 financial analysis in 2 sentences.",
        ),
    )

    for banner, prompt in steps:
        print(banner)
        handle = client.submit_task(query=prompt, session_id=session_id)
        outcome = client.wait(handle.task_id)
        print(f"Result: {outcome.result}")

    print("\n" + rule)
    print("✅ Multi-step workflow completed!")
    print(f"Session ID: {session_id}")
    print(rule)


if __name__ == "__main__":
    main()

Parallel Processing

"""Parallel processing example with Shannon SDK"""
import asyncio
from shannon import AsyncShannonClient

async def main():
    """Submit one task per topic concurrently and print every result.

    Submissions are awaited together with ``asyncio.gather``, and so are the
    subsequent waits, so the results come back in the same order as
    ``topics``. Model and token details are printed only when the result
    carries them.
    """
    rule = "=" * 60
    print(rule)
    print("Parallel Processing Example")
    print(rule)

    async with AsyncShannonClient() as client:
        topics = ["AI", "Quantum Computing", "Biotechnology"]

        print(f"\n[Step 1] Submitting {len(topics)} tasks in parallel...")

        # Fan out: one submission per topic, all awaited in a single gather.
        handles = await asyncio.gather(
            *(
                client.submit_task(
                    query=f"Give a one-sentence summary of {topic} and its main application."
                )
                for topic in topics
            )
        )
        print(f"✅ All {len(handles)} tasks submitted")

        print("\n[Step 2] Waiting for all results...")

        # Fan in: wait on every submitted task at once.
        waiters = [client.wait(handle.task_id) for handle in handles]
        results = await asyncio.gather(*waiters)
        print(f"✅ All {len(results)} tasks completed")

        print("\n[Step 3] Results:")
        print("-" * 60)

        for topic, result in zip(topics, results):
            print(f"\n📌 {topic}:")
            print(f"   {result.result}")

            # Metadata is optional: tolerate results that lack the attribute
            # or carry an empty mapping.
            details = []
            info = getattr(result, 'metadata', None)
            if info:
                model = info.get('model_used') or info.get('model')
                if model:
                    details.append(f"Model: {model}")
                tokens = info.get('total_tokens')
                if tokens:
                    details.append(f"Tokens: {tokens}")

            if details:
                print(f"   ({', '.join(details)})")

        print("\n" + rule)
        print("✅ Parallel processing completed!")
        print(f"Processed {len(topics)} topics concurrently")
        print(rule)


if __name__ == "__main__":
    asyncio.run(main())

Streaming with Filters

"""Streaming with event filters example"""
from shannon import ShannonClient, EventType

def main():
    """Stream a task's events, restricted to two event types, then print the result.

    Only ``LLM_OUTPUT`` and ``WORKFLOW_COMPLETED`` events are requested from
    the stream. Each received event is numbered and echoed; streaming stops
    at the first ``WORKFLOW_COMPLETED`` event, after which the final result
    is fetched with ``client.wait``.
    """
    rule = "=" * 60
    divider = "-" * 60

    print(rule)
    print("Streaming with Filters Example")
    print(rule)

    client = ShannonClient()

    print("\n[Step 1] Submitting task...")
    prompt = (
        "Compare AWS, Azure, and GCP pricing models in 3 sentences. "
        "Stream the output as you generate it."
    )
    handle = client.submit_task(query=prompt)
    print(f"✅ Task submitted: {handle.task_id}")

    print("\n[Step 2] Streaming with filters [LLM_OUTPUT, WORKFLOW_COMPLETED]...")
    print(divider)

    wanted_types = [EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
    chunks = []

    # Pre-set the counter so the summary below still works when the stream
    # yields nothing at all.
    event_count = 0
    events = client.stream(handle.workflow_id, types=wanted_types)
    for event_count, event in enumerate(events, start=1):
        print(f"[{event_count}] {event.type}")

        if event.type == EventType.LLM_OUTPUT:
            # Show only the first 80 characters on screen; keep the full text.
            print(f"    Content: {event.message[:80]}...")
            chunks.append(event.message)
        elif event.type == EventType.WORKFLOW_COMPLETED:
            print(f"    Status: {event.message}")
            break

    print("\n" + divider)
    print(f"✅ Received {event_count} filtered events")
    print(f"✅ Captured {len(chunks)} LLM outputs")

    print("\n[Step 3] Full result:")
    print(divider)
    final = client.wait(handle.task_id)
    print(final.result)

    print("\n" + rule)
    print("✅ Streaming with filters completed!")
    print(rule)


if __name__ == "__main__":
    main()
See the full list of event types in the Event Types Catalog.
Available event types
  • LLM_OUTPUT — LLM responses
  • LLM_PARTIAL — streaming tokens
  • WORKFLOW_COMPLETED — task finished
  • AGENT_THINKING — agent reasoning
  • AGENT_STARTED — agent begins work
  • AGENT_COMPLETED — agent finishes
  • PROGRESS — progress updates
  • ERROR_OCCURRED — errors
Common filters
  • Only final outputs: types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
  • Real-time streaming: types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED]
  • Monitor agents: types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED]
  • Everything (no filter): types=None (or omit the parameter)

Task Control Examples

Pause and Resume Long-Running Tasks

"""Task control (pause/resume) example"""
from shannon import ShannonClient
import time

def main():
    """Demonstrate pausing, inspecting, and resuming a running task.

    Submits a long-running query, sends a pause request after 5 seconds,
    prints the task's control state, sends a resume request 3 seconds later,
    then waits for the task and prints its status plus a preview of at most
    200 characters of the result.
    """
    rule = "=" * 60

    print(rule)
    print("Task Control (Pause/Resume) Example")
    print(rule)

    client = ShannonClient(base_url="http://localhost:8080")

    # Submit a long-running research task
    print("\n[Step 1] Submitting long-running task...")
    handle = client.submit_task(
        query="Research the latest developments in quantum computing and AI integration. Provide detailed analysis."
    )
    print(f"✅ Task submitted: {handle.task_id}")

    # Give the task time to start doing work before pausing it
    print("\n[Step 2] Letting task run for 5 seconds...")
    time.sleep(5)

    # Pause the task
    print("\n[Step 3] Pausing task...")
    success = client.pause_task(handle.task_id, reason="Review intermediate results")
    if success:
        print("✅ Pause signal sent")
    else:
        # Report a rejected request instead of carrying on silently.
        print("⚠️  Pause signal was not accepted")

    # Check control state
    print("\n[Step 4] Checking control state...")
    state = client.get_control_state(handle.task_id)
    print(f"Is paused: {state.is_paused}")
    print(f"Is cancelled: {state.is_cancelled}")
    if state.paused_at:
        print(f"Paused at: {state.paused_at}")
        print(f"Pause reason: {state.pause_reason}")
        print(f"Paused by: {state.paused_by}")

    # Wait a bit before resuming
    print("\n[Step 5] Waiting 3 seconds before resume...")
    time.sleep(3)

    # Resume the task
    print("\n[Step 6] Resuming task...")
    success = client.resume_task(handle.task_id, reason="Continue after review")
    if success:
        print("✅ Resume signal sent")
    else:
        # Report a rejected request instead of carrying on silently.
        print("⚠️  Resume signal was not accepted")

    # Wait for completion
    print("\n[Step 7] Waiting for task completion...")
    result = client.wait(handle.task_id)
    print("\n✅ Task completed!")
    print(f"Status: {result.status}")

    # NOTE(review): result.result is assumed to be possibly None/empty when
    # the task produced no output (e.g. it did not succeed) — confirm against
    # the SDK. Slicing None would raise TypeError, so fall back to "".
    text = result.result or ""
    # Only show an ellipsis when the preview actually cut something off.
    ellipsis = "..." if len(text) > 200 else ""
    print(f"\nResult preview: {text[:200]}{ellipsis}")

    print("\n" + rule)
    print("✅ Task control example completed!")
    print(rule)


if __name__ == "__main__":
    main()

Error Handling for Control Signals

"""Error handling for pause/resume operations"""
from shannon import ShannonClient
from shannon.errors import APIError

def main():
    """Show how to turn pause/resume ``APIError``s into readable messages.

    A pause and then a resume are each attempted once against a placeholder
    task id. When a call raises ``APIError``, the error text is matched by
    substring against a list of known messages; anything unrecognized is
    printed as-is.
    """
    client = ShannonClient()
    task_id = "task-123"

    # (substring of the error text, message to print), checked in order.
    pause_cases = (
        ("cannot pause completed task", "⚠️  Task already completed, cannot pause"),
        ("task is already paused", "⚠️  Task is already paused"),
        ("Task not found", "❌ Task not found"),
    )
    resume_cases = (
        ("task is not paused", "⚠️  Task is not paused, cannot resume"),
        ("cannot resume completed task", "⚠️  Task already completed, cannot resume"),
    )

    def explain(error, known_cases):
        # First matching fragment wins; otherwise fall back to the raw error.
        detail = str(error)
        for fragment, friendly in known_cases:
            if fragment in detail:
                return friendly
        return f"❌ Error: {error}"

    # Attempt to pause
    try:
        client.pause_task(task_id, reason="Review needed")
    except APIError as exc:
        print(explain(exc, pause_cases))
    else:
        print("✅ Task paused successfully")

    # Attempt to resume
    try:
        client.resume_task(task_id, reason="Continue execution")
    except APIError as exc:
        print(explain(exc, resume_cases))
    else:
        print("✅ Task resumed successfully")


if __name__ == "__main__":
    main()

Error Handling Examples

See the dedicated page for complete patterns and a full retry-logic example: Error Handling.

Next Steps