この例のコレクションは増加しています。今後も定期的に使用例が追加されます。
基本的な例
モデル選択
コピー
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
"収益トレンドに焦点を当てて段落を3つの箇条書きに要約してください。出力形式: Markdownリスト。",
model_tier="small",
# model_override="gpt-5-nano-2025-08-07",
# provider_override="openai",
mode="simple",
)
print(client.wait(handle.task_id).result)
簡単な質問応答
コピー
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 簡単な質問をする
handle = client.submit_task(query="フランスの首都はどこですか?")
final = client.wait(handle.task_id)
print(f"回答: {final.result}")
データ分析
コピー
from shannon import ShannonClient
client = ShannonClient()
# 現実的なデータ分析
handle = client.submit_task(
query=(
"以下の売上データに基づいて計算してください:\n"
"1. 製品ごとの総収益\n"
"2. 月ごとの成長率\n"
"3. 総収益が最も高い上位3つを返す\n\n"
"データ:\n"
"ProductA: 1月=$10000, 2月=$12000\n"
"ProductB: 1月=$8000, 2月=$9500\n"
"ProductC: 1月=$15000, 2月=$14000\n"
"ProductD: 1月=$5000, 2月=$6000\n\n"
"フォーマット: JSON配列 [{product, revenue, mom_growth_pct}]"
)
)
result = client.wait(handle.task_id)
print(result.result)
高度な例
マルチステップワークフロー
コピー
"""Shannon SDKを使用したマルチステップワークフローの例"""
from shannon import ShannonClient
def main():
client = ShannonClient()
session_id = "quarterly-analysis-demo"
print("=" * 60)
print("マルチステップワークフローの例")
print("=" * 60)
# ステップ1: 初期データクエリ
print("\n[ステップ 1] Q4データを読み込み中...")
h1 = client.submit_task(
query="1000 + 500 はいくつですか?これはQ4の収益(1000)と費用(500)を表します。",
session_id=session_id
)
result1 = client.wait(h1.task_id)
print(f"結果: {result1.result}")
# ステップ2: 前のコンテキストに基づく分析
print("\n[ステップ 2] トレンドを分析中...")
h2 = client.submit_task(
query="あなたが計算したQ4の数字に基づいて、利益率は何パーセントですか?",
session_id=session_id
)
result2 = client.wait(h2.task_id)
print(f"結果: {result2.result}")
# ステップ3: すべての前のコンテキストを使用した要約
print("\n[ステップ 3] エグゼクティブサマリーを作成中...")
h3 = client.submit_task(
query="Q4の財務分析を2文で要約してください。",
session_id=session_id
)
result3 = client.wait(h3.task_id)
print(f"結果: {result3.result}")
print("\n" + "=" * 60)
print("✅ マルチステップワークフローが完了しました!")
print(f"セッションID: {session_id}")
print("=" * 60)
if __name__ == "__main__":
main()
並列処理
コピー
"""Shannon SDKを使用した並列処理の例"""
import asyncio
from shannon import AsyncShannonClient
async def main():
print("=" * 60)
print("並列処理の例")
print("=" * 60)
async with AsyncShannonClient() as client:
# 並列処理するトピック
topics = ["AI", "量子コンピューティング", "バイオテクノロジー"]
print(f"\n[ステップ 1] {len(topics)} タスクを並列で提出中...")
# すべてのタスクを同時に提出(ノンブロッキング)
tasks = [
client.submit_task(
query=f"{topic}の1文要約とその主な応用を教えてください。"
)
for topic in topics
]
# すべての提出が完了するのを待つ
handles = await asyncio.gather(*tasks)
print(f"✅ {len(handles)} タスクがすべて提出されました")
print("\n[ステップ 2] すべての結果を待機中...")
# すべてのタスクが並列で完了するのを待つ
results = await asyncio.gather(
*[client.wait(h.task_id) for h in handles]
)
print(f"✅ {len(results)} タスクがすべて完了しました")
print("\n[ステップ 3] 結果:")
print("-" * 60)
# 結果を表示
for topic, result in zip(topics, results):
print(f"\n📌 {topic}:")
print(f" {result.result}")
# メタデータが利用可能な場合は安全にアクセス
metadata = []
if hasattr(result, 'metadata') and result.metadata:
model = result.metadata.get('model_used') or result.metadata.get('model')
if model:
metadata.append(f"モデル: {model}")
tokens = result.metadata.get('total_tokens')
if tokens:
metadata.append(f"トークン: {tokens}")
if metadata:
print(f" ({', '.join(metadata)})")
print("\n" + "=" * 60)
print("✅ 並列処理が完了しました!")
print(f"{len(topics)} トピックを同時に処理しました")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
フィルターを使用したストリーミング
コピー
"""フィルター付きストリーミングの例"""
from shannon import ShannonClient, EventType
def main():
print("=" * 60)
print("フィルターを使用したストリーミングの例")
print("=" * 60)
client = ShannonClient()
print("\n[ステップ 1] タスクを送信中...")
handle = client.submit_task(
query=(
"AWS、Azure、GCPの価格モデルを3文で比較してください。 "
"生成される出力をストリームします。"
)
)
print(f"✅ タスクが送信されました: {handle.task_id}")
print("\n[ステップ 2] フィルターを使用したストリーミング [LLM_OUTPUT, WORKFLOW_COMPLETED]...")
print("-" * 60)
event_count = 0
llm_outputs = []
# LLM_OUTPUT と WORKFLOW_COMPLETED イベントのみをストリーム
for event in client.stream(
handle.workflow_id,
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
):
event_count += 1
print(f"[{event_count}] {event.type}")
if event.type == EventType.LLM_OUTPUT:
print(f" コンテンツ: {event.message[:80]}...")
llm_outputs.append(event.message)
if event.type == EventType.WORKFLOW_COMPLETED:
print(f" ステータス: {event.message}")
break
print("\n" + "-" * 60)
print(f"✅ {event_count} 件のフィルタリングされたイベントを受信")
print(f"✅ {len(llm_outputs)} 件の LLM 出力をキャプチャ")
print("\n[ステップ 3] 完全な結果:")
print("-" * 60)
final = client.wait(handle.task_id)
print(final.result)
print("\n" + "=" * 60)
print("✅ フィルターを使用したストリーミングが完了しました!")
print("=" * 60)
if __name__ == "__main__":
main()
利用可能なイベントタイプ
LLM_OUTPUT— LLM の応答LLM_PARTIAL— ストリーミングトークンWORKFLOW_COMPLETED— タスク完了AGENT_THINKING— エージェントの推論AGENT_STARTED— エージェントが作業を開始AGENT_COMPLETED— エージェントが完了PROGRESS— 進捗更新ERROR_OCCURRED— エラー
- 最終出力のみ:
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED] - リアルタイムストリーミング:
types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED] - エージェントの監視:
types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED] - すべて (フィルターなし):
types=None(またはパラメータを省略)
タスク制御の例
長時間実行されるタスクの一時停止と再開
コピー
"""タスク制御 (一時停止/再開) の例"""
from shannon import ShannonClient
import time
def main():
print("=" * 60)
print("タスク制御 (一時停止/再開) の例")
print("=" * 60)
client = ShannonClient(base_url="http://localhost:8080")
# 長時間実行される研究タスクを送信
print("\n[ステップ 1] 長時間実行されるタスクを送信中...")
handle = client.submit_task(
query="量子コンピューティングとAI統合の最新の進展を調査し、詳細な分析を提供してください。"
)
print(f"✅ タスクが送信されました: {handle.task_id}")
# 少し実行させる
print("\n[ステップ 2] タスクを5秒間実行させる...")
time.sleep(5)
# タスクを一時停止
print("\n[ステップ 3] タスクを一時停止中...")
success = client.pause_task(handle.task_id, reason="中間結果のレビュー")
if success:
print("✅ 一時停止信号が送信されました")
# 制御状態を確認
print("\n[ステップ 4] 制御状態を確認中...")
state = client.get_control_state(handle.task_id)
print(f"一時停止中: {state.is_paused}")
print(f"キャンセル済み: {state.is_cancelled}")
if state.paused_at:
print(f"一時停止時刻: {state.paused_at}")
print(f"一時停止理由: {state.pause_reason}")
print(f"一時停止した人: {state.paused_by}")
# 再開前に少し待つ
print("\n[ステップ 5] 再開前に3秒待機中...")
time.sleep(3)
# タスクを再開
print("\n[ステップ 6] タスクを再開中...")
success = client.resume_task(handle.task_id, reason="レビュー後に続行")
if success:
print("✅ 再開信号が送信されました")
# 完了を待つ
print("\n[ステップ 7] タスクの完了を待機中...")
result = client.wait(handle.task_id)
print(f"\n✅ タスクが完了しました!")
print(f"ステータス: {result.status}")
print(f"\n結果プレビュー: {result.result[:200]}...")
print("\n" + "=" * 60)
print("✅ タスク制御の例が完了しました!")
print("=" * 60)
if __name__ == "__main__":
main()
制御信号のエラーハンドリング
コピー
"""一時停止/再開操作のエラーハンドリング"""
from shannon import ShannonClient
from shannon.errors import APIError
def main():
client = ShannonClient()
task_id = "task-123"
# 一時停止を試みる
try:
client.pause_task(task_id, reason="レビューが必要")
print("✅ タスクが正常に一時停止されました")
except APIError as e:
# 特定のエラーケースを処理
if "完了したタスクを一時停止できません" in str(e):
print("⚠️ タスクはすでに完了しているため、一時停止できません")
elif "タスクはすでに一時停止しています" in str(e):
print("⚠️ タスクはすでに一時停止しています")
elif "タスクが見つかりません" in str(e):
print("❌ タスクが見つかりません")
else:
print(f"❌ エラー: {e}")
# 再開を試みる
try:
client.resume_task(task_id, reason="実行を続ける")
print("✅ タスクが正常に再開されました")
except APIError as e:
if "タスクは一時停止されていません" in str(e):
print("⚠️ タスクは一時停止されていないため、再開できません")
elif "完了したタスクを再開できません" in str(e):
print("⚠️ タスクはすでに完了しているため、再開できません")
else:
print(f"❌ エラー: {e}")
if __name__ == "__main__":
main()