nat.plugins.mcp.client_impl#
Attributes#
Classes#
Container for all session-related data. |
|
A specialized FunctionGroup for MCP clients that includes MCP-specific attributes |
Functions#
|
Create a session-aware NAT function for an MCP tool. |
|
Connect to an MCP server and expose tools as a function group. |
|
Filter tool overrides to only include tools that exist in the MCP server. |
Module Contents#
- logger#
- class SessionData#
Container for all session-related data.
- last_activity: datetime.datetime#
- lock: asyncio.Lock#
- stop_event: asyncio.Event#
- lifetime_task: asyncio.Task | None = None#
- class MCPFunctionGroup(*args, **kwargs)#
Bases:
nat.builder.function.FunctionGroupA specialized FunctionGroup for MCP clients that includes MCP-specific attributes with session management.
Locking model (simple + safe; occasional ‘temporarily unavailable’ is acceptable).
RW semantics: - Multiple readers may hold the reader lock concurrently. - While any reader holds the lock, writers cannot proceed. - While the writer holds the lock, no new readers can proceed.
Data: - _sessions: dict[str, SessionData]; SessionData = {client, last_activity, ref_count, lock}.
Locks: - _session_rwlock (aiorwlock.RWLock) • Reader: very short sections — dict lookups, ref_count ++/–, touch last_activity. • Writer: structural changes — create session entries, enforce limits, remove on cleanup. - SessionData.lock (asyncio.Lock) • Protects per-session ref_count only, taken only while holding RW reader. • last_activity: written without session lock (timestamp races acceptable for cleanup heuristic).
Ordering & awaits: - Always acquire RWLock (reader/writer) before SessionData.lock; never the reverse. - Never await network I/O under the writer (client creation is the one intentional exception). - Client close happens after releasing the writer.
Cleanup: - Under writer: find inactive (ref_count == 0 and idle > max_age), pop from _sessions, stash clients. - After writer: await client.__aexit__() for each stashed client. - TOCTOU race: cleanup may read ref_count==0 then a usage increments it; accepted, yields None gracefully.
Invariants: - ref_count > 0 prevents cleanup. - Usage context increments ref_count before yielding and decrements on exit. - If a session disappears between ensure/use, callers return “Tool temporarily unavailable”.
Creates a new function group.
Parameters#
- configFunctionGroupBaseConfig
The configuration for the function group.
- instance_namestr | None, optional
The name of the function group. If not provided, the type of the function group will be used.
- filter_fnCallable[[Sequence[str]], Awaitable[Sequence[str]]] | None, optional
A callback function to additionally filter the functions in the function group dynamically when the functions are accessed via any accessor method.
- _mcp_client = None#
- _sessions: dict[str, SessionData]#
- _session_rwlock#
- _last_cleanup_check: datetime.datetime#
- _cleanup_check_interval: datetime.timedelta#
- _client_config: nat.plugins.mcp.client_config.MCPClientConfig | None = None#
- property mcp_client#
Get the MCP client instance.
- async cleanup_sessions(max_age: datetime.timedelta | None = None) int#
Manually trigger cleanup of inactive sessions.
- Args:
max_age: Maximum age for sessions before cleanup. If None, uses configured timeout.
- Returns:
Number of sessions cleaned up.
- async _cleanup_inactive_sessions(max_age: datetime.timedelta | None = None)#
Remove clients for sessions inactive longer than max_age.
This method uses the RWLock writer to ensure thread-safe cleanup.
- async _get_session_client(
- session_id: str,
Get the appropriate MCP client for the session.
- async _session_usage_context(session_id: str)#
Context manager to track active session usage and prevent cleanup.
- async _create_session_client(
- session_id: str,
Create a new MCP client instance for the session.
- mcp_session_tool_function(tool, function_group: MCPFunctionGroup)#
Create a session-aware NAT function for an MCP tool.
Routes each invocation to the appropriate per-session MCP client while preserving the original tool input schema, converters, and description.
- async mcp_client_function_group(
- config: nat.plugins.mcp.client_config.MCPClientConfig,
- _builder: nat.builder.builder.Builder,
Connect to an MCP server and expose tools as a function group.
- Args:
config: The configuration for the MCP client _builder: The builder
- Returns:
The function group
- mcp_apply_tool_alias_and_description(
- all_tools: dict,
- tool_overrides: dict[str, nat.plugins.mcp.client_config.MCPToolOverrideConfig] | None,
Filter tool overrides to only include tools that exist in the MCP server.
- Args:
all_tools: The tools from the MCP server tool_overrides: The tool overrides to apply
- Returns:
Dictionary of valid tool overrides