メインコンテンツへスキップ

エンドポイント

GET http://localhost:8080/api/v1/tasks/{id}

説明

特定のタスクの現在のステータス、結果、およびメタデータを取得します。このエンドポイントを使用してタスクの進行状況を確認したり、最終結果を取得したりします。

認証

必要: はい ヘッダーにAPIキーを含めてください:
X-API-Key: sk_test_123456

リクエスト

パスパラメータ

パラメータタイプ必要説明
idstringはいタスクID(ワークフローIDとしても機能)

ヘッダー

ヘッダー必要説明
X-API-Keyはい認証キー

レスポンス

成功レスポンス

ステータス: 200 OK ヘッダー:
  • X-Workflow-ID: Temporalワークフロー識別子(タスクIDと同じ)
ボディ:
{
  "task_id": "string",
  "status": "string",
  "result": "string",
  "response": {},
  "metadata": {},
  "error": "string",
  "created_at": "timestamp",
  "updated_at": "timestamp",
  "query": "string",
  "session_id": "string",
  "mode": "string",
  "model_used": "string",
  "provider": "string",
  "usage": {
    "total_tokens": 0,
    "input_tokens": 0,
    "output_tokens": 0,
    "estimated_cost": 0.0
  }
}

レスポンスフィールド

フィールドタイプ説明
task_idstring一意のタスク識別子
statusstring現在のタスクステータス
resultstring生のLLM出力(プレーンテキストまたはJSON文字列)
responseobject解析されたJSON(resultが有効なJSONを含む場合のみ存在)
metadataobjectタスクメタデータ(引用、検証、unified_response、抽出された事実、モデルの内訳など)
errorstringエラーメッセージ(エラーがない場合は空)
created_attimestampタスク作成時間
updated_attimestamp最終更新時間
querystring元のタスククエリ
session_idstringセッション識別子
modestring使用された実行モード
model_usedstring使用された主要モデル(例: gpt-5-mini-2025-08-07
providerstringプロバイダー名(例: openai, anthropic
usageobjectトークン使用量とコスト: { total_tokens, input_tokens?, output_tokens?, estimated_cost? }

ステータス値

  • TASK_STATUS_UNSPECIFIED - ステータス不明
  • TASK_STATUS_QUEUED - 実行待ち
  • TASK_STATUS_RUNNING - 現在実行中
  • TASK_STATUS_COMPLETED - 成功裏に完了
  • TASK_STATUS_FAILED - エラーで失敗
  • TASK_STATUS_CANCELLED - ユーザーによってキャンセル
実行中のタスクをキャンセルするには、POST /api/v1/tasks/{id}/cancelも参照してください。
  • TASK_STATUS_TIMEOUT - タイムアウト制限を超えた

実行モード値

  • EXECUTION_MODE_SIMPLE - 単一のLLM呼び出し、ツールなし
  • EXECUTION_MODE_STANDARD - ツールを使用したマルチステップ
  • EXECUTION_MODE_COMPLEX - 高度な推論パターン

タスクステータスの確認

curl -X GET "http://localhost:8080/api/v1/tasks/task_01HQZX3Y9K8M2P4N5S7T9W2V" \
  -H "X-API-Key: sk_test_123456"
レスポンス(キュー中):
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "TASK_STATUS_QUEUED",
  "response": null,
  "error": "",
  "created_at": "2025-10-22T10:30:00Z",
  "updated_at": "2025-10-22T10:30:00Z",
  "query": "フランスの首都はどこですか?",
  "session_id": "user-123-session",
  "mode": "EXECUTION_MODE_SIMPLE"
}
レスポンス(実行中):
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "TASK_STATUS_RUNNING",
  "response": null,
  "error": "",
  "created_at": "2025-10-22T10:30:00Z",
  "updated_at": "2025-10-22T10:30:02Z",
  "query": "フランスの首都はどこですか?",
  "session_id": "user-123-session",
  "mode": "EXECUTION_MODE_SIMPLE"
}
レスポンス(完了):
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "TASK_STATUS_COMPLETED",
  "result": "フランスの首都はパリです。パリは987年から首都であり、国の北中央部に位置しています。",
  "error": "",
  "created_at": "2025-10-22T10:30:00Z",
  "updated_at": "2025-10-22T10:30:05Z",
  "query": "フランスの首都はどこですか?",
  "session_id": "user-123-session",
  "mode": "EXECUTION_MODE_SIMPLE",
  "model_used": "gpt-5-mini-2025-08-07",
  "provider": "openai",
  "usage": {
    "total_tokens": 300,
    "input_tokens": 200,
    "output_tokens": 100,
    "estimated_cost": 0.006
  }
}
レスポンス(失敗):
{
  "task_id": "task_01HQZX3Y9K8M2P4N5S7T9W2V",
  "status": "TASK_STATUS_FAILED",
  "response": null,
  "error": "LLMサービスが利用できません: 接続タイムアウト",
  "created_at": "2025-10-22T10:30:00Z",
  "updated_at": "2025-10-22T10:30:10Z",
  "query": "フランスの首都はどこですか?",
  "session_id": "user-123-session",
  "mode": "EXECUTION_MODE_SIMPLE"
}

ディープリサーチレスポンスペイロード

force_research: trueでタスクが提出されると、完了したレスポンスには構造化されたリサーチデータを含む追加のメタデータフィールドが含まれます。

ディープリサーチメタデータフィールド

ディープリサーチタスクの場合、metadataオブジェクトには以下が含まれます:
フィールドタイプ説明
unified_responseobjectすべてのメタデータを含む統合レスポンス
citationsarrayリサーチからの構造化された引用データ
verificationobject主張の検証結果(enable_verification: trueの場合)
extracted_factsarray構造化された事実(enable_fact_extraction: trueの場合)
fact_summaryobject抽出された事実の要約統計

例: 深い研究の完了レスポンス

{
  "task_id": "task-abc123-research",
  "status": "TASK_STATUS_COMPLETED",
  "result": "# 2025年のAIトレンド\n\n2025年の人工知能の風景は...[1][2]\n\n## 出典\n[1] MIT Technology Review (https://...) - 2025-01-15\n[2] Nature AI (https://...) - 2025-01-10",
  "metadata": {
    "unified_response": {
      "task_id": "task-abc123-research",
      "session_id": "session-xyz",
      "status": "completed",
      "result": "2025年の人工知能の風景は...",
      "metadata": {
        "model": "claude-sonnet-4-5-20250929",
        "execution_mode": "EXECUTION_MODE_COMPLEX",
        "complexity_score": 0.72,
        "agents_used": 3
      },
      "usage": {
        "input_tokens": 12100,
        "output_tokens": 3320,
        "total_tokens": 15420,
        "cost_usd": 0.0462
      },
      "performance": {
        "execution_time_ms": 45200
      },
      "stop_reason": "completed",
      "error": null,
      "timestamp": "2025-01-20T10:00:00Z"
    },
    "citations": [
      {
        "title": "AI Breakthrough Report 2025",
        "url": "https://www.technologyreview.com/2025/01/ai-report",
        "source": "MIT Technology Review",
        "credibility_score": 0.92,
        "quality_score": 0.88
      },
      {
        "title": "Nature AI Special Issue",
        "url": "https://www.nature.com/articles/ai-2025",
        "source": "Nature",
        "credibility_score": 0.95,
        "quality_score": 0.91
      }
    ],
    "verification": {
      "overall_confidence": 0.87,
      "total_claims": 12,
      "supported_claims": 10,
      "unsupported_claims": ["未来予測に関する主張"],
      "conflicts": [],
      "claim_details": [
        {
          "claim": "AIモデルはベンチマークXで95%の精度を達成した",
          "supporting_citations": [1, 2],
          "conflicting_citations": [],
          "confidence": 0.92
        }
      ]
    }
  },
  "model_used": "claude-sonnet-4-5-20250929",
  "provider": "anthropic",
  "usage": {
    "total_tokens": 15420,
    "input_tokens": 12100,
    "output_tokens": 3320,
    "estimated_cost": 0.0462
  }
}

抽出された事実 (オプション)

enable_fact_extraction: true がリクエストコンテキストに設定されている場合:
{
  "metadata": {
    "extracted_facts": [
      {
        "statement": "GPT-5はMMLUベンチマークで95%の精度を達成した",
        "confidence": 0.92,
        "source_citation": [1, 3],
        "category": "performance",
        "entity_mentions": ["GPT-5", "MMLU"],
        "temporal_marker": "2025",
        "is_quantitative": true
      }
    ],
    "fact_summary": {
      "total_facts": 24,
      "high_confidence": 18,
      "categorized_facts": {
        "performance": 8,
        "market": 6,
        "research": 10
      },
      "contradiction_count": 1
    }
  }
}

引用オブジェクトスキーマ

フィールド説明
urlstring出典のURL
titlestring記事/ページのタイトル
sourcestring出版社/ドメイン名
credibility_scorefloat出典の信頼性 (0.0-1.0)
quality_scorefloatコンテンツの質 (0.0-1.0)
metadata.citations 配列内の各引用の位置は、インライン参照で使用される [n] インデックスに対応しています。

検証オブジェクトスキーマ

フィールド説明
overall_confidencefloat集計された検証信頼度 (0.0-1.0)
total_claimsinteger抽出された事実の数
supported_claimsinteger支持する引用がある主張の数
unsupported_claimsarray引用支持がない主張のテキストリスト
conflictsarray検出された矛盾情報
claim_detailsarray各主張の検証詳細
深い研究データへのアクセス: metadata.citations 配列と metadata.verification オブジェクトは、研究ワークフロー (force_research: true) のみで populated されます。シンプルなタスクの場合、これらのフィールドはレスポンスに存在しません。

エラーレスポンス

401 Unauthorized

{
  "error": "Unauthorized"
}

404 Not Found

{
  "error": "Task not found"
}

500 Internal Server Error

{
  "error": "Failed to get task status: database error"
}

コード例

Python - シンプルなステータスチェック

import httpx

def get_task_status(task_id: str, api_key: str):
    """タスクのステータスを取得します。"""
    response = httpx.get(
        f"http://localhost:8080/api/v1/tasks/{task_id}",
        headers={"X-API-Key": api_key}
    )

    if response.status_code == 404:
        return None

    return response.json()

status = get_task_status("task_abc123", "sk_test_123456")
if status:
    print(f"ステータス: {status['status']}")
    if status['status'] == 'TASK_STATUS_COMPLETED':
        print(f"結果: {status['result']}")

Python - 完了までポーリング

import httpx
import time

def wait_for_completion(task_id: str, api_key: str, timeout: int = 300):
    """完了またはタイムアウトまでタスクのステータスをポーリングします。"""
    start_time = time.time()

    while True:
        response = httpx.get(
            f"http://localhost:8080/api/v1/tasks/{task_id}",
            headers={"X-API-Key": api_key}
        )

        status = response.json()
        current_status = status["status"]

        # 終端状態をチェック
        if current_status == "TASK_STATUS_COMPLETED":
            return status.get("result")
        elif current_status == "TASK_STATUS_FAILED":
            raise Exception(f"タスクが失敗しました: {status['error']}")
        elif current_status == "TASK_STATUS_TIMEOUT":
            raise Exception("タスクがタイムアウトしました")
        elif current_status == "TASK_STATUS_CANCELLED":
            raise Exception("タスクがキャンセルされました")

        # タイムアウトをチェック
        if time.time() - start_time > timeout:
            raise TimeoutError(f"{timeout}s後にポーリングタイムアウト")

        # 次のポーリングまで待機
        time.sleep(2)

# 使用例
try:
    result = wait_for_completion("task_abc123", "sk_test_123456")
    print("結果:", result)
except Exception as e:
    print("エラー:", e)

JavaScript/Node.js

const axios = require('axios');

async function getTaskStatus(taskId) {
  try {
    const response = await axios.get(
      `http://localhost:8080/api/v1/tasks/${taskId}`,
      {
        headers: {
          'X-API-Key': 'sk_test_123456'
        }
      }
    );

    const status = response.data;

    console.log('Status:', status.status);

    if (status.status === 'TASK_STATUS_COMPLETED') {
      console.log('Result:', status.result);
    } else if (status.status === 'TASK_STATUS_FAILED') {
      console.error('Error:', status.error);
    }

    return status;
  } catch (error) {
    if (error.response?.status === 404) {
      console.error('タスクが見つかりません');
    } else {
      console.error('エラー:', error.response?.data || error.message);
    }
    throw error;
  }
}

getTaskStatus('task_abc123');

JavaScript - Poll with Async/Await

async function waitForCompletion(taskId, timeout = 300000) {
  const startTime = Date.now();
  const pollInterval = 2000; // 2秒

  while (true) {
    const response = await axios.get(
      `http://localhost:8080/api/v1/tasks/${taskId}`,
      { headers: { 'X-API-Key': 'sk_test_123456' } }
    );

    const { status, result, error } = response.data;

    if (status === 'TASK_STATUS_COMPLETED') {
      return result;
    } else if (status === 'TASK_STATUS_FAILED') {
      throw new Error(`タスクが失敗しました: ${error}`);
    } else if (status === 'TASK_STATUS_TIMEOUT') {
      throw new Error('タスクがタイムアウトしました');
    }

    // タイムアウトを確認
    if (Date.now() - startTime > timeout) {
      throw new Error(`ポーリングタイムアウト: ${timeout}ms`);
    }

    // 次のポーリングまで待機
    await new Promise(resolve => setTimeout(resolve, pollInterval));
  }
}

// 使用例
waitForCompletion('task_abc123')
  .then(result => console.log('Result:', result))
  .catch(error => console.error('エラー:', error.message));

Go

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type TaskStatusResponse struct {
    TaskID    string                 `json:"task_id"`
    Status    string                 `json:"status"`
    Response  map[string]interface{} `json:"response"`
    Error     string                 `json:"error"`
    Query     string                 `json:"query"`
    SessionID string                 `json:"session_id"`
    Mode      string                 `json:"mode"`
}

func getTaskStatus(taskID string) (*TaskStatusResponse, error) {
    url := fmt.Sprintf("http://localhost:8080/api/v1/tasks/%s", taskID)

    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("X-API-Key", "sk_test_123456")

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode == 404 {
        return nil, fmt.Errorf("タスクが見つかりません")
    }

    var status TaskStatusResponse
    json.NewDecoder(resp.Body).Decode(&status)

    return &status, nil
}

func waitForCompletion(taskID string, timeout time.Duration) (map[string]interface{}, error) {
    start := time.Now()

    for {
        status, err := getTaskStatus(taskID)
        if err != nil {
            return nil, err
        }

        switch status.Status {
        case "TASK_STATUS_COMPLETED":
            return status.Response, nil
        case "TASK_STATUS_FAILED":
            return nil, fmt.Errorf("タスクが失敗しました: %s", status.Error)
        case "TASK_STATUS_TIMEOUT":
            return nil, fmt.Errorf("タスクがタイムアウトしました")
        }

        if time.Since(start) > timeout {
            return nil, fmt.Errorf("ポーリングタイムアウト")
        }

        time.Sleep(2 * time.Second)
    }
}

func main() {
    result, err := waitForCompletion("task_abc123", 5*time.Minute)
    if err != nil {
        fmt.Println("エラー:", err)
        return
    }

    fmt.Println("Result:", result)
}

Bash - Monitor Task Progress

#!/bin/bash

API_KEY="sk_test_123456"
TASK_ID="$1"

if [ -z "$TASK_ID" ]; then
  echo "使用法: $0 <task_id>"
  exit 1
fi

echo "タスクを監視中: $TASK_ID"
echo ""

while true; do
  RESPONSE=$(curl -s "http://localhost:8080/api/v1/tasks/$TASK_ID" \
    -H "X-API-Key: $API_KEY")

  STATUS=$(echo $RESPONSE | jq -r '.status')

  echo "[$(date +%T)] ステータス: $STATUS"

  case $STATUS in
    "TASK_STATUS_COMPLETED")
      echo ""
      echo "✓ タスクが完了しました!"
      echo ""
      echo $RESPONSE | jq -r '.response.result'
      exit 0
      ;;
    "TASK_STATUS_FAILED")
      echo ""
      echo "✗ タスクが失敗しました!"
      ERROR=$(echo $RESPONSE | jq -r '.error')
      echo "エラー: $ERROR"
      exit 1
      ;;
    "TASK_STATUS_TIMEOUT")
      echo ""
      echo "✗ タスクがタイムアウトしました!"
      exit 1
      ;;
    "TASK_STATUS_CANCELLED")
      echo ""
      echo "✗ タスクがキャンセルされました!"
      exit 1
      ;;
  esac

  sleep 2
done

Use Cases

1. Submit and Wait Pattern

import httpx
import time

def submit_and_wait(query: str, api_key: str):
    """タスクを送信し、結果を待つ。"""
    # 送信
    submit_response = httpx.post(
        "http://localhost:8080/api/v1/tasks",
        headers={"X-API-Key": api_key},
        json={"query": query}
    )
    task_id = submit_response.json()["task_id"]
    print(f"タスクが送信されました: {task_id}")

    # 待機
    while True:
        status_response = httpx.get(
            f"http://localhost:8080/api/v1/tasks/{task_id}",
            headers={"X-API-Key": api_key}
        )
        status = status_response.json()

        if status["status"] == "TASK_STATUS_COMPLETED":
            return status["response"]["result"]
        elif status["status"] == "TASK_STATUS_FAILED":
            raise Exception(status["error"])

        time.sleep(2)

result = submit_and_wait("Pythonとは何ですか?", "sk_test_123456")
print(result)

2. ダッシュボードステータスウィジェット

def get_task_summary(task_id: str, api_key: str):
    """ダッシュボードのタスク概要を取得します。"""
    response = httpx.get(
        f"http://localhost:8080/api/v1/tasks/{task_id}",
        headers={"X-API-Key": api_key}
    )

    status = response.json()

    return {
        "id": task_id,
        "query": status["query"][:50] + "...",
        "status": status["status"].replace("TASK_STATUS_", ""),
        "mode": status["mode"].replace("EXECUTION_MODE_", ""),
        "created": status["created_at"]
    }

# UIに表示
summary = get_task_summary("task_abc123", "sk_test_123456")
print(f"{summary['status']}: {summary['query']}")

3. バッチステータスチェック

def check_multiple_tasks(task_ids: list, api_key: str):
    """複数のタスクのステータスをチェックします。"""
    results = {}

    for task_id in task_ids:
        try:
            response = httpx.get(
                f"http://localhost:8080/api/v1/tasks/{task_id}",
                headers={"X-API-Key": api_key},
                timeout=5.0
            )
            results[task_id] = response.json()["status"]
        except Exception as e:
            results[task_id] = f"ERROR: {e}"

    return results

# 5つのタスクをチェック
task_ids = ["task_1", "task_2", "task_3", "task_4", "task_5"]
statuses = check_multiple_tasks(task_ids, "sk_test_123456")

for task_id, status in statuses.items():
    print(f"{task_id}: {status}")

ベストプラクティス

1. ポーリングの代わりにストリーミングを使用

長時間実行されるタスクには、ポーリングの代わりにSSEストリーミングを使用します:
# ❌ 悪い - 2秒ごとにポーリング
while True:
    status = httpx.get(f".../{task_id}")
    if status["status"] == "COMPLETED":
        break
    time.sleep(2)

# ✅ 良い - ストリーミングを使用
for event in client.stream(task_id):
    print(event.type, event.message)
    if event.type == "TASK_COMPLETED":
        break

2. すべてのステータス状態を処理

status = get_task_status(task_id, api_key)

match status["status"]:
    case "TASK_STATUS_QUEUED":
        print("タスクはキューに入っています...")
    case "TASK_STATUS_RUNNING":
        print("タスクは実行中です...")
    case "TASK_STATUS_COMPLETED":
        result = status["response"]["result"]
        print(f"結果: {result}")
    case "TASK_STATUS_FAILED":
        print(f"失敗: {status['error']}")
    case "TASK_STATUS_TIMEOUT":
        print("タスクがタイムアウトしました")
    case "TASK_STATUS_CANCELLED":
        print("タスクはキャンセルされました")

3. 指数バックオフを実装

import time

def poll_with_backoff(task_id, api_key, max_wait=60):
    """指数バックオフでポーリングします。"""
    wait_time = 1

    while True:
        status = get_task_status(task_id, api_key)

        if status["status"] in ["TASK_STATUS_COMPLETED", "TASK_STATUS_FAILED"]:
            return status

        time.sleep(wait_time)
        wait_time = min(wait_time * 2, max_wait)  # 最大60秒に制限

4. ステータスレスポンスをキャッシュ

from functools import lru_cache
import time

@lru_cache(maxsize=1000)
def get_cached_status(task_id: str, api_key: str, timestamp: int):
    """5秒間ステータスをキャッシュします。"""
    return get_task_status(task_id, api_key)

# 使用例
current_time = int(time.time() / 5)  # 5秒ごとのバケット
status = get_cached_status("task_abc123", "sk_test_123456", current_time)

5. メタデータを抽出

def extract_task_info(task_id: str, api_key: str):
    """有用なメタデータを抽出します。"""
    status = get_task_status(task_id, api_key)

    return {
        "task_id": status["task_id"],
        "query": status["query"],
        "status": status["status"],
        "mode": status["mode"],
        "session_id": status["session_id"],
        "has_result": status["response"] is not None,
        "has_error": bool(status["error"]),
        "workflow_url": f"http://localhost:8088/workflows/{task_id}"
    }

関連エンドポイント

注意事項

本番環境でのポーリングは避けてください: 長時間実行されるタスクには、ポーリングの代わりにストリーミングエンドポイントを使用してください。ポーリングは不必要な負荷を生み出し、レイテンシを追加します。
セッショントラッキング: session_idフィールドを使用すると、タスクがどのセッションに属しているかを追跡できます。これは、マルチターンの会話やコストの帰属に役立ちます。