エンドポイント
GET http://localhost:8080/api/v1/tasks/{id}
特定のタスクの現在のステータス、結果、およびメタデータを取得します。このエンドポイントを使用してタスクの進行状況を確認したり、最終結果を取得したりします。
必要: はい
ヘッダーにAPIキーを含めてください:
X-API-Key: sk_test_123456
リクエスト
パスパラメータ
| パラメータ | タイプ | 必要 | 説明 |
|---|
id | string | はい | タスクID(ワークフローIDとしても機能) |
ヘッダー
レスポンス
成功レスポンス
ステータス: 200 OK
ヘッダー:
X-Workflow-ID: Temporalワークフロー識別子(タスクIDと同じ)
ボディ:
{
"task_id": "string",
"status": "string",
"result": "string",
"response": {},
"metadata": {},
"error": "string",
"created_at": "timestamp",
"updated_at": "timestamp",
"query": "string",
"session_id": "string",
"mode": "string",
"model_used": "string",
"provider": "string",
"usage": {
"total_tokens": 0,
"input_tokens": 0,
"output_tokens": 0,
"estimated_cost": 0.0
}
}
レスポンスフィールド
| フィールド | タイプ | 説明 |
|---|
task_id | string | 一意のタスク識別子 |
status | string | 現在のタスクステータス |
result | string | 生のLLM出力(プレーンテキストまたはJSON文字列) |
response | object | 解析されたJSON(resultが有効なJSONを含む場合のみ存在) |
metadata | object | タスクメタデータ(引用、検証、unified_response、抽出された事実、モデルの内訳など) |
error | string | エラーメッセージ(エラーがない場合は空) |
created_at | timestamp | タスク作成時間 |
updated_at | timestamp | 最終更新時間 |
query | string | 元のタスククエリ |
session_id | string | セッション識別子 |
mode | string | 使用された実行モード |
model_used | string | 使用された主要モデル(例: gpt-5-mini-2025-08-07) |
provider | string | プロバイダー名(例: openai, anthropic) |
usage | object | トークン使用量とコスト: { total_tokens, input_tokens?, output_tokens?, estimated_cost? } |
ステータス値
TASK_STATUS_UNSPECIFIED - ステータス不明
TASK_STATUS_QUEUED - 実行待ち
TASK_STATUS_RUNNING - 現在実行中
TASK_STATUS_COMPLETED - 成功裏に完了
TASK_STATUS_FAILED - エラーで失敗
TASK_STATUS_CANCELLED - ユーザーによってキャンセル
実行中のタスクをキャンセルするには、POST /api/v1/tasks/{id}/cancelも参照してください。
TASK_STATUS_TIMEOUT - タイムアウト制限を超えた
実行モード値
EXECUTION_MODE_SIMPLE - 単一のLLM呼び出し、ツールなし
EXECUTION_MODE_STANDARD - ツールを使用したマルチステップ
EXECUTION_MODE_COMPLEX - 高度な推論パターン
タスクステータスの確認
curl -X GET "http://localhost:8080/api/v1/tasks/task_01HQZX3Y9K8M2P4N5S7T9W2V" \
-H "X-API-Key: sk_test_123456"
レスポンス(キュー中):
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"status": "TASK_STATUS_QUEUED",
"response": null,
"error": "",
"created_at": "2025-10-22T10:30:00Z",
"updated_at": "2025-10-22T10:30:00Z",
"query": "フランスの首都はどこですか?",
"session_id": "user-123-session",
"mode": "EXECUTION_MODE_SIMPLE"
}
レスポンス(実行中):
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"status": "TASK_STATUS_RUNNING",
"response": null,
"error": "",
"created_at": "2025-10-22T10:30:00Z",
"updated_at": "2025-10-22T10:30:02Z",
"query": "フランスの首都はどこですか?",
"session_id": "user-123-session",
"mode": "EXECUTION_MODE_SIMPLE"
}
レスポンス(完了):
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"status": "TASK_STATUS_COMPLETED",
"result": "フランスの首都はパリです。パリは987年から首都であり、国の北中央部に位置しています。",
"error": "",
"created_at": "2025-10-22T10:30:00Z",
"updated_at": "2025-10-22T10:30:05Z",
"query": "フランスの首都はどこですか?",
"session_id": "user-123-session",
"mode": "EXECUTION_MODE_SIMPLE",
"model_used": "gpt-5-mini-2025-08-07",
"provider": "openai",
"usage": {
"total_tokens": 300,
"input_tokens": 200,
"output_tokens": 100,
"estimated_cost": 0.006
}
}
レスポンス(失敗):
{
"task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
"status": "TASK_STATUS_FAILED",
"response": null,
"error": "LLMサービスが利用できません: 接続タイムアウト",
"created_at": "2025-10-22T10:30:00Z",
"updated_at": "2025-10-22T10:30:10Z",
"query": "フランスの首都はどこですか?",
"session_id": "user-123-session",
"mode": "EXECUTION_MODE_SIMPLE"
}
ディープリサーチレスポンスペイロード
force_research: trueでタスクが提出されると、完了したレスポンスには構造化されたリサーチデータを含む追加のメタデータフィールドが含まれます。
ディープリサーチメタデータフィールド
ディープリサーチタスクの場合、metadataオブジェクトには以下が含まれます:
| フィールド | タイプ | 説明 |
|---|
unified_response | object | すべてのメタデータを含む統合レスポンス |
citations | array | リサーチからの構造化された引用データ |
verification | object | 主張の検証結果(enable_verification: trueの場合) |
extracted_facts | array | 構造化された事実(enable_fact_extraction: trueの場合) |
fact_summary | object | 抽出された事実の要約統計 |
例: 深い研究の完了レスポンス
{
"task_id": "task-abc123-research",
"status": "TASK_STATUS_COMPLETED",
"result": "# 2025年のAIトレンド\n\n2025年の人工知能の風景は...[1][2]\n\n## 出典\n[1] MIT Technology Review (https://...) - 2025-01-15\n[2] Nature AI (https://...) - 2025-01-10",
"metadata": {
"unified_response": {
"task_id": "task-abc123-research",
"session_id": "session-xyz",
"status": "completed",
"result": "2025年の人工知能の風景は...",
"metadata": {
"model": "claude-sonnet-4-5-20250929",
"execution_mode": "EXECUTION_MODE_COMPLEX",
"complexity_score": 0.72,
"agents_used": 3
},
"usage": {
"input_tokens": 12100,
"output_tokens": 3320,
"total_tokens": 15420,
"cost_usd": 0.0462
},
"performance": {
"execution_time_ms": 45200
},
"stop_reason": "completed",
"error": null,
"timestamp": "2025-01-20T10:00:00Z"
},
"citations": [
{
"title": "AI Breakthrough Report 2025",
"url": "https://www.technologyreview.com/2025/01/ai-report",
"source": "MIT Technology Review",
"credibility_score": 0.92,
"quality_score": 0.88
},
{
"title": "Nature AI Special Issue",
"url": "https://www.nature.com/articles/ai-2025",
"source": "Nature",
"credibility_score": 0.95,
"quality_score": 0.91
}
],
"verification": {
"overall_confidence": 0.87,
"total_claims": 12,
"supported_claims": 10,
"unsupported_claims": ["未来予測に関する主張"],
"conflicts": [],
"claim_details": [
{
"claim": "AIモデルはベンチマークXで95%の精度を達成した",
"supporting_citations": [1, 2],
"conflicting_citations": [],
"confidence": 0.92
}
]
}
},
"model_used": "claude-sonnet-4-5-20250929",
"provider": "anthropic",
"usage": {
"total_tokens": 15420,
"input_tokens": 12100,
"output_tokens": 3320,
"estimated_cost": 0.0462
}
}
抽出された事実 (オプション)
enable_fact_extraction: true がリクエストコンテキストに設定されている場合:
{
"metadata": {
"extracted_facts": [
{
"statement": "GPT-5はMMLUベンチマークで95%の精度を達成した",
"confidence": 0.92,
"source_citation": [1, 3],
"category": "performance",
"entity_mentions": ["GPT-5", "MMLU"],
"temporal_marker": "2025",
"is_quantitative": true
}
],
"fact_summary": {
"total_facts": 24,
"high_confidence": 18,
"categorized_facts": {
"performance": 8,
"market": 6,
"research": 10
},
"contradiction_count": 1
}
}
}
引用オブジェクトスキーマ
| フィールド | 型 | 説明 |
|---|
url | string | 出典のURL |
title | string | 記事/ページのタイトル |
source | string | 出版社/ドメイン名 |
credibility_score | float | 出典の信頼性 (0.0-1.0) |
quality_score | float | コンテンツの質 (0.0-1.0) |
metadata.citations 配列内の各引用の位置は、インライン参照で使用される [n] インデックスに対応しています。
検証オブジェクトスキーマ
| フィールド | 型 | 説明 |
|---|
overall_confidence | float | 集計された検証信頼度 (0.0-1.0) |
total_claims | integer | 抽出された事実の数 |
supported_claims | integer | 支持する引用がある主張の数 |
unsupported_claims | array | 引用支持がない主張のテキストリスト |
conflicts | array | 検出された矛盾情報 |
claim_details | array | 各主張の検証詳細 |
深い研究データへのアクセス: metadata.citations 配列と metadata.verification オブジェクトは、研究ワークフロー (force_research: true) のみで populated されます。シンプルなタスクの場合、これらのフィールドはレスポンスに存在しません。
エラーレスポンス
401 Unauthorized
{
"error": "Unauthorized"
}
404 Not Found
{
"error": "Task not found"
}
500 Internal Server Error
{
"error": "Failed to get task status: database error"
}
コード例
Python - シンプルなステータスチェック
import httpx
def get_task_status(task_id: str, api_key: str):
"""タスクのステータスを取得します。"""
response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key}
)
if response.status_code == 404:
return None
return response.json()
status = get_task_status("task_abc123", "sk_test_123456")
if status:
print(f"ステータス: {status['status']}")
if status['status'] == 'TASK_STATUS_COMPLETED':
print(f"結果: {status['result']}")
Python - 完了までポーリング
import httpx
import time
def wait_for_completion(task_id: str, api_key: str, timeout: int = 300):
"""完了またはタイムアウトまでタスクのステータスをポーリングします。"""
start_time = time.time()
while True:
response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key}
)
status = response.json()
current_status = status["status"]
# 終端状態をチェック
if current_status == "TASK_STATUS_COMPLETED":
return status.get("result")
elif current_status == "TASK_STATUS_FAILED":
raise Exception(f"タスクが失敗しました: {status['error']}")
elif current_status == "TASK_STATUS_TIMEOUT":
raise Exception("タスクがタイムアウトしました")
elif current_status == "TASK_STATUS_CANCELLED":
raise Exception("タスクがキャンセルされました")
# タイムアウトをチェック
if time.time() - start_time > timeout:
raise TimeoutError(f"{timeout}s後にポーリングタイムアウト")
# 次のポーリングまで待機
time.sleep(2)
# 使用例
try:
result = wait_for_completion("task_abc123", "sk_test_123456")
print("結果:", result)
except Exception as e:
print("エラー:", e)
JavaScript/Node.js
const axios = require('axios');
async function getTaskStatus(taskId) {
try {
const response = await axios.get(
`http://localhost:8080/api/v1/tasks/${taskId}`,
{
headers: {
'X-API-Key': 'sk_test_123456'
}
}
);
const status = response.data;
console.log('Status:', status.status);
if (status.status === 'TASK_STATUS_COMPLETED') {
console.log('Result:', status.result);
} else if (status.status === 'TASK_STATUS_FAILED') {
console.error('Error:', status.error);
}
return status;
} catch (error) {
if (error.response?.status === 404) {
console.error('タスクが見つかりません');
} else {
console.error('エラー:', error.response?.data || error.message);
}
throw error;
}
}
getTaskStatus('task_abc123');
JavaScript - Poll with Async/Await
async function waitForCompletion(taskId, timeout = 300000) {
const startTime = Date.now();
const pollInterval = 2000; // 2秒
while (true) {
const response = await axios.get(
`http://localhost:8080/api/v1/tasks/${taskId}`,
{ headers: { 'X-API-Key': 'sk_test_123456' } }
);
const { status, result, error } = response.data;
if (status === 'TASK_STATUS_COMPLETED') {
return result;
} else if (status === 'TASK_STATUS_FAILED') {
throw new Error(`タスクが失敗しました: ${error}`);
} else if (status === 'TASK_STATUS_TIMEOUT') {
throw new Error('タスクがタイムアウトしました');
}
// タイムアウトを確認
if (Date.now() - startTime > timeout) {
throw new Error(`ポーリングタイムアウト: ${timeout}ms`);
}
// 次のポーリングまで待機
await new Promise(resolve => setTimeout(resolve, pollInterval));
}
}
// 使用例
waitForCompletion('task_abc123')
.then(result => console.log('Result:', result))
.catch(error => console.error('エラー:', error.message));
package main
import (
"encoding/json"
"fmt"
"net/http"
"time"
)
type TaskStatusResponse struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Response map[string]interface{} `json:"response"`
Error string `json:"error"`
Query string `json:"query"`
SessionID string `json:"session_id"`
Mode string `json:"mode"`
}
func getTaskStatus(taskID string) (*TaskStatusResponse, error) {
url := fmt.Sprintf("http://localhost:8080/api/v1/tasks/%s", taskID)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("X-API-Key", "sk_test_123456")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, fmt.Errorf("タスクが見つかりません")
}
var status TaskStatusResponse
json.NewDecoder(resp.Body).Decode(&status)
return &status, nil
}
func waitForCompletion(taskID string, timeout time.Duration) (map[string]interface{}, error) {
start := time.Now()
for {
status, err := getTaskStatus(taskID)
if err != nil {
return nil, err
}
switch status.Status {
case "TASK_STATUS_COMPLETED":
return status.Response, nil
case "TASK_STATUS_FAILED":
return nil, fmt.Errorf("タスクが失敗しました: %s", status.Error)
case "TASK_STATUS_TIMEOUT":
return nil, fmt.Errorf("タスクがタイムアウトしました")
}
if time.Since(start) > timeout {
return nil, fmt.Errorf("ポーリングタイムアウト")
}
time.Sleep(2 * time.Second)
}
}
func main() {
result, err := waitForCompletion("task_abc123", 5*time.Minute)
if err != nil {
fmt.Println("エラー:", err)
return
}
fmt.Println("Result:", result)
}
Bash - Monitor Task Progress
#!/bin/bash
API_KEY="sk_test_123456"
TASK_ID="$1"
if [ -z "$TASK_ID" ]; then
echo "使用法: $0 <task_id>"
exit 1
fi
echo "タスクを監視中: $TASK_ID"
echo ""
while true; do
RESPONSE=$(curl -s "http://localhost:8080/api/v1/tasks/$TASK_ID" \
-H "X-API-Key: $API_KEY")
STATUS=$(echo $RESPONSE | jq -r '.status')
echo "[$(date +%T)] ステータス: $STATUS"
case $STATUS in
"TASK_STATUS_COMPLETED")
echo ""
echo "✓ タスクが完了しました!"
echo ""
echo $RESPONSE | jq -r '.response.result'
exit 0
;;
"TASK_STATUS_FAILED")
echo ""
echo "✗ タスクが失敗しました!"
ERROR=$(echo $RESPONSE | jq -r '.error')
echo "エラー: $ERROR"
exit 1
;;
"TASK_STATUS_TIMEOUT")
echo ""
echo "✗ タスクがタイムアウトしました!"
exit 1
;;
"TASK_STATUS_CANCELLED")
echo ""
echo "✗ タスクがキャンセルされました!"
exit 1
;;
esac
sleep 2
done
Use Cases
1. Submit and Wait Pattern
import httpx
import time
def submit_and_wait(query: str, api_key: str):
"""タスクを送信し、結果を待つ。"""
# 送信
submit_response = httpx.post(
"http://localhost:8080/api/v1/tasks",
headers={"X-API-Key": api_key},
json={"query": query}
)
task_id = submit_response.json()["task_id"]
print(f"タスクが送信されました: {task_id}")
# 待機
while True:
status_response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key}
)
status = status_response.json()
if status["status"] == "TASK_STATUS_COMPLETED":
return status["response"]["result"]
elif status["status"] == "TASK_STATUS_FAILED":
raise Exception(status["error"])
time.sleep(2)
result = submit_and_wait("Pythonとは何ですか?", "sk_test_123456")
print(result)
2. ダッシュボードステータスウィジェット
def get_task_summary(task_id: str, api_key: str):
"""ダッシュボードのタスク概要を取得します。"""
response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key}
)
status = response.json()
return {
"id": task_id,
"query": status["query"][:50] + "...",
"status": status["status"].replace("TASK_STATUS_", ""),
"mode": status["mode"].replace("EXECUTION_MODE_", ""),
"created": status["created_at"]
}
# UIに表示
summary = get_task_summary("task_abc123", "sk_test_123456")
print(f"{summary['status']}: {summary['query']}")
3. バッチステータスチェック
def check_multiple_tasks(task_ids: list, api_key: str):
"""複数のタスクのステータスをチェックします。"""
results = {}
for task_id in task_ids:
try:
response = httpx.get(
f"http://localhost:8080/api/v1/tasks/{task_id}",
headers={"X-API-Key": api_key},
timeout=5.0
)
results[task_id] = response.json()["status"]
except Exception as e:
results[task_id] = f"ERROR: {e}"
return results
# 5つのタスクをチェック
task_ids = ["task_1", "task_2", "task_3", "task_4", "task_5"]
statuses = check_multiple_tasks(task_ids, "sk_test_123456")
for task_id, status in statuses.items():
print(f"{task_id}: {status}")
ベストプラクティス
1. ポーリングの代わりにストリーミングを使用
長時間実行されるタスクには、ポーリングの代わりにSSEストリーミングを使用します:
# ❌ 悪い - 2秒ごとにポーリング
while True:
status = httpx.get(f".../{task_id}")
if status["status"] == "COMPLETED":
break
time.sleep(2)
# ✅ 良い - ストリーミングを使用
for event in client.stream(task_id):
print(event.type, event.message)
if event.type == "TASK_COMPLETED":
break
2. すべてのステータス状態を処理
status = get_task_status(task_id, api_key)
match status["status"]:
case "TASK_STATUS_QUEUED":
print("タスクはキューに入っています...")
case "TASK_STATUS_RUNNING":
print("タスクは実行中です...")
case "TASK_STATUS_COMPLETED":
result = status["response"]["result"]
print(f"結果: {result}")
case "TASK_STATUS_FAILED":
print(f"失敗: {status['error']}")
case "TASK_STATUS_TIMEOUT":
print("タスクがタイムアウトしました")
case "TASK_STATUS_CANCELLED":
print("タスクはキャンセルされました")
3. 指数バックオフを実装
import time
def poll_with_backoff(task_id, api_key, max_wait=60):
"""指数バックオフでポーリングします。"""
wait_time = 1
while True:
status = get_task_status(task_id, api_key)
if status["status"] in ["TASK_STATUS_COMPLETED", "TASK_STATUS_FAILED"]:
return status
time.sleep(wait_time)
wait_time = min(wait_time * 2, max_wait) # 最大60秒に制限
4. ステータスレスポンスをキャッシュ
from functools import lru_cache
import time
@lru_cache(maxsize=1000)
def get_cached_status(task_id: str, api_key: str, timestamp: int):
"""5秒間ステータスをキャッシュします。"""
return get_task_status(task_id, api_key)
# 使用例
current_time = int(time.time() / 5) # 5秒ごとのバケット
status = get_cached_status("task_abc123", "sk_test_123456", current_time)
5. メタデータを抽出
def extract_task_info(task_id: str, api_key: str):
"""有用なメタデータを抽出します。"""
status = get_task_status(task_id, api_key)
return {
"task_id": status["task_id"],
"query": status["query"],
"status": status["status"],
"mode": status["mode"],
"session_id": status["session_id"],
"has_result": status["response"] is not None,
"has_error": bool(status["error"]),
"workflow_url": f"http://localhost:8088/workflows/{task_id}"
}
関連エンドポイント
注意事項
本番環境でのポーリングは避けてください: 長時間実行されるタスクには、ポーリングの代わりにストリーミングエンドポイントを使用してください。ポーリングは不必要な負荷を生み出し、レイテンシを追加します。
セッショントラッキング: session_idフィールドを使用すると、タスクがどのセッションに属しているかを追跡できます。これは、マルチターンの会話やコストの帰属に役立ちます。