nat.plugins.mcp.client_base#

Attributes#

Classes#

AuthAdapter

httpx.Auth adapter for authentication providers.

MCPBaseClient

Base client for creating an MCP transport session and connecting to an MCP server

MCPSSEClient

Client for creating a session and connecting to an MCP server using SSE

MCPStdioClient

Client for creating a session and connecting to an MCP server using stdio.

MCPStreamableHTTPClient

Client for creating a session and connecting to an MCP server using streamable-http

MCPToolClient

Client wrapper used to call an MCP tool. This assumes that the MCP transport session has already been set up.

Module Contents#

logger#
class AuthAdapter(
auth_provider: nat.authentication.interfaces.AuthProviderBase,
user_id: str | None = None,
)#

Bases: httpx.Auth

httpx.Auth adapter for authentication providers. Converts AuthProviderBase to httpx.Auth interface for dynamic token management.

auth_provider#
user_id = None#
_lock#
is_authenticating = False#
async async_auth_flow(
request: httpx.Request,
) -> collections.abc.AsyncGenerator[httpx.Request, httpx.Response]#

Add authentication headers to the request using NAT auth provider.

async _get_auth_headers(
request: httpx.Request | None = None,
response: httpx.Response | None = None,
) -> dict[str, str]#

Get authentication headers from the NAT auth provider.

class MCPBaseClient(
transport: str = 'streamable-http',
auth_provider: nat.authentication.interfaces.AuthProviderBase | None = None,
user_id: str | None = None,
tool_call_timeout: datetime.timedelta = timedelta(seconds=60),
auth_flow_timeout: datetime.timedelta = timedelta(seconds=300),
reconnect_enabled: bool = True,
reconnect_max_attempts: int = 2,
reconnect_initial_backoff: float = 0.5,
reconnect_max_backoff: float = 50.0,
)#

Bases: abc.ABC

Base client for creating an MCP transport session and connecting to an MCP server

Args:

transport (str): The type of client to use (‘sse’, ‘stdio’, or ‘streamable-http’)
auth_provider (AuthProviderBase | None): Optional authentication provider for Bearer token injection
tool_call_timeout (timedelta): Timeout for tool calls when authentication is not required
auth_flow_timeout (timedelta): Extended timeout for tool calls that may require interactive authentication
reconnect_enabled (bool): Whether to automatically reconnect on connection failures
reconnect_max_attempts (int): Maximum number of reconnection attempts
reconnect_initial_backoff (float): Initial backoff delay in seconds for reconnection attempts
reconnect_max_backoff (float): Maximum backoff delay in seconds for reconnection attempts

_tools = None#
_transport = ''#
_exit_stack: contextlib.AsyncExitStack | None = None#
_session: mcp.ClientSession | None = None#
_connection_established = False#
_initial_connection = False#
_auth_provider = None#
_httpx_auth = None#
_tool_call_timeout#
_auth_flow_timeout#
_reconnect_enabled = True#
_reconnect_max_attempts = 2#
_reconnect_initial_backoff = 0.5#
_reconnect_max_backoff = 50.0#
_reconnect_lock: asyncio.Lock#
property auth_provider: nat.authentication.interfaces.AuthProviderBase | None#
property transport: str#
property server_name#

Provide server name for logging

abstractmethod connect_to_server() -> collections.abc.AsyncGenerator[mcp.ClientSession, None]#
Async:

Establish a session with an MCP server within an async context

async _reconnect()#

Attempt to reconnect by tearing down and re-establishing the session.

async _with_reconnect(coro)#

Execute an awaited operation, reconnecting once on errors. Does not reconnect if the error occurs during an active authentication flow.

async _has_cached_auth_token() -> bool#

Check if we have a cached, non-expired authentication token.

Returns:

bool: True if we have a valid cached token, False if authentication may be needed

async _get_tool_call_timeout() -> datetime.timedelta#

Determine the appropriate timeout for a tool call based on authentication state.

Returns:

timedelta: auth_flow_timeout if authentication may be needed, tool_call_timeout otherwise

async get_tools() -> dict[str, MCPToolClient]#

Retrieve a dictionary of all tools served by the MCP server. Uses an unauthenticated session for discovery.

async get_tool(tool_name: str) -> MCPToolClient#

Get an MCP Tool by name.

Args:

tool_name (str): Name of the tool to load.

Returns:

MCPToolClient for the configured tool.

Raises:

MCPToolNotFoundError: If no tool is available with that name.

set_user_auth_callback(
auth_callback: collections.abc.Callable[[nat.authentication.interfaces.AuthFlowType], nat.authentication.interfaces.AuthenticatedContext],
)#

Set the user authentication callback.

async call_tool(tool_name: str, tool_args: dict | None)#
class MCPSSEClient(
url: str,
tool_call_timeout: datetime.timedelta = timedelta(seconds=60),
auth_flow_timeout: datetime.timedelta = timedelta(seconds=300),
reconnect_enabled: bool = True,
reconnect_max_attempts: int = 2,
reconnect_initial_backoff: float = 0.5,
reconnect_max_backoff: float = 50.0,
)#

Bases: MCPBaseClient

Client for creating a session and connecting to an MCP server using SSE

Args:

url (str): The url of the MCP server

_url#
property url: str#
property server_name#

Provide server name for logging

async connect_to_server()#

Establish a session with an MCP SSE server within an async context

class MCPStdioClient(
command: str,
args: list[str] | None = None,
env: dict[str, str] | None = None,
tool_call_timeout: datetime.timedelta = timedelta(seconds=60),
auth_flow_timeout: datetime.timedelta = timedelta(seconds=300),
reconnect_enabled: bool = True,
reconnect_max_attempts: int = 2,
reconnect_initial_backoff: float = 0.5,
reconnect_max_backoff: float = 50.0,
)#

Bases: MCPBaseClient

Client for creating a session and connecting to an MCP server using stdio. This is a local transport that spawns the MCP server process and communicates with it over stdin/stdout.

Args:

command (str): The command to run
args (list[str] | None): Additional arguments for the command
env (dict[str, str] | None): Environment variables to set for the process

_command#
_args = None#
_env = None#
property command: str#
property server_name#

Provide server name for logging

property args: list[str] | None#
property env: dict[str, str] | None#
async connect_to_server()#

Establish a session with an MCP server via stdio within an async context

class MCPStreamableHTTPClient(
url: str,
auth_provider: nat.authentication.interfaces.AuthProviderBase | None = None,
user_id: str | None = None,
tool_call_timeout: datetime.timedelta = timedelta(seconds=60),
auth_flow_timeout: datetime.timedelta = timedelta(seconds=300),
reconnect_enabled: bool = True,
reconnect_max_attempts: int = 2,
reconnect_initial_backoff: float = 0.5,
reconnect_max_backoff: float = 50.0,
)#

Bases: MCPBaseClient

Client for creating a session and connecting to an MCP server using streamable-http

Args:

url (str): The URL of the MCP server
auth_provider (AuthProviderBase | None): Optional authentication provider for Bearer token injection

_url#
property url: str#
property server_name#

Provide server name for logging

async connect_to_server()#

Establish a session with an MCP server via streamable-http within an async context

class MCPToolClient(
session: mcp.ClientSession,
parent_client: MCPBaseClient,
tool_name: str,
tool_description: str | None,
tool_input_schema: dict | None = None,
)#

Client wrapper used to call an MCP tool. This assumes that the MCP transport session has already been set up.

Args:

session (ClientSession): The MCP client session
tool_name (str): The name of the tool to wrap
tool_description (str): The description of the tool provided by the MCP server.
tool_input_schema (dict): The input schema for the tool.
parent_client (MCPBaseClient): The parent MCP client for auth management.

_session#
_tool_name#
_tool_description#
_input_schema = None#
_parent_client#
property name#

Returns the name of the tool.

property description#

Returns the tool’s description. If none was provided, returns a simple description built from the tool’s name.

property input_schema#

Returns the tool’s input_schema.

set_description(description: str)#

Manually define the tool’s description using the provided string.

async acall(tool_args: dict) -> str#

Call the MCP tool with the provided arguments. Session context is now handled at the client level, eliminating the need for metadata injection.

Args:

tool_args (dict[str, Any]): A dictionary of key value pairs to serve as inputs for the MCP tool.