この例のコレクションは増加しています。今後も定期的に使用例が追加されます。
基本的な例
モデル選択
コピー
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
handle = client.submit_task(
"収益トレンドに焦点を当てて段落を3つの箇条書きに要約してください。出力形式: Markdownリスト。",
model_tier="small",
# model_override="gpt-5-nano-2025-08-07",
# provider_override="openai",
mode="simple",
)
print(client.wait(handle.task_id).result)
簡単な質問応答
コピー
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 簡単な質問をする
handle = client.submit_task(query="フランスの首都はどこですか?")
final = client.wait(handle.task_id)
print(f"回答: {final.result}")
データ分析
コピー
from shannon import ShannonClient
client = ShannonClient()
# 現実的なデータ分析
handle = client.submit_task(
query=(
"以下の売上データに基づいて計算してください:\n"
"1. 製品ごとの総収益\n"
"2. 月ごとの成長率\n"
"3. 総収益が最も高い上位3つを返す\n\n"
"データ:\n"
"ProductA: 1月=$10000, 2月=$12000\n"
"ProductB: 1月=$8000, 2月=$9500\n"
"ProductC: 1月=$15000, 2月=$14000\n"
"ProductD: 1月=$5000, 2月=$6000\n\n"
"フォーマット: JSON配列 [{product, revenue, mom_growth_pct}]"
)
)
result = client.wait(handle.task_id)
print(result.result)
高度な例
マルチステップワークフロー
コピー
"""Shannon SDKを使用したマルチステップワークフローの例"""
from shannon import ShannonClient
def main():
client = ShannonClient()
session_id = "quarterly-analysis-demo"
print("=" * 60)
print("マルチステップワークフローの例")
print("=" * 60)
# ステップ1: 初期データクエリ
print("\n[ステップ 1] Q4データを読み込み中...")
h1 = client.submit_task(
query="1000 + 500 はいくつですか?これはQ4の収益(1000)と費用(500)を表します。",
session_id=session_id
)
result1 = client.wait(h1.task_id)
print(f"結果: {result1.result}")
# ステップ2: 前のコンテキストに基づく分析
print("\n[ステップ 2] トレンドを分析中...")
h2 = client.submit_task(
query="あなたが計算したQ4の数字に基づいて、利益率は何パーセントですか?",
session_id=session_id
)
result2 = client.wait(h2.task_id)
print(f"結果: {result2.result}")
# ステップ3: すべての前のコンテキストを使用した要約
print("\n[ステップ 3] エグゼクティブサマリーを作成中...")
h3 = client.submit_task(
query="Q4の財務分析を2文で要約してください。",
session_id=session_id
)
result3 = client.wait(h3.task_id)
print(f"結果: {result3.result}")
print("\n" + "=" * 60)
print("✅ マルチステップワークフローが完了しました!")
print(f"セッションID: {session_id}")
print("=" * 60)
if __name__ == "__main__":
main()
並列処理
コピー
"""Shannon SDKを使用した並列処理の例"""
import asyncio
from shannon import AsyncShannonClient
async def main():
print("=" * 60)
print("並列処理の例")
print("=" * 60)
async with AsyncShannonClient() as client:
# 並列処理するトピック
topics = ["AI", "量子コンピューティング", "バイオテクノロジー"]
print(f"\n[ステップ 1] {len(topics)} タスクを並列で提出中...")
# すべてのタスクを同時に提出(ノンブロッキング)
tasks = [
client.submit_task(
query=f"{topic}の1文要約とその主な応用を教えてください。"
)
for topic in topics
]
# すべての提出が完了するのを待つ
handles = await asyncio.gather(*tasks)
print(f"✅ {len(handles)} タスクがすべて提出されました")
print("\n[ステップ 2] すべての結果を待機中...")
# すべてのタスクが並列で完了するのを待つ
results = await asyncio.gather(
*[client.wait(h.task_id) for h in handles]
)
print(f"✅ {len(results)} タスクがすべて完了しました")
print("\n[ステップ 3] 結果:")
print("-" * 60)
# 結果を表示
for topic, result in zip(topics, results):
print(f"\n📌 {topic}:")
print(f" {result.result}")
# メタデータが利用可能な場合は安全にアクセス
metadata = []
if hasattr(result, 'metadata') and result.metadata:
model = result.metadata.get('model_used') or result.metadata.get('model')
if model:
metadata.append(f"モデル: {model}")
tokens = result.metadata.get('total_tokens')
if tokens:
metadata.append(f"トークン: {tokens}")
if metadata:
print(f" ({', '.join(metadata)})")
print("\n" + "=" * 60)
print("✅ 並列処理が完了しました!")
print(f"{len(topics)} トピックを同時に処理しました")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
フィルターを使用したストリーミング
コピー
"""フィルター付きストリーミングの例"""
from shannon import ShannonClient, EventType
def main():
print("=" * 60)
print("フィルターを使用したストリーミングの例")
print("=" * 60)
client = ShannonClient()
print("\n[ステップ 1] タスクを送信中...")
handle = client.submit_task(
query=(
"AWS、Azure、GCPの価格モデルを3文で比較してください。 "
"生成される出力をストリームします。"
)
)
print(f"✅ タスクが送信されました: {handle.task_id}")
print("\n[ステップ 2] フィルターを使用したストリーミング [LLM_OUTPUT, WORKFLOW_COMPLETED]...")
print("-" * 60)
event_count = 0
llm_outputs = []
# LLM_OUTPUT と WORKFLOW_COMPLETED イベントのみをストリーム
for event in client.stream(
handle.workflow_id,
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED]
):
event_count += 1
print(f"[{event_count}] {event.type}")
if event.type == EventType.LLM_OUTPUT:
print(f" コンテンツ: {event.message[:80]}...")
llm_outputs.append(event.message)
if event.type == EventType.WORKFLOW_COMPLETED:
print(f" ステータス: {event.message}")
break
print("\n" + "-" * 60)
print(f"✅ {event_count} 件のフィルタリングされたイベントを受信")
print(f"✅ {len(llm_outputs)} 件の LLM 出力をキャプチャ")
print("\n[ステップ 3] 完全な結果:")
print("-" * 60)
final = client.wait(handle.task_id)
print(final.result)
print("\n" + "=" * 60)
print("✅ フィルターを使用したストリーミングが完了しました!")
print("=" * 60)
if __name__ == "__main__":
main()
利用可能なイベントタイプ
LLM_OUTPUT— LLM の応答LLM_PARTIAL— ストリーミングトークンWORKFLOW_COMPLETED— タスク完了AGENT_THINKING— エージェントの推論AGENT_STARTED— エージェントが作業を開始AGENT_COMPLETED— エージェントが完了PROGRESS— 進捗更新ERROR_OCCURRED— エラー
- 最終出力のみ:
types=[EventType.LLM_OUTPUT, EventType.WORKFLOW_COMPLETED] - リアルタイムストリーミング:
types=[EventType.LLM_PARTIAL, EventType.WORKFLOW_COMPLETED] - エージェントの監視:
types=[EventType.AGENT_STARTED, EventType.AGENT_COMPLETED] - すべて (フィルターなし):
types=None(またはパラメータを省略)
タスク制御の例
長時間実行されるタスクの一時停止と再開
コピー
"""タスク制御 (一時停止/再開) の例"""
from shannon import ShannonClient
import time
def main():
print("=" * 60)
print("タスク制御 (一時停止/再開) の例")
print("=" * 60)
client = ShannonClient(base_url="http://localhost:8080")
# 長時間実行される研究タスクを送信
print("\n[ステップ 1] 長時間実行されるタスクを送信中...")
handle = client.submit_task(
query="量子コンピューティングとAI統合の最新の進展を調査し、詳細な分析を提供してください。"
)
print(f"✅ タスクが送信されました: {handle.task_id}")
# 少し実行させる
print("\n[ステップ 2] タスクを5秒間実行させる...")
time.sleep(5)
# タスクを一時停止
print("\n[ステップ 3] タスクを一時停止中...")
success = client.pause_task(handle.task_id, reason="中間結果のレビュー")
if success:
print("✅ 一時停止信号が送信されました")
# 制御状態を確認
print("\n[ステップ 4] 制御状態を確認中...")
state = client.get_control_state(handle.task_id)
print(f"一時停止中: {state.is_paused}")
print(f"キャンセル済み: {state.is_cancelled}")
if state.paused_at:
print(f"一時停止時刻: {state.paused_at}")
print(f"一時停止理由: {state.pause_reason}")
print(f"一時停止した人: {state.paused_by}")
# 再開前に少し待つ
print("\n[ステップ 5] 再開前に3秒待機中...")
time.sleep(3)
# タスクを再開
print("\n[ステップ 6] タスクを再開中...")
success = client.resume_task(handle.task_id, reason="レビュー後に続行")
if success:
print("✅ 再開信号が送信されました")
# 完了を待つ
print("\n[ステップ 7] タスクの完了を待機中...")
result = client.wait(handle.task_id)
print(f"\n✅ タスクが完了しました!")
print(f"ステータス: {result.status}")
print(f"\n結果プレビュー: {result.result[:200]}...")
print("\n" + "=" * 60)
print("✅ タスク制御の例が完了しました!")
print("=" * 60)
if __name__ == "__main__":
main()
制御信号のエラーハンドリング
コピー
"""一時停止/再開操作のエラーハンドリング"""
from shannon import ShannonClient
from shannon.errors import ShannonError
def main():
client = ShannonClient()
task_id = "task-123"
# 一時停止を試みる
try:
client.pause_task(task_id, reason="レビューが必要")
print("✅ タスクが正常に一時停止されました")
except ShannonError as e:
# 特定のエラーケースを処理
if "完了したタスクを一時停止できません" in str(e):
print("⚠️ タスクはすでに完了しているため、一時停止できません")
elif "タスクはすでに一時停止しています" in str(e):
print("⚠️ タスクはすでに一時停止しています")
elif "タスクが見つかりません" in str(e):
print("❌ タスクが見つかりません")
else:
print(f"❌ エラー: {e}")
# 再開を試みる
try:
client.resume_task(task_id, reason="実行を続ける")
print("✅ タスクが正常に再開されました")
except ShannonError as e:
if "タスクは一時停止されていません" in str(e):
print("⚠️ タスクは一時停止されていないため、再開できません")
elif "完了したタスクを再開できません" in str(e):
print("⚠️ タスクはすでに完了しているため、再開できません")
else:
print(f"❌ エラー: {e}")
if __name__ == "__main__":
main()
スウォームモード
マルチエージェント実行の強制
コピー
"""スウォームモードの例(Shannon SDK)"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# スウォームモードで送信し、複数のエージェントを連携させる
handle = client.submit_task(
query="MLワークロード向けのAWS、Azure、GCPを調査・比較してください。料金、GPU可用性、マネージドサービスをカバーしてください。",
force_swarm=True,
)
result = client.wait(handle.task_id)
print(result.result)
# モデルと使用量情報を確認
if result.usage:
print(f"合計トークン: {result.usage.get('total_tokens')}")
print(f"コスト: ${result.usage.get('cost_usd', 0):.6f}")
ヒューマン・イン・ザ・ループ レビューの例
インタラクティブなレビューワークフロー
コピー
"""ヒューマン・イン・ザ・ループ レビューの例"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# レビューが必要なワークフローがすでに実行中であると仮定
workflow_id = "workflow-123"
# 現在のレビュー状態を確認
state = client.get_review_state(workflow_id)
print(f"レビューステータス: {state.status}")
print(f"ラウンド: {state.round}, バージョン: {state.version}")
if state.current_plan:
print(f"現在のプラン:\n{state.current_plan}")
# 会話履歴を表示
for r in state.rounds:
print(f" [{r.role}] {r.message}")
# まだレビュー中の場合、フィードバックを提供
if state.status == "reviewing":
updated = client.submit_review_feedback(
workflow_id,
"入力バリデーションとユニットテストを追加してください。",
version=state.version,
)
print(f"フィードバックが送信されました。現在のラウンド: {updated.round}")
# 更新されたプランを承認
result = client.approve_review(workflow_id, version=updated.version)
print(f"承認済み: {result['status']}")
楽観的同時実行制御を使用したレビュー
コピー
"""レビュー中のバージョン競合の処理"""
from shannon import ShannonClient, ValidationError
client = ShannonClient()
workflow_id = "workflow-456"
try:
state = client.get_review_state(workflow_id)
updated = client.submit_review_feedback(
workflow_id,
"全体的に良いですが、些細なスタイルの変更が必要です。",
version=state.version,
)
client.approve_review(workflow_id, version=updated.version)
except ValidationError as e:
if "409" in str(e.code):
print("バージョン競合: 別のユーザーがレビューを変更しました。更新して再試行してください。")
else:
raise
スキルの例
スキルの閲覧と確認
コピー
"""スキル閲覧の例"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 利用可能なすべてのスキルを一覧表示
skills = client.list_skills()
print(f"{len(skills)} 個のスキルが見つかりました:\n")
for s in skills:
flag = " [危険]" if s.dangerous else ""
status = "有効" if s.enabled else "無効"
print(f" {s.name} v{s.version} ({s.category}) - {status}{flag}")
print(f" {s.description}")
# カテゴリでフィルタリング
print("\n--- コーディングスキル ---")
coding = client.list_skills(category="coding")
for s in coding:
print(f" {s.name}: {s.description}")
# 詳細情報を取得
detail = client.get_skill("web_search")
print(f"\nスキル詳細: {detail.name} v{detail.version}")
print(f" 著者: {detail.author}")
print(f" 必要なツール: {detail.requires_tools}")
print(f" 必要なロール: {detail.requires_role}")
print(f" 最大バジェット: {detail.budget_max}")
# バージョン履歴を取得
versions = client.get_skill_versions("web_search")
print(f"\nweb_search のバージョン:")
for v in versions:
print(f" v{v.version} - {v.description}")
スケジュール管理の例
スケジュールタスクの作成と管理
コピー
"""スケジュール管理の例"""
from shannon import ShannonClient
client = ShannonClient(base_url="http://localhost:8080")
# 平日の毎朝9時(UTC)に実行される調査タスクを作成
result = client.create_schedule(
name="デイリーAIニュース",
cron_expression="0 9 * * 1-5",
task_query="最新のAI研究ニュースを要約してください",
task_context={"force_research": "true", "research_strategy": "quick"},
timezone="UTC",
max_budget_per_run_usd=0.50,
)
schedule_id = result["schedule_id"]
print(f"スケジュールが作成されました: {schedule_id}")
# すべてのスケジュールを一覧表示
schedules, total = client.list_schedules()
print(f"\n{total} 個のスケジュール:")
for s in schedules:
print(f" {s.name}: {s.cron_expression} ({s.status})")
print(f" 実行回数: 合計 {s.total_runs}, 成功 {s.successful_runs}, 失敗 {s.failed_runs}")
# スケジュールの詳細を取得
sched = client.get_schedule(schedule_id)
print(f"\nスケジュール: {sched.name}")
print(f" 次回実行: {sched.next_run_at}")
print(f" 前回実行: {sched.last_run_at}")
# 実行履歴を表示
runs, _ = client.get_schedule_runs(schedule_id)
for run in runs:
print(f" {run.triggered_at}: {run.status} - ${run.total_cost_usd:.4f}")
# スケジュールを一時停止(例: メンテナンス中)
client.pause_schedule(schedule_id, reason="メンテナンスウィンドウ")
# スケジュールを再開
client.resume_schedule(schedule_id)
# スケジュール設定を更新
client.update_schedule(
schedule_id,
cron_expression="0 8 * * 1-5", # 8時に変更
max_budget_per_run_usd=0.75,
)
# 不要になったら削除
# client.delete_schedule(schedule_id)