エンドポイント
GET http://localhost:8080/api/v1/tasks/{id}
特定のタスクの現在のステータス、結果、およびメタデータを取得します。このエンドポイントを使用してタスクの進行状況を確認したり、最終結果を取得したりします。
必要 : はい
ヘッダーにAPIキーを含めてください:
X-API-Key: sk_test_123456
リクエスト
パスパラメータ
パラメータ タイプ 必要 説明 idstring はい タスクID(ワークフローIDとしても機能)
ヘッダー
レスポンス
成功レスポンス
ステータス : 200 OK
ヘッダー :
X-Workflow-ID: Temporalワークフロー識別子(タスクIDと同じ)
ボディ :
{
"task_id" : "string" ,
"workflow_id" : "string" ,
"status" : "string" ,
"result" : "string" ,
"response" : {},
"metadata" : {},
"error" : "string" ,
"created_at" : "timestamp" ,
"updated_at" : "timestamp" ,
"query" : "string" ,
"session_id" : "string" ,
"context" : {},
"unified_response" : {},
"mode" : "string" ,
"model_used" : "string" ,
"provider" : "string" ,
"usage" : {
"total_tokens" : 0 ,
"input_tokens" : 0 ,
"output_tokens" : 0 ,
"estimated_cost" : 0.0
}
}
レスポンスフィールド
フィールド タイプ 説明 task_idstring 一意のタスク識別子 workflow_idstring ワークフロー識別子(task_idと同一) statusstring 現在のタスクステータス resultstring 生のLLM出力(プレーンテキストまたはJSON文字列) responseobject 解析されたJSON(resultが有効なJSONを含む場合のみ存在) metadataobject タスクメタデータ(引用、検証、unified_response、抽出された事実、モデルの内訳など) errorstring エラーメッセージ(エラーがない場合は空) created_attimestamp タスク作成時間 updated_attimestamp 最終更新時間 querystring 元のタスククエリ session_idstring セッション識別子 contextobject タスクコンテキスト(予算情報、日付、パラメータ) unified_responseobject 構造化された統一レスポンス(パース済み結果、パフォーマンスメトリクス、メタデータを含む) modestring 使用された実行モード model_usedstring 使用された主要モデル(例: gpt-5-mini-2025-08-07) providerstring プロバイダー名(例: openai, anthropic) usageobject トークン使用量とコスト: { total_tokens, input_tokens?, output_tokens?, estimated_cost? }
ステータス値
TASK_STATUS_UNSPECIFIED - ステータス不明
TASK_STATUS_QUEUED - 実行待ち
TASK_STATUS_RUNNING - 現在実行中
TASK_STATUS_COMPLETED - 成功裏に完了
TASK_STATUS_FAILED - エラーで失敗
TASK_STATUS_PAUSED - ユーザーまたはHITLレビューにより一時停止
TASK_STATUS_CANCELLED - ユーザーによってキャンセル
TASK_STATUS_TIMEOUT - タイムアウト制限を超えた
実行モード値
EXECUTION_MODE_SIMPLE - 単一のLLM呼び出し、ツールなし
EXECUTION_MODE_STANDARD - ツールを使用したマルチステップ
EXECUTION_MODE_COMPLEX - 高度な推論パターン
タスクステータスの確認
curl -X GET "http://localhost:8080/api/v1/tasks/task_01HQZX3Y9K8M2P4N5S7T9W2V" \
-H "X-API-Key: sk_test_123456"
レスポンス(キュー中) :
{
"task_id" : "task_01HQZX3Y9K8M2P4N5S7T9W2V" ,
"status" : "TASK_STATUS_QUEUED" ,
"response" : null ,
"error" : "" ,
"created_at" : "2025-10-22T10:30:00Z" ,
"updated_at" : "2025-10-22T10:30:00Z" ,
"query" : "フランスの首都はどこですか?" ,
"session_id" : "user-123-session" ,
"mode" : "EXECUTION_MODE_SIMPLE"
}
レスポンス(実行中) :
{
"task_id" : "task_01HQZX3Y9K8M2P4N5S7T9W2V" ,
"status" : "TASK_STATUS_RUNNING" ,
"response" : null ,
"error" : "" ,
"created_at" : "2025-10-22T10:30:00Z" ,
"updated_at" : "2025-10-22T10:30:02Z" ,
"query" : "フランスの首都はどこですか?" ,
"session_id" : "user-123-session" ,
"mode" : "EXECUTION_MODE_SIMPLE"
}
レスポンス(完了) :
{
"task_id" : "task_01HQZX3Y9K8M2P4N5S7T9W2V" ,
"status" : "TASK_STATUS_COMPLETED" ,
"result" : "フランスの首都はパリです。パリは987年から首都であり、国の北中央部に位置しています。" ,
"error" : "" ,
"created_at" : "2025-10-22T10:30:00Z" ,
"updated_at" : "2025-10-22T10:30:05Z" ,
"query" : "フランスの首都はどこですか?" ,
"session_id" : "user-123-session" ,
"mode" : "EXECUTION_MODE_SIMPLE" ,
"model_used" : "gpt-5-mini-2025-08-07" ,
"provider" : "openai" ,
"usage" : {
"total_tokens" : 300 ,
"input_tokens" : 200 ,
"output_tokens" : 100 ,
"estimated_cost" : 0.006
}
}
レスポンス(失敗) :
{
"task_id" : "task_01HQZX3Y9K8M2P4N5S7T9W2V" ,
"status" : "TASK_STATUS_FAILED" ,
"response" : null ,
"error" : "LLMサービスが利用できません: 接続タイムアウト" ,
"created_at" : "2025-10-22T10:30:00Z" ,
"updated_at" : "2025-10-22T10:30:10Z" ,
"query" : "フランスの首都はどこですか?" ,
"session_id" : "user-123-session" ,
"mode" : "EXECUTION_MODE_SIMPLE"
}
ディープリサーチレスポンスペイロード
force_research: trueでタスクが提出されると、完了したレスポンスには構造化されたリサーチデータを含む追加のメタデータフィールドが含まれます。
ディープリサーチメタデータフィールド
ディープリサーチタスクの場合、metadataオブジェクトには以下が含まれます:
フィールド タイプ 説明 unified_responseobject すべてのメタデータを含む統合レスポンス citationsarray リサーチからの構造化された引用データ verificationobject 主張の検証結果(enable_verification: trueの場合) extracted_factsarray 構造化された事実(enable_fact_extraction: trueの場合) fact_summaryobject 抽出された事実の要約統計
例: 深い研究の完了レスポンス
{
"task_id" : "task-abc123-research" ,
"status" : "TASK_STATUS_COMPLETED" ,
"result" : "# 2025年のAIトレンド \n\n 2025年の人工知能の風景は...[1][2] \n\n ## 出典 \n [1] MIT Technology Review (https://...) - 2025-01-15 \n [2] Nature AI (https://...) - 2025-01-10" ,
"metadata" : {
"unified_response" : {
"task_id" : "task-abc123-research" ,
"session_id" : "session-xyz" ,
"status" : "completed" ,
"result" : "2025年の人工知能の風景は..." ,
"metadata" : {
"model" : "claude-sonnet-4-5-20250929" ,
"execution_mode" : "EXECUTION_MODE_COMPLEX" ,
"complexity_score" : 0.72 ,
"agents_used" : 3
},
"usage" : {
"input_tokens" : 12100 ,
"output_tokens" : 3320 ,
"total_tokens" : 15420 ,
"cost_usd" : 0.0462
},
"performance" : {
"execution_time_ms" : 45200
},
"stop_reason" : "completed" ,
"error" : null ,
"timestamp" : "2025-01-20T10:00:00Z"
},
"citations" : [
{
"title" : "AI Breakthrough Report 2025" ,
"url" : "https://www.technologyreview.com/2025/01/ai-report" ,
"source" : "MIT Technology Review" ,
"credibility_score" : 0.92 ,
"quality_score" : 0.88
},
{
"title" : "Nature AI Special Issue" ,
"url" : "https://www.nature.com/articles/ai-2025" ,
"source" : "Nature" ,
"credibility_score" : 0.95 ,
"quality_score" : 0.91
}
],
"verification" : {
"overall_confidence" : 0.87 ,
"total_claims" : 12 ,
"supported_claims" : 10 ,
"unsupported_claims" : [ "未来予測に関する主張" ],
"conflicts" : [],
"claim_details" : [
{
"claim" : "AIモデルはベンチマークXで95%の精度を達成した" ,
"supporting_citations" : [ 1 , 2 ],
"conflicting_citations" : [],
"confidence" : 0.92
}
]
}
},
"model_used" : "claude-sonnet-4-5-20250929" ,
"provider" : "anthropic" ,
"usage" : {
"total_tokens" : 15420 ,
"input_tokens" : 12100 ,
"output_tokens" : 3320 ,
"estimated_cost" : 0.0462
}
}
抽出された事実 (オプション)
enable_fact_extraction: true がリクエストコンテキストに設定されている場合:
{
"metadata" : {
"extracted_facts" : [
{
"statement" : "GPT-5はMMLUベンチマークで95%の精度を達成した" ,
"confidence" : 0.92 ,
"source_citation" : [ 1 , 3 ],
"category" : "performance" ,
"entity_mentions" : [ "GPT-5" , "MMLU" ],
"temporal_marker" : "2025" ,
"is_quantitative" : true
}
],
"fact_summary" : {
"total_facts" : 24 ,
"high_confidence" : 18 ,
"categorized_facts" : {
"performance" : 8 ,
"market" : 6 ,
"research" : 10
},
"contradiction_count" : 1
}
}
}
引用オブジェクトスキーマ
フィールド 型 説明 urlstring 出典のURL titlestring 記事/ページのタイトル sourcestring 出版社/ドメイン名 credibility_scorefloat 出典の信頼性 (0.0-1.0) quality_scorefloat コンテンツの質 (0.0-1.0)
metadata.citations 配列内の各引用の位置は、インライン参照で使用される [n] インデックスに対応しています。
検証オブジェクトスキーマ
フィールド 型 説明 overall_confidencefloat 集計された検証信頼度 (0.0-1.0) total_claimsinteger 抽出された事実の数 supported_claimsinteger 支持する引用がある主張の数 unsupported_claimsarray 引用支持がない主張のテキストリスト conflictsarray 検出された矛盾情報 claim_detailsarray 各主張の検証詳細
深い研究データへのアクセス : metadata.citations 配列と metadata.verification オブジェクトは、研究ワークフロー (force_research: true) のみで populated されます。シンプルなタスクの場合、これらのフィールドはレスポンスに存在しません。
エラーレスポンス
401 Unauthorized
{
"error" : "Unauthorized"
}
404 Not Found
{
"error" : "Task not found"
}
500 Internal Server Error
{
"error" : "Failed to get task status: database error"
}
コード例
Python - シンプルなステータスチェック
import httpx
def get_task_status ( task_id : str , api_key : str ):
"""タスクのステータスを取得します。"""
response = httpx.get(
f "http://localhost:8080/api/v1/tasks/ { task_id } " ,
headers ={ "X-API-Key" : api_key}
)
if response.status_code == 404 :
return None
return response.json()
status = get_task_status( "task_abc123" , "sk_test_123456" )
if status:
print ( f "ステータス: { status[ 'status' ] } " )
if status[ 'status' ] == 'TASK_STATUS_COMPLETED' :
print ( f "結果: { status[ 'result' ] } " )
Python - 完了までポーリング
import httpx
import time
def wait_for_completion ( task_id : str , api_key : str , timeout : int = 300 ):
"""完了またはタイムアウトまでタスクのステータスをポーリングします。"""
start_time = time.time()
while True :
response = httpx.get(
f "http://localhost:8080/api/v1/tasks/ { task_id } " ,
headers ={ "X-API-Key" : api_key}
)
status = response.json()
current_status = status[ "status" ]
# 終端状態をチェック
if current_status == "TASK_STATUS_COMPLETED" :
return status.get( "result" )
elif current_status == "TASK_STATUS_FAILED" :
raise Exception ( f "タスクが失敗しました: { status[ 'error' ] } " )
elif current_status == "TASK_STATUS_TIMEOUT" :
raise Exception ( "タスクがタイムアウトしました" )
elif current_status == "TASK_STATUS_CANCELLED" :
raise Exception ( "タスクがキャンセルされました" )
# タイムアウトをチェック
if time.time() - start_time > timeout:
raise TimeoutError ( f " { timeout } s後にポーリングタイムアウト" )
# 次のポーリングまで待機
time.sleep( 2 )
# 使用例
try :
result = wait_for_completion( "task_abc123" , "sk_test_123456" )
print ( "結果:" , result)
except Exception as e:
print ( "エラー:" , e)
JavaScript/Node.js
const axios = require ( 'axios' );
async function getTaskStatus ( taskId ) {
try {
const response = await axios . get (
`http://localhost:8080/api/v1/tasks/ ${ taskId } ` ,
{
headers: {
'X-API-Key' : 'sk_test_123456'
}
}
);
const status = response . data ;
console . log ( 'Status:' , status . status );
if ( status . status === 'TASK_STATUS_COMPLETED' ) {
console . log ( 'Result:' , status . result );
} else if ( status . status === 'TASK_STATUS_FAILED' ) {
console . error ( 'Error:' , status . error );
}
return status ;
} catch ( error ) {
if ( error . response ?. status === 404 ) {
console . error ( 'タスクが見つかりません' );
} else {
console . error ( 'エラー:' , error . response ?. data || error . message );
}
throw error ;
}
}
getTaskStatus ( 'task_abc123' );
JavaScript - Poll with Async/Await
async function waitForCompletion ( taskId , timeout = 300000 ) {
const startTime = Date . now ();
const pollInterval = 2000 ; // 2秒
while ( true ) {
const response = await axios . get (
`http://localhost:8080/api/v1/tasks/ ${ taskId } ` ,
{ headers: { 'X-API-Key' : 'sk_test_123456' } }
);
const { status , result , error } = response . data ;
if ( status === 'TASK_STATUS_COMPLETED' ) {
return result ;
} else if ( status === 'TASK_STATUS_FAILED' ) {
throw new Error ( `タスクが失敗しました: ${ error } ` );
} else if ( status === 'TASK_STATUS_TIMEOUT' ) {
throw new Error ( 'タスクがタイムアウトしました' );
}
// タイムアウトを確認
if ( Date . now () - startTime > timeout ) {
throw new Error ( `ポーリングタイムアウト: ${ timeout } ms` );
}
// 次のポーリングまで待機
await new Promise ( resolve => setTimeout ( resolve , pollInterval ));
}
}
// 使用例
waitForCompletion ( 'task_abc123' )
. then ( result => console . log ( 'Result:' , result ))
. catch ( error => console . error ( 'エラー:' , error . message ));
package main
import (
"encoding/json"
"fmt"
"net/http"
"time"
)
type TaskStatusResponse struct {
TaskID string `json:"task_id"`
WorkflowID string `json:"workflow_id"`
Status string `json:"status"`
Result string `json:"result"`
Response map [ string ] interface {} `json:"response"`
Error string `json:"error"`
Query string `json:"query"`
SessionID string `json:"session_id"`
Mode string `json:"mode"`
Context map [ string ] interface {} `json:"context"`
ModelUsed string `json:"model_used"`
Provider string `json:"provider"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
Usage map [ string ] interface {} `json:"usage"`
Metadata map [ string ] interface {} `json:"metadata"`
UnifiedResponse map [ string ] interface {} `json:"unified_response"`
}
func getTaskStatus ( taskID string ) (* TaskStatusResponse , error ) {
url := fmt . Sprintf ( "http://localhost:8080/api/v1/tasks/ %s " , taskID )
req , _ := http . NewRequest ( "GET" , url , nil )
req . Header . Set ( "X-API-Key" , "sk_test_123456" )
client := & http . Client {}
resp , err := client . Do ( req )
if err != nil {
return nil , err
}
defer resp . Body . Close ()
if resp . StatusCode == 404 {
return nil , fmt . Errorf ( "タスクが見つかりません" )
}
var status TaskStatusResponse
json . NewDecoder ( resp . Body ). Decode (& status )
return & status , nil
}
func waitForCompletion ( taskID string , timeout time . Duration ) ( map [ string ] interface {}, error ) {
start := time . Now ()
for {
status , err := getTaskStatus ( taskID )
if err != nil {
return nil , err
}
switch status . Status {
case "TASK_STATUS_COMPLETED" :
return status . Response , nil
case "TASK_STATUS_FAILED" :
return nil , fmt . Errorf ( "タスクが失敗しました: %s " , status . Error )
case "TASK_STATUS_TIMEOUT" :
return nil , fmt . Errorf ( "タスクがタイムアウトしました" )
}
if time . Since ( start ) > timeout {
return nil , fmt . Errorf ( "ポーリングタイムアウト" )
}
time . Sleep ( 2 * time . Second )
}
}
func main () {
result , err := waitForCompletion ( "task_abc123" , 5 * time . Minute )
if err != nil {
fmt . Println ( "エラー:" , err )
return
}
fmt . Println ( "Result:" , result )
}
Bash - Monitor Task Progress
#!/bin/bash
API_KEY = "sk_test_123456"
TASK_ID = " $1 "
if [ -z " $TASK_ID " ]; then
echo "使用法: $0 <task_id>"
exit 1
fi
echo "タスクを監視中: $TASK_ID "
echo ""
while true ; do
RESPONSE =$( curl -s "http://localhost:8080/api/v1/tasks/ $TASK_ID " \
-H "X-API-Key: $API_KEY " )
STATUS =$( echo $RESPONSE | jq -r '.status' )
echo "[$( date +%T)] ステータス: $STATUS "
case $STATUS in
"TASK_STATUS_COMPLETED" )
echo ""
echo "✓ タスクが完了しました!"
echo ""
echo $RESPONSE | jq -r '.result'
exit 0
;;
"TASK_STATUS_FAILED" )
echo ""
echo "✗ タスクが失敗しました!"
ERROR =$( echo $RESPONSE | jq -r '.error' )
echo "エラー: $ERROR "
exit 1
;;
"TASK_STATUS_TIMEOUT" )
echo ""
echo "✗ タスクがタイムアウトしました!"
exit 1
;;
"TASK_STATUS_CANCELLED" )
echo ""
echo "✗ タスクがキャンセルされました!"
exit 1
;;
esac
sleep 2
done
Use Cases
1. Submit and Wait Pattern
import httpx
import time
def submit_and_wait ( query : str , api_key : str ):
"""タスクを送信し、結果を待つ。"""
# 送信
submit_response = httpx.post(
"http://localhost:8080/api/v1/tasks" ,
headers ={ "X-API-Key" : api_key},
json ={ "query" : query}
)
task_id = submit_response.json()[ "task_id" ]
print ( f "タスクが送信されました: { task_id } " )
# 待機
while True :
status_response = httpx.get(
f "http://localhost:8080/api/v1/tasks/ { task_id } " ,
headers ={ "X-API-Key" : api_key}
)
status = status_response.json()
if status[ "status" ] == "TASK_STATUS_COMPLETED" :
return status[ "result" ]
elif status[ "status" ] == "TASK_STATUS_FAILED" :
raise Exception (status[ "error" ])
time.sleep( 2 )
result = submit_and_wait( "Pythonとは何ですか?" , "sk_test_123456" )
print (result)
2. ダッシュボードステータスウィジェット
def get_task_summary ( task_id : str , api_key : str ):
"""ダッシュボードのタスク概要を取得します。"""
response = httpx.get(
f "http://localhost:8080/api/v1/tasks/ { task_id } " ,
headers ={ "X-API-Key" : api_key}
)
status = response.json()
return {
"id" : task_id,
"query" : status[ "query" ][: 50 ] + "..." ,
"status" : status[ "status" ].replace( "TASK_STATUS_" , "" ),
"mode" : status[ "mode" ].replace( "EXECUTION_MODE_" , "" ),
"created" : status[ "created_at" ]
}
# UIに表示
summary = get_task_summary( "task_abc123" , "sk_test_123456" )
print ( f " { summary[ 'status' ] } : { summary[ 'query' ] } " )
3. バッチステータスチェック
def check_multiple_tasks ( task_ids : list , api_key : str ):
"""複数のタスクのステータスをチェックします。"""
results = {}
for task_id in task_ids:
try :
response = httpx.get(
f "http://localhost:8080/api/v1/tasks/ { task_id } " ,
headers ={ "X-API-Key" : api_key},
timeout = 5.0
)
results[task_id] = response.json()[ "status" ]
except Exception as e:
results[task_id] = f "ERROR: { e } "
return results
# 5つのタスクをチェック
task_ids = [ "task_1" , "task_2" , "task_3" , "task_4" , "task_5" ]
statuses = check_multiple_tasks(task_ids, "sk_test_123456" )
for task_id, status in statuses.items():
print ( f " { task_id } : { status } " )
ベストプラクティス
1. ポーリングの代わりにストリーミングを使用
長時間実行されるタスクには、ポーリングの代わりにSSEストリーミングを使用します:
# ❌ 悪い - 2秒ごとにポーリング
while True :
status = httpx.get( f ".../ { task_id } " )
if status[ "status" ] == "COMPLETED" :
break
time.sleep( 2 )
# ✅ 良い - ストリーミングを使用
for event in client.stream(task_id):
print (event.type, event.message)
if event.type == "TASK_COMPLETED" :
break
2. すべてのステータス状態を処理
status = get_task_status(task_id, api_key)
match status[ "status" ]:
case "TASK_STATUS_QUEUED" :
print ( "タスクはキューに入っています..." )
case "TASK_STATUS_RUNNING" :
print ( "タスクは実行中です..." )
case "TASK_STATUS_COMPLETED" :
result = status[ "result" ]
print ( f "結果: { result } " )
case "TASK_STATUS_FAILED" :
print ( f "失敗: { status[ 'error' ] } " )
case "TASK_STATUS_TIMEOUT" :
print ( "タスクがタイムアウトしました" )
case "TASK_STATUS_CANCELLED" :
print ( "タスクはキャンセルされました" )
3. 指数バックオフを実装
import time
def poll_with_backoff ( task_id , api_key , max_wait = 60 ):
"""指数バックオフでポーリングします。"""
wait_time = 1
while True :
status = get_task_status(task_id, api_key)
if status[ "status" ] in [ "TASK_STATUS_COMPLETED" , "TASK_STATUS_FAILED" ]:
return status
time.sleep(wait_time)
wait_time = min (wait_time * 2 , max_wait) # 最大60秒に制限
4. ステータスレスポンスをキャッシュ
from functools import lru_cache
import time
@lru_cache ( maxsize = 1000 )
def get_cached_status ( task_id : str , api_key : str , timestamp : int ):
"""5秒間ステータスをキャッシュします。"""
return get_task_status(task_id, api_key)
# 使用例
current_time = int (time.time() / 5 ) # 5秒ごとのバケット
status = get_cached_status( "task_abc123" , "sk_test_123456" , current_time)
5. メタデータを抽出
def extract_task_info ( task_id : str , api_key : str ):
"""有用なメタデータを抽出します。"""
status = get_task_status(task_id, api_key)
return {
"task_id" : status[ "task_id" ],
"query" : status[ "query" ],
"status" : status[ "status" ],
"mode" : status[ "mode" ],
"session_id" : status[ "session_id" ],
"has_result" : status[ "response" ] is not None ,
"has_error" : bool (status[ "error" ]),
"workflow_url" : f "http://localhost:8088/workflows/ { task_id } "
}
関連エンドポイント
Python SDK client.get_status()を使用
注意事項
本番環境でのポーリングは避けてください : 長時間実行されるタスクには、ポーリングの代わりにストリーミングエンドポイントを使用してください。ポーリングは不必要な負荷を生み出し、レイテンシを追加します。
セッショントラッキング : session_idフィールドを使用すると、タスクがどのセッションに属しているかを追跡できます。これは、マルチターンの会話やコストの帰属に役立ちます。