nat.plugins.mcp.client_impl#

Attributes#

Classes#

SessionData

Container for all session-related data.

MCPFunctionGroup

A specialized FunctionGroup for MCP clients that includes MCP-specific attributes with session management.

Functions#

mcp_session_tool_function(tool, function_group)

Create a session-aware NAT function for an MCP tool.

mcp_client_function_group(config, _builder)

Connect to an MCP server and expose tools as a function group.

mcp_apply_tool_alias_and_description(→ dict[str, ...)

Filter tool overrides to only include tools that exist in the MCP server.

Module Contents#

logger#
class SessionData#

Container for all session-related data.

client: nat.plugins.mcp.client_base.MCPBaseClient#
last_activity: datetime.datetime#
ref_count: int = 0#
lock: asyncio.Lock#
stop_event: asyncio.Event#
lifetime_task: asyncio.Task | None = None#
class MCPFunctionGroup(*args, **kwargs)#

Bases: nat.builder.function.FunctionGroup

A specialized FunctionGroup for MCP clients that includes MCP-specific attributes with session management.

Locking model (simple + safe; occasional ‘temporarily unavailable’ is acceptable).

RW semantics:
- Multiple readers may hold the reader lock concurrently.
- While any reader holds the lock, writers cannot proceed.
- While the writer holds the lock, no new readers can proceed.

Data:
- _sessions: dict[str, SessionData]; SessionData = {client, last_activity, ref_count, lock}.

Locks:
- _session_rwlock (aiorwlock.RWLock)
  • Reader (very short sections): dict lookups, ref_count ++/--, touch last_activity.
  • Writer (structural changes): create session entries, enforce limits, remove on cleanup.
- SessionData.lock (asyncio.Lock)
  • Protects per-session ref_count only; taken only while holding the RW reader.
  • last_activity: written without the session lock (timestamp races are acceptable for the cleanup heuristic).

Ordering & awaits:
- Always acquire RWLock (reader/writer) before SessionData.lock; never the reverse.
- Never await network I/O under the writer (client creation is the one intentional exception).
- Client close happens after releasing the writer.

Cleanup:
- Under writer: find inactive sessions (ref_count == 0 and idle > max_age), pop them from _sessions, stash their clients.
- After writer: await client.__aexit__() for each stashed client.
- TOCTOU race: cleanup may read ref_count == 0 and then a usage increments it; this is accepted, and the usage yields None gracefully.

Invariants:
- ref_count > 0 prevents cleanup.
- The usage context increments ref_count before yielding and decrements it on exit.
- If a session disappears between ensure and use, callers return “Tool temporarily unavailable”.
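The invariants above can be illustrated with a minimal, standard-library-only sketch. It is not the module's implementation: `SketchSession`, `SketchGroup`, `usage`, and `call_tool` are hypothetical names, the client is a plain string, and a single `asyncio.Lock` stands in for the reader side of `aiorwlock.RWLock` (stricter than the real lock, since it does not allow concurrent readers).

```python
import asyncio
import contextlib
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class SketchSession:
    """Hypothetical stand-in for SessionData; the client is just a string."""
    client: str
    last_activity: datetime = field(default_factory=datetime.now)
    ref_count: int = 0
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)


class SketchGroup:
    """Hypothetical stand-in for the session table kept by the function group."""

    def __init__(self) -> None:
        self.sessions: dict[str, SketchSession] = {}
        # Assumption: a plain lock replaces the RWLock reader to stay stdlib-only.
        self.table_lock = asyncio.Lock()

    @contextlib.asynccontextmanager
    async def usage(self, session_id: str):
        # Ordering rule: table lock first, then the per-session lock.
        async with self.table_lock:
            session = self.sessions.get(session_id)
            if session is not None:
                async with session.lock:
                    session.ref_count += 1  # incremented before yielding
                session.last_activity = datetime.now()
        if session is None:
            # The session is gone (for example, already cleaned up).
            yield None
            return
        try:
            yield session.client
        finally:
            async with self.table_lock:
                async with session.lock:
                    session.ref_count -= 1  # decremented on exit


async def call_tool(group: SketchGroup, session_id: str) -> str:
    async with group.usage(session_id) as client:
        if client is None:
            return "Tool temporarily unavailable"
        return f"called via {client}"


async def demo() -> tuple:
    group = SketchGroup()
    group.sessions["s1"] = SketchSession(client="client-s1")
    ok = await call_tool(group, "s1")
    missing = await call_tool(group, "gone")
    return ok, missing, group.sessions["s1"].ref_count


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

While a caller is inside `usage`, `ref_count` stays above zero, which is the condition that keeps a cleanup pass from removing the session.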

Creates a new function group.

Parameters#

configFunctionGroupBaseConfig

The configuration for the function group.

instance_namestr | None, optional

The name of the function group. If not provided, the type of the function group will be used.

filter_fnCallable[[Sequence[str]], Awaitable[Sequence[str]]] | None, optional

A callback function to additionally filter the functions in the function group dynamically when the functions are accessed via any accessor method.

_mcp_client = None#
_mcp_client_server_name: str | None = None#
_mcp_client_transport: str | None = None#
_sessions: dict[str, SessionData]#
_session_rwlock#
_last_cleanup_check: datetime.datetime#
_cleanup_check_interval: datetime.timedelta#
_shared_auth_provider: nat.authentication.interfaces.AuthProviderBase | None = None#
_client_config: nat.plugins.mcp.client_config.MCPClientConfig | None = None#
_use_random_session_id_for_testing: bool = False#
property mcp_client#

Get the MCP client instance.

property mcp_client_server_name: str | None#

Get the MCP client server name.

property mcp_client_transport: str | None#

Get the MCP client transport type.

property session_count: int#

Current number of active sessions.

property session_limit: int#

Maximum allowed sessions.

_get_random_session_id() → str#

Get a random session ID.

_get_session_id_from_context() → str | None#

Get the session ID from the current context.

async cleanup_sessions(max_age: datetime.timedelta | None = None) → int#

Manually trigger cleanup of inactive sessions.

Args:

max_age: Maximum age for sessions before cleanup. If None, uses configured timeout.

Returns:

Number of sessions cleaned up.

async _cleanup_inactive_sessions(max_age: datetime.timedelta | None = None)#

Remove clients for sessions inactive longer than max_age.

This method takes the RWLock writer so that cleanup cannot race with concurrent access to the session table.
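A rough sketch of the two-phase cleanup described in the locking model: inactive sessions are popped while the lock is held, and their clients are closed only after it is released. All names here (`SketchClient`, `SketchSession`, `cleanup_inactive`) are hypothetical, and an `asyncio.Lock` stands in for the RWLock writer.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta


class SketchClient:
    """Hypothetical client that only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.closed = True


@dataclass
class SketchSession:
    """Hypothetical stand-in for SessionData with only the fields cleanup reads."""
    client: SketchClient
    last_activity: datetime
    ref_count: int = 0


async def cleanup_inactive(sessions, table_lock, max_age, now=None) -> int:
    now = now or datetime.now()
    stashed = []
    # Phase 1: structural change only; no network I/O while the lock is held.
    async with table_lock:
        for session_id in list(sessions):
            session = sessions[session_id]
            if session.ref_count == 0 and now - session.last_activity > max_age:
                stashed.append(sessions.pop(session_id).client)
    # Phase 2: close the stashed clients after releasing the lock.
    for client in stashed:
        await client.__aexit__(None, None, None)
    return len(stashed)


async def demo() -> tuple:
    now = datetime(2024, 1, 1, 12, 0, 0)
    old = now - timedelta(hours=2)
    idle_client = SketchClient()
    sessions = {
        "idle": SketchSession(idle_client, last_activity=old),
        "busy": SketchSession(SketchClient(), last_activity=old, ref_count=1),
        "fresh": SketchSession(SketchClient(), last_activity=now),
    }
    removed = await cleanup_inactive(
        sessions, asyncio.Lock(), timedelta(hours=1), now=now
    )
    return removed, sorted(sessions), idle_client.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only the session that is both idle and unreferenced is removed; the one with a nonzero `ref_count` survives even though it is equally old.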

async _get_session_client(
session_id: str,
) → nat.plugins.mcp.client_base.MCPBaseClient#

Get the appropriate MCP client for the session.

async _session_usage_context(session_id: str)#

Context manager to track active session usage and prevent cleanup.

async _create_session_client(
session_id: str,
) → tuple[nat.plugins.mcp.client_base.MCPBaseClient, asyncio.Event, asyncio.Task]#

Create a new MCP client instance for the session.
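This page does not explain why the method returns an event and a task alongside the client. One plausible reading, offered only as an assumption, is that a dedicated task owns the client's async context and keeps it open until the stop event is set. The sketch below illustrates that pattern with hypothetical names (`SketchClient`, `create_session_client`, `lifetime`); it is not taken from the module's source.

```python
import asyncio


class SketchClient:
    """Hypothetical async-context-manager client that tracks its own state."""

    def __init__(self) -> None:
        self.state = "new"

    async def __aenter__(self):
        self.state = "open"
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.state = "closed"


async def create_session_client(session_id: str):
    client = SketchClient()
    stop_event = asyncio.Event()
    ready = asyncio.Event()

    async def lifetime() -> None:
        # The same task enters and exits the client context.
        async with client:
            ready.set()
            await stop_event.wait()  # hold the client open until asked to stop

    task = asyncio.create_task(lifetime(), name=f"mcp-session-{session_id}")
    await ready.wait()  # return only once the client is open
    return client, stop_event, task


async def demo() -> tuple:
    client, stop_event, task = await create_session_client("s1")
    opened = client.state
    stop_event.set()  # request shutdown
    await task        # wait for the context to exit
    return opened, client.state


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Under this reading, the three returned values map onto the `client`, `stop_event`, and `lifetime_task` fields of `SessionData`.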

mcp_session_tool_function(tool, function_group: MCPFunctionGroup)#

Create a session-aware NAT function for an MCP tool.

Routes each invocation to the appropriate per-session MCP client while preserving the original tool input schema, converters, and description.

async mcp_client_function_group(
config: nat.plugins.mcp.client_config.MCPClientConfig,
_builder: nat.builder.builder.Builder,
)#

Connect to an MCP server and expose tools as a function group.

Args:

config: The configuration for the MCP client

_builder: The builder

Returns:

The function group

mcp_apply_tool_alias_and_description(
all_tools: dict,
tool_overrides: dict[str, nat.plugins.mcp.client_config.MCPToolOverrideConfig] | None,
) → dict[str, nat.plugins.mcp.client_config.MCPToolOverrideConfig]#

Filter tool overrides to only include tools that exist in the MCP server.

Args:

all_tools: The tools from the MCP server

tool_overrides: The tool overrides to apply

Returns:

Dictionary of valid tool overrides
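The filtering step can be sketched as a dictionary comprehension that keeps only overrides whose key names a tool the server reports. `SketchOverride` and `filter_overrides` are hypothetical; the `alias` and `description` fields are guessed from the function name, and the real function may do more (for example, report unknown tool names).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchOverride:
    """Hypothetical stand-in for MCPToolOverrideConfig; fields are assumptions."""
    alias: Optional[str] = None
    description: Optional[str] = None


def filter_overrides(
    all_tools: dict,
    tool_overrides: Optional[dict[str, SketchOverride]],
) -> dict[str, SketchOverride]:
    # No overrides configured: nothing to apply.
    if not tool_overrides:
        return {}
    # Keep only overrides that refer to a tool the server actually exposes.
    return {
        name: override
        for name, override in tool_overrides.items()
        if name in all_tools
    }


if __name__ == "__main__":
    tools = {"search": object(), "fetch": object()}
    overrides = {
        "search": SketchOverride(alias="web_search"),
        "missing": SketchOverride(description="not on the server"),
    }
    print(filter_overrides(tools, overrides))
```

In this sketch the `missing` entry is dropped silently because the server does not list a tool by that name.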