Agentic AI Server: ReActアーキテクチャと質問応答フロー
本ドキュメントでは、Agentic AI Serverの内部推論ロジックを解説する。
概要
Agentic AI Serverは、LangGraphの ReAct (Reasoning + Acting) パターンを用いたAIエージェントサーバーである。 ユーザーの質問に対し、LLM(Claude)が自律的にツールを呼び出しながら推論を繰り返し、最終回答を生成する。
キーコンポーネント
| コンポーネント | ファイル | 役割 |
|---|---|---|
| FastAPI アプリ | server/app.py |
エンドポイント、エージェント管理、ストリーミング |
| ReActエージェント | server/app.py (create_react_agent) |
ツール呼び出しを含む推論ループ |
| Plannerグラフ | server/planner_graph.py |
複雑タスクの分解・実行・評価(多段思考) |
| プロンプト群 | server/prompts.py |
全LLMプロンプトの一元管理 |
| アナリティクス | server/analytics.py |
利用ログ(CloudWatch + S3) |
| MCP設定 | .mcp_servers |
外部ツールサーバー接続定義 |
アーキテクチャ概観
%%{init: {'theme': 'default', 'themeVariables': {'fontSize': '11px'}, 'flowchart': {'nodeSpacing': 30, 'rankSpacing': 40, 'curve': 'basis'}}}%%
flowchart LR
OpenWebUI["Open WebUI"]
Endpoint["POST /v1/chat/completions<br/>POST /api/chat"]
LLM["Claude API 呼び出し<br/>(ReActループ)"]
ToolExec["MCPツール実行"]
Memory["会話メモリ<br/>MemorySaver + サマリー"]
MCP1["met-data"]
MCP2["mcp-doc-srch"]
OpenWebUI -->|"OpenAI互換API"| Endpoint
Endpoint --> LLM
LLM -->|"tool_use"| ToolExec
ToolExec -->|"streamable_http"| MCP1
ToolExec -->|"streamable_http"| MCP2
ToolExec -->|"ToolMessage"| LLM
LLM --> Memory
style Endpoint fill:#fff3e0,stroke:#e65100,stroke-width:2px
style LLM fill:#fce4ec,stroke:#c2185b,stroke-width:2px
style ToolExec fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Memory fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
二層アーキテクチャの設計思想
エージェントの推論は内部ループと外部ループの二層で構成される。
| 層 | 実装 | 役割 |
|---|---|---|
| 内部ループ | LangGraph create_react_agent |
単一の質問に対してツール呼び出しと推論を自律的に繰り返す |
| 外部ループ | LangGraph StateGraph(Planner) |
複雑なタスクを1〜5ステップに事前分解し、ステップごとに内部ループを呼び出す |
エンドポイントとループの対応
| エンドポイント | メソッド | 使用ループ | 用途 |
|---|---|---|---|
/v1/chat/completions |
POST | 内部のみ | Open WebUI経由の通常Q&A(メイン) |
/api/chat |
POST | 内部のみ | 非ストリーミング(テスト用) |
/api/chat/stream |
GET | 内部のみ | deprecated |
/api/planner/stream |
GET | 外部 + 内部 | deprecated |
Open WebUI からのリクエストはすべて /v1/chat/completions → 内部ループで処理される。
内部ループ: LangGraph ReActエージェント
ReActパターンとは
ReAct = Reasoning + Acting。LLMが以下のサイクルを自律的に繰り返すパターンである。
- Thought(思考) — ユーザーの質問とコンテキストから、どう回答すべきか推論する
- Action(行動) — 情報が必要であればツールを呼び出す(Claude APIの
tool_use機能) - Observation(観察) — ツール実行結果を受け取り、次の判断材料にする
- 必要な情報が揃ったら最終回答を生成して終了
LangGraphの create_react_agent がこのループを自動管理する。開発者はループ制御コードを書く必要がない。
%%{init: {'theme': 'default', 'themeVariables': {'fontSize': '11px'}}}%%
stateDiagram-v2
[*] --> AgentNode: ユーザーメッセージ入力
AgentNode --> ToolsNode: tool_use を返した場合
AgentNode --> [*]: テキスト応答のみ(終了)
ToolsNode --> AgentNode: ToolMessage を追加
note right of AgentNode
Claude API呼び出し
システムプロンプト + 会話履歴 + ツール定義
end note
note right of ToolsNode
MCPサーバーへのHTTP呼び出し
結果をToolMessageとして返す
end note
エージェント生成
server/app.py の get_or_create_agent() がエージェントのライフサイクルを管理する。
# app.py L580-585
agent = create_react_agent(
model=llm, # ChatAnthropic(model=MODEL, temperature=0.2, max_tokens=2000)
tools=filtered_tools, # MCPサーバーから取得したツール群
prompt=prompt, # create_agent_prompt() で動的生成(日付+ツール情報を注入)
checkpointer=memory # MemorySaver(インメモリ会話チェックポイント)
)
LLM設定
| パラメータ | 値 | 理由 |
|---|---|---|
model |
環境変数 ANTHROPIC_MODEL |
デフォルト: claude-sonnet-4-5 |
temperature |
0.2 | 安定した回答を優先(ツール呼び出し判断の一貫性) |
max_tokens |
2000 | 十分な長さの回答を許容 |
timeout |
30.0秒 | API呼び出しのタイムアウト |
max_retries |
2 | 自動リトライ |
ReActループの詳細フロー
%%{init: {'theme': 'default', 'themeVariables': {'fontSize': '11px'}, 'sequence': {'actorMargin': 40, 'mirrorActors': false}}}%%
sequenceDiagram
participant App as app.py
participant Agent as LangGraph Agent
participant Claude as Claude API
participant MCP as MCPサーバー
App->>Agent: agent.astream_events(messages, config)
Agent->>Claude: messages + tools定義
alt ツール呼び出しが必要
Claude-->>Agent: AIMessage(tool_calls=[...])
Agent->>MCP: ツール実行 (streamable_http)
MCP-->>Agent: ToolMessage(content=結果)
Agent->>Claude: messages + ToolMessage を追加
Note over Agent,Claude: ツールが不要になるまで繰り返し
Claude-->>Agent: AIMessage(content=最終回答)
else ツール不要
Claude-->>Agent: AIMessage(content=直接回答)
end
Agent-->>App: on_chat_model_stream イベント(テキスト差分)
Agent-->>App: on_tool_start / on_tool_end イベント
終了条件
| 条件 | トリガー | 挙動 |
|---|---|---|
| 正常終了 | Claudeが tool_use なしのテキスト応答を返す |
ループ終了、最終回答をストリーミング返却 |
| 強制終了 | recursion_limit=50 を超過 |
RecursionError 例外、エラーレスポンス返却 |
外部ループ: Plannerグラフ(多段思考) — deprecated
Note:
/api/planner/streamエンドポイントは現在 deprecated であり、本番では使用されていない。設計の参考として記載する。
設計思想
複雑なタスク(例: 「東京と大阪の天気を比較して」)を自動的にステップに分解し、各ステップを内部ReActエージェントで実行する。実行結果を評価し、必要に応じてリトライや再計画を行う。
データモデル
# planner_graph.py L20-37
class Step(BaseModel):
id: int
instruction: str # このステップの具体的な指示
expected: str # 期待される結果(Evaluatorが参照)
status: str # pending / executing / completed / failed
result: str # 実行結果
class GraphState(TypedDict):
messages: list # LangChainメッセージリスト
plan: List[Step] # 実行計画
step_idx: int # 現在実行中のステップインデックス
last_observation: str # 直近の実行結果
attempts: int # 現在ステップの試行回数(最大: max_attempts=2)
max_attempts: int # ステップの最大試行回数
success_criteria: str # 全体の成功条件(Plannerが設定)
total_attempts: int # 通算試行回数(上限: 10)
need_replan: bool # 再計画フラグ
Plan → Execute → Evaluate フロー
%%{init: {'theme': 'default', 'themeVariables': {'fontSize': '11px'}, 'flowchart': {'nodeSpacing': 30, 'rankSpacing': 40, 'curve': 'basis'}}}%%
flowchart TD
Start([ユーザーリクエスト]) --> P1
P1["1. Plan: Claude APIでタスク分解<br/>(PLANNER_PROMPT → JSON → Steps)"]
E1["2. Execute: ReActエージェントで<br/>step.instruction を実行"]
EV1{"3. Evaluate: 成功判定"}
P1 --> E1
E1 --> EV1
EV1 -->|"failed or Error"| Fail["passed=false"]
EV1 -->|"completed かつ<br/>データ量 > 50文字"| Pass["passed=true<br/>(LLMスキップ)"]
EV1 -->|"不明"| EV3["LLMで評価<br/>(EVALUATOR_PROMPT)"]
Fail --> R1
Pass --> R1
EV3 --> R1
R1{"total_attempts > 10 ?"}
R1 -->|"Yes"| End([完了: 強制終了])
R1 -->|"No"| R2{"need_replan ?"}
R2 -->|"Yes"| P1
R2 -->|"No"| R3{"次のステップあり ?"}
R3 -->|"Yes"| E1
R3 -->|"No"| End2([完了: 全ステップ成功])
style P1 fill:#e1f5fe,stroke:#01579b,stroke-width:2px
style E1 fill:#fff3e0,stroke:#e65100,stroke-width:2px
style EV1 fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
評価ロジックの詳細
Evaluator は3パターンの判定を行う。
| 判定 | 条件 | LLM呼び出し | 結果 |
|---|---|---|---|
| 自動失敗 | step.status == "failed" or observation が "Error:" で始まる |
なし | passed=false |
| 自動成功 | step.status == "completed" かつ len(observation) > 50 |
なし | passed=true |
| LLM評価 | 上記以外の不明状態 | あり (EVALUATOR_PROMPT) |
LLMが {passed, reason, need_replan} を返す |
LLM評価を可能な限りスキップすることで、コストとレイテンシを削減している。
再計画トリガー
- ステップの試行回数が
max_attempts(デフォルト2)を超えた場合 - Evaluator が
need_replan=Trueを返した場合 - 再計画時は
last_observation(失敗理由)を含めて Planner に渡し、異なるアプローチを促す
Q&Aフロー: リクエストからレスポンスまで
/v1/chat/completions の処理フロー
Open WebUI からの通常チャットはすべてこのエンドポイントで処理される。
%%{init: {'theme': 'default', 'themeVariables': {'fontSize': '11px'}, 'sequence': {'actorMargin': 40, 'mirrorActors': false}}}%%
sequenceDiagram
participant WebUI as Open WebUI
participant App as app.py
participant Summary as 会話サマリー
participant Agent as LangGraph Agent
participant Claude as Claude API
participant MCP as MCPサーバー
participant Analytics as analytics.py
WebUI->>App: POST /v1/chat/completions<br/>{model, messages, stream:true}
Note over App: 1. スレッドID抽出<br/>(x-openwebui-chat-id ヘッダー)
App->>Summary: _get_conversation_summary(thread_id)
Summary-->>App: 既存サマリー(あれば)
Note over App: 2. agent_messages 構築<br/>① SystemMsg(カスタムプロンプト)<br/>② SystemMsg(会話サマリー)<br/>③ 全会話履歴
App->>Agent: astream_events(agent_messages,<br/>thread_id=session_id,<br/>recursion_limit=50)
loop ReActループ
Agent->>Claude: messages + tools定義
Claude-->>Agent: tool_use
Agent->>MCP: ツール実行
MCP-->>Agent: 結果
end
Claude-->>Agent: 最終テキスト応答
loop 50文字ごとにチャンク分割
Agent-->>App: on_chat_model_stream
App-->>WebUI: SSE chunk (OpenAI互換形式)
end
App-->>WebUI: data: [DONE]
Note over App: 3. 後処理 (fire-and-forget)
App->>Summary: _update_conversation_summary()
App->>Analytics: emit() → CloudWatch + S3
メッセージ構築の順序
agent_messages は以下の順序で構築される。
agent_messages = []
# ① Open WebUI のモデル設定に含まれるカスタムシステムプロンプト
if custom_system_prompt:
agent_messages.append(SystemMessage(content=custom_system_prompt))
# ② 過去の会話サマリー(クロスターン記憶)
summary = await _get_conversation_summary(thread_id)
if summary:
agent_messages.append(SystemMessage(content=f"[過去の会話の要約]\n{summary}"))
# ③ 今回リクエストの会話履歴(system メッセージは除外済み)
agent_messages.extend(conversation_messages) # HumanMessage + AIMessage
thread_id と session_id の違い
| ID | 取得元 | ライフサイクル | 用途 |
|---|---|---|---|
thread_id |
ヘッダー x-openwebui-chat-id |
Open WebUIのチャット単位(複数ターンにまたがる) | 会話サマリーの識別キー |
session_id |
毎リクエスト生成UUID | 1リクエスト = 1セッション | MemorySaver のキー(実質的に単一ターン) |
ストリーミング実装
agent.astream_events(..., version="v2") で得られるイベントからテキスト差分を抽出し、OpenAI互換SSE形式で送信する。
| 実装項目 | 値 | 理由 |
|---|---|---|
| チャンク分割 | 50文字ごと | ALB/Nginxのバッファリング対策 |
| keepalive | 5秒間隔で空チャンク送信 | ALBアイドルタイムアウト(60秒)対策 |
| ヘッダー | X-Accel-Buffering: no |
Nginxのレスポンスバッファリング無効化 |
| イベントID | id: {連番} |
チャンクの順序保証 |
主要イベントタイプ
| イベント | 意味 | app.py での処理 |
|---|---|---|
on_chat_model_stream |
LLMトークン生成 | _extract_text_delta() でテキスト差分を抽出 → SSEチャンク送信 |
on_tool_start |
ツール実行開始 | ToolCallTracker にツール名を記録 |
on_tool_end |
ツール実行完了 | (ログ記録) |
_extract_text_delta() はLangGraph/LangChainのバージョン差異を吸収するユーティリティで、AIMessageChunk, dict, BaseMessage など様々な形式のイベントからテキスト差分を統一的に抽出する。
メモリ・会話継続システム
二層メモリ構造
%%{init: {'theme': 'default', 'themeVariables': {'fontSize': '11px'}, 'flowchart': {'nodeSpacing': 30, 'rankSpacing': 40, 'curve': 'basis'}}}%%
flowchart TB
CS["Layer 2: LLM生成サマリー<br/>_conversation_summaries<br/>thread_id → summary 300文字以内"]
Inject["SystemMessage として<br/>agent_messages に注入"]
Agent["ReActエージェント"]
MS["Layer 1: MemorySaver<br/>session_id をキーに<br/>ループ内メッセージ履歴を保持"]
Update["サマリー更新<br/>fire-and-forget"]
CS -->|"リクエスト時に取得"| Inject
Inject -->|"agent_messages<br/>先頭付近に追加"| Agent
Agent -->|"ループ内の状態保持"| MS
Agent -->|"レスポンス完了後"| Update
Update -->|"LLMで要約生成"| CS
style CS fill:#e1f5fe,stroke:#01579b,stroke-width:2px
style MS fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style Agent fill:#fff3e0,stroke:#e65100,stroke-width:2px
Layer 1: MemorySaver
- LangGraph のチェックポインタ。同一
session_idでの呼び出し間でグラフ状態(messages リスト)をインメモリに保持する - Open WebUI 経由では各リクエストで新しい
session_idが生成されるため、MemorySaver の継続効果は同一リクエスト内のReActループに限られる - 複数ターンにまたがる記憶はサマリー(Layer 2)が担う
Layer 2: LLM生成サマリー
各レスポンス完了後、asyncio.create_task() で非同期にサマリーを更新する(fire-and-forget)。
| 設定 | 値 | 環境変数 |
|---|---|---|
| 有効/無効 | デフォルト有効 | CONVERSATION_SUMMARY_ENABLED |
| 最大長 | 300文字 | プロンプトで制御 |
| キャッシュ上限 | 1000エントリ(LRU削除) | CONVERSATION_SUMMARY_MAX_ENTRIES |
| 使用モデル | メインモデルと同一 | SUMMARY_MODEL |
サマリー更新フロー:
既存サマリー(なければ「(なし)」)
+ 直近のユーザーメッセージ(先頭1000文字)
+ 直近のアシスタント応答(先頭2000文字)
↓ Claude API (temperature=0, max_tokens=500)
新しいサマリー(300文字以内)
↓ _conversation_summaries[thread_id] に上書き保存
MCPツール統合
ツール取得と登録
ツールはこのリポジトリ内では定義されていない。外部MCPサーバーから動的に取得する。
%%{init: {'theme': 'default', 'themeVariables': {'fontSize': '11px'}, 'sequence': {'actorMargin': 40, 'mirrorActors': false}}}%%
sequenceDiagram
participant App as app.py
participant Cognito
participant MCPClient as MultiServerMCPClient
participant MCP1 as met-data
participant MCP2 as mcp-doc-srch
Note over App: アプリ起動 (lifespan)
App->>Cognito: POST /oauth2/token<br/>(client_credentials)
Cognito-->>App: access_token
App->>MCPClient: MultiServerMCPClient(config)
MCPClient->>MCP1: tools/list
MCP1-->>MCPClient: ToolDef[]
MCPClient->>MCP2: tools/list<br/>Authorization: Bearer {token}
MCP2-->>MCPClient: ToolDef[]
MCPClient-->>App: tools: List[BaseTool]
Note over App: register_tools(tools)<br/>→ _tool_registry<br/>→ _tool_descriptions
App->>App: create_react_agent(llm, tools, prompt, memory)
MCP設定ファイル (.mcp_servers)
{
"httpStreamableServers": {
"met-data": {
"url": "http://127.0.0.1:13388/mcp"
},
"mcp-doc-srch": {
"url": "http://127.0.0.1:13387/mcp",
"requiresAuth": true
}
}
}
httpStreamableServers: HTTP Streamable Transport で接続するMCPサーバー群requiresAuth: trueのサーバーにはM2MトークンがAuthorization: Bearer {token}ヘッダーとして自動付加される
ツールレジストリ
3つのグローバル変数でツール状態を管理する。
| 変数 | 型 | 役割 |
|---|---|---|
_tool_registry |
Dict[str, Dict] |
ツール名 → {name, description, enabled, server, tool_object} |
_all_tools |
list |
全ツールオブジェクトのリスト |
_tool_descriptions |
list[str] |
"- {name}: {description}" 形式の文字列リスト(プロンプト注入用) |
セッション別ツールフィルタリング
POST /api/tools/settings でセッション単位でツールの有効/無効を切り替えられる。設定変更時は専用のエージェントインスタンスがキャッシュされる。
M2Mトークン管理
- OAuth 2.0 Client Credentials Grant で取得
_m2m_token_cacheにキャッシュし、有効期限の5分前にリフレッシュ- トークン期限切れ時はエージェントキャッシュも全破棄して再作成
プロンプトシステム
すべてのプロンプトは server/prompts.py に集約されている。
プロンプト一覧
| 定数 | 使用箇所 | 役割 |
|---|---|---|
AGENT_PROMPT_WITH_TOOLS |
create_agent_prompt() → エージェント生成 |
ツール情報・引用ルール・URL生成禁止ルールを含むメインプロンプト |
AGENT_PROMPT_FALLBACK |
create_agent_prompt() |
ツールなし時のフォールバック |
PLANNER_PROMPT |
planner_node() |
タスクをステップに分解。最小ステップ数を強制 |
EVALUATOR_PROMPT |
evaluator_node() |
ステップ結果の合否判定(LLM評価時のみ使用) |
REASONING_SUMMARY_PROMPT |
/api/chat 後処理 |
ツール呼び出しの高レベル要約(3項目以内) |
CONVERSATION_SUMMARY_PROMPT |
_update_conversation_summary() |
会話サマリーの更新(app.py 内で定義) |
AGENT_PROMPT_WITH_TOOLS の設計意図
Today is {current_date}.
You are a helpful assistant with access to the following tools:
{tools_info}
...
| 要素 | 目的 |
|---|---|
{current_date} 注入 |
エージェントに「今日の日付」を認識させる |
{tools_info} 注入 |
ツール名と説明を列挙し、Claudeが自律的にツールを選択できるようにする |
| JSON出力優先ルール | ツールの format パラメータがある場合に常に json を指定させる |
| URL生成禁止ルール | ハルシネーションURLを防止。ツール結果の url フィールドのみ使用可能 |
| 引用ルール | [1] 形式の番号引用と 参考文献: セクションを強制 |
| 言語指示 | クエリと同じ言語で回答させる |
エラーハンドリングとリトライ
| エラー | 原因 | 対処 |
|---|---|---|
| Anthropic 529 (Overloaded) | Claude API の過負荷 | 最大3回リトライ(指数バックオフ) |
asyncio.TimeoutError |
API呼び出しタイムアウト | 最大3回リトライ → 504返却 |
RecursionError |
ReActループが recursion_limit=50 を超過 |
エラーメッセージ返却 |
| MCPサーバー接続失敗 | MCPサーバー未起動・ネットワーク不通 | ツールなしで続行(グレースフルフォールバック) |
| M2Mトークン取得失敗 | Cognito設定ミス・ネットワーク不通 | MCPサーバーへの認証なしでリクエスト(失敗する可能性あり) |
| ストリーミング中エラー | ReActループ中の例外 | エラーテキストをSSEチャンクに挿入して [DONE] で閉じる |
| Planner内エラー | Executor/Evaluator の例外 | type: "error" SSEイベントで送信、ループ継続を試行 |