nat.plugins.mcp.client_base#
Attributes#
Classes#
httpx.Auth adapter for authentication providers. |
|
Base client for creating a MCP transport session and connecting to an MCP server |
|
Client for creating a session and connecting to an MCP server using SSE |
|
Client for creating a session and connecting to an MCP server using stdio. |
|
Client for creating a session and connecting to an MCP server using streamable-http |
|
Client wrapper used to call an MCP tool. This assumes that the MCP transport session |
Module Contents#
- logger#
- class AuthAdapter(
- auth_provider: nat.authentication.interfaces.AuthProviderBase,
- user_id: str | None = None,
Bases:
httpx.Authhttpx.Auth adapter for authentication providers. Converts AuthProviderBase to httpx.Auth interface for dynamic token management.
- auth_provider#
- user_id = None#
- _lock#
- is_authenticating = False#
- async async_auth_flow(
- request: httpx.Request,
Add authentication headers to the request using NAT auth provider.
- class MCPBaseClient(
- transport: str = 'streamable-http',
- auth_provider: nat.authentication.interfaces.AuthProviderBase | None = None,
- user_id: str | None = None,
- tool_call_timeout: datetime.timedelta = timedelta(seconds=60),
- auth_flow_timeout: datetime.timedelta = timedelta(seconds=300),
- reconnect_enabled: bool = True,
- reconnect_max_attempts: int = 2,
- reconnect_initial_backoff: float = 0.5,
- reconnect_max_backoff: float = 50.0,
Bases:
abc.ABCBase client for creating a MCP transport session and connecting to an MCP server
- Args:
transport (str): The type of client to use (‘sse’, ‘stdio’, or ‘streamable-http’) auth_provider (AuthProviderBase | None): Optional authentication provider for Bearer token injection tool_call_timeout (timedelta): Timeout for tool calls when authentication is not required auth_flow_timeout (timedelta): Extended timeout for tool calls that may require interactive authentication reconnect_enabled (bool): Whether to automatically reconnect on connection failures reconnect_max_attempts (int): Maximum number of reconnection attempts reconnect_initial_backoff (float): Initial backoff delay in seconds for reconnection attempts reconnect_max_backoff (float): Maximum backoff delay in seconds for reconnection attempts
- _tools = None#
- _transport = ''#
- _exit_stack: contextlib.AsyncExitStack | None = None#
- _connection_established = False#
- _initial_connection = False#
- _auth_provider = None#
- _httpx_auth = None#
- _tool_call_timeout#
- _auth_flow_timeout#
- _reconnect_enabled = True#
- _reconnect_max_attempts = 2#
- _reconnect_initial_backoff = 0.5#
- _reconnect_max_backoff = 50.0#
- _reconnect_lock: asyncio.Lock#
- property auth_provider: nat.authentication.interfaces.AuthProviderBase | None#
- property server_name#
Provide server name for logging
- abstractmethod connect_to_server() collections.abc.AsyncGenerator[mcp.ClientSession, None]#
- Async:
Establish a session with an MCP server within an async context
- async _reconnect()#
Attempt to reconnect by tearing down and re-establishing the session.
- async _with_reconnect(coro)#
Execute an awaited operation, reconnecting once on errors. Does not reconnect if the error occurs during an active authentication flow.
- async _has_cached_auth_token() bool#
Check if we have a cached, non-expired authentication token.
- Returns:
bool: True if we have a valid cached token, False if authentication may be needed
- async _get_tool_call_timeout() datetime.timedelta#
Determine the appropriate timeout for a tool call based on authentication state.
- Returns:
timedelta: auth_flow_timeout if authentication may be needed, tool_call_timeout otherwise
- async get_tools() dict[str, MCPToolClient]#
Retrieve a dictionary of all tools served by the MCP server. Uses unauthenticated session for discovery.
- async get_tool(tool_name: str) MCPToolClient#
Get an MCP Tool by name.
- Args:
tool_name (str): Name of the tool to load.
- Returns:
MCPToolClient for the configured tool.
- Raises:
MCPToolNotFoundError: If no tool is available with that name.
- set_user_auth_callback(
- auth_callback: collections.abc.Callable[[nat.authentication.interfaces.AuthFlowType], nat.authentication.interfaces.AuthenticatedContext],
Set the user authentication callback.
- class MCPSSEClient(
- url: str,
- tool_call_timeout: datetime.timedelta = timedelta(seconds=60),
- auth_flow_timeout: datetime.timedelta = timedelta(seconds=300),
- reconnect_enabled: bool = True,
- reconnect_max_attempts: int = 2,
- reconnect_initial_backoff: float = 0.5,
- reconnect_max_backoff: float = 50.0,
Bases:
MCPBaseClientClient for creating a session and connecting to an MCP server using SSE
- Args:
url (str): The url of the MCP server
- _url#
- property server_name#
Provide server name for logging
- async connect_to_server()#
Establish a session with an MCP SSE server within an async context
- class MCPStdioClient(
- command: str,
- args: list[str] | None = None,
- env: dict[str, str] | None = None,
- tool_call_timeout: datetime.timedelta = timedelta(seconds=60),
- auth_flow_timeout: datetime.timedelta = timedelta(seconds=300),
- reconnect_enabled: bool = True,
- reconnect_max_attempts: int = 2,
- reconnect_initial_backoff: float = 0.5,
- reconnect_max_backoff: float = 50.0,
Bases:
MCPBaseClientClient for creating a session and connecting to an MCP server using stdio. This is a local transport that spawns the MCP server process and communicates with it over stdin/stdout.
- Args:
command (str): The command to run args (list[str] | None): Additional arguments for the command env (dict[str, str] | None): Environment variables to set for the process
- _command#
- _args = None#
- _env = None#
- property server_name#
Provide server name for logging
- async connect_to_server()#
Establish a session with an MCP server via stdio within an async context
- class MCPStreamableHTTPClient(
- url: str,
- auth_provider: nat.authentication.interfaces.AuthProviderBase | None = None,
- user_id: str | None = None,
- tool_call_timeout: datetime.timedelta = timedelta(seconds=60),
- auth_flow_timeout: datetime.timedelta = timedelta(seconds=300),
- reconnect_enabled: bool = True,
- reconnect_max_attempts: int = 2,
- reconnect_initial_backoff: float = 0.5,
- reconnect_max_backoff: float = 50.0,
Bases:
MCPBaseClientClient for creating a session and connecting to an MCP server using streamable-http
- Args:
url (str): The url of the MCP server auth_provider (AuthProviderBase | None): Optional authentication provider for Bearer token injection
- _url#
- property server_name#
Provide server name for logging
- async connect_to_server()#
Establish a session with an MCP server via streamable-http within an async context
- class MCPToolClient(
- session: mcp.ClientSession,
- parent_client: MCPBaseClient,
- tool_name: str,
- tool_description: str | None,
- tool_input_schema: dict | None = None,
Client wrapper used to call an MCP tool. This assumes that the MCP transport session has already been setup.
- Args:
session (ClientSession): The MCP client session tool_name (str): The name of the tool to wrap tool_description (str): The description of the tool provided by the MCP server. tool_input_schema (dict): The input schema for the tool. parent_client (MCPBaseClient): The parent MCP client for auth management.
- _session#
- _tool_name#
- _tool_description#
- _input_schema = None#
- _parent_client#
- property name#
Returns the name of the tool.
- property description#
Returns the tool’s description. If none was provided. Provides a simple description using the tool’s name
- property input_schema#
Returns the tool’s input_schema.