nat.plugins.a2a.server.front_end_plugin_worker#
Attributes#
logger |
Classes#
A2AFrontEndPluginWorker | Worker that handles A2A server setup and configuration.
Module Contents#
- logger#
- class A2AFrontEndPluginWorker(config: nat.data_models.config.Config)#
Worker that handles A2A server setup and configuration.
Initialize the A2A worker with configuration.
- Args:
config: The full NAT configuration
- full_config#
- front_end_config: nat.plugins.a2a.server.front_end_config.A2AFrontEndConfig#
- max_concurrency#
- async _get_all_functions(
- workflow: nat.builder.workflow.Workflow,
Get all functions from the workflow.
- Args:
workflow: The NAT workflow
- Returns:
Dict mapping function names to Function objects
- async _generate_security_schemes(
- server_auth_config,
Generate A2A security schemes from OAuth2ResourceServerConfig.
- Args:
server_auth_config: OAuth2ResourceServerConfig
- Returns:
Tuple of (security_schemes dict, security requirements list)
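The shape of the returned tuple can be illustrated with a minimal sketch, assuming the schemes follow the OpenAPI 3 security scheme layout with an authorization-code flow. `ResourceServerConfig`, its fields, and the `"oauth2"` scheme key are hypothetical stand-ins and not the plugin's actual types; the sketch is also synchronous, whereas the documented method is async.

```python
from dataclasses import dataclass, field


@dataclass
class ResourceServerConfig:
    """Hypothetical stand-in for OAuth2ResourceServerConfig (fields are assumed)."""
    authorization_url: str
    token_url: str
    scopes: list[str] = field(default_factory=list)


def generate_security_schemes(cfg: ResourceServerConfig) -> tuple[dict, list]:
    """Return (security_schemes, security_requirements) in OpenAPI 3 style."""
    scheme_name = "oauth2"  # assumed key; the plugin may use a different name
    schemes = {
        scheme_name: {
            "type": "oauth2",
            "flows": {
                "authorizationCode": {
                    "authorizationUrl": cfg.authorization_url,
                    "tokenUrl": cfg.token_url,
                    # OpenAPI maps each scope name to a human-readable description.
                    "scopes": {s: f"Access to {s}" for s in cfg.scopes},
                }
            },
        }
    }
    # Each requirement maps a scheme name to the scopes it demands.
    requirements = [{scheme_name: list(cfg.scopes)}]
    return schemes, requirements


cfg = ResourceServerConfig(
    authorization_url="https://auth.example.com/authorize",
    token_url="https://auth.example.com/token",
    scopes=["workflow:run"],
)
schemes, requirements = generate_security_schemes(cfg)
```

The requirements list refers to schemes by name, so a client reading the agent card can look up each required scheme in the dict.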
- async _resolve_oauth_endpoints(server_auth_config) -> tuple[str, str]#
Resolve authorization and token URLs from OAuth2 configuration.
- Args:
server_auth_config: OAuth2ResourceServerConfig
- Returns:
Tuple of (authorization_url, token_url)
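One plausible resolution strategy is sketched below, assuming explicitly configured URLs take precedence and authorization server metadata fills in the rest. The function name, its parameters, and the precedence rule are assumptions for illustration. Only the metadata keys `authorization_endpoint` and `token_endpoint` come from the OAuth 2.0 Authorization Server Metadata format (RFC 8414). The metadata is passed in as a dict so the sketch needs no network access.

```python
from typing import Optional


def resolve_oauth_endpoints(
    metadata: dict,
    authorization_url: Optional[str] = None,
    token_url: Optional[str] = None,
) -> tuple[str, str]:
    """Return (authorization_url, token_url), preferring explicit values."""
    # Explicit configuration wins; discovered metadata is the fallback.
    auth = authorization_url or metadata.get("authorization_endpoint")
    token = token_url or metadata.get("token_endpoint")
    if not auth or not token:
        raise ValueError("could not resolve authorization and token URLs")
    return auth, token


# Metadata as it might be returned by a discovery document (example values).
discovered = {
    "issuer": "https://auth.example.com",
    "authorization_endpoint": "https://auth.example.com/authorize",
    "token_endpoint": "https://auth.example.com/token",
}
endpoints = resolve_oauth_endpoints(
    discovered, token_url="https://override.example.com/token"
)
```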
- async create_agent_card(
- workflow: nat.builder.workflow.Workflow,
Build AgentCard from configuration and workflow functions.
Skills are auto-generated from the workflow’s functions, similar to how MCP introspects and exposes functions as tools.
- Args:
workflow: The NAT workflow to extract functions from
- Returns:
AgentCard with agent metadata, capabilities, and auto-generated skills
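The auto-generation of skills described above can be sketched with plain dataclasses. `Skill`, `Card`, and `build_card` are hypothetical stand-ins rather than the `a2a.types` models, and representing the workflow's functions as a name-to-description dict is an assumption made to keep the example self-contained.

```python
from dataclasses import dataclass, field


@dataclass
class Skill:
    """Hypothetical stand-in for an A2A skill entry."""
    id: str
    name: str
    description: str


@dataclass
class Card:
    """Hypothetical stand-in for an agent card."""
    name: str
    description: str
    skills: list[Skill] = field(default_factory=list)


def build_card(name: str, description: str, functions: dict[str, str]) -> Card:
    """Create one skill per function, keyed by the function name."""
    skills = [
        Skill(
            id=fn_name,
            name=fn_name.replace("_", " ").title(),
            # Fall back to a generic description when the function has none.
            description=doc or f"Runs {fn_name}",
        )
        for fn_name, doc in sorted(functions.items())
    ]
    return Card(name=name, description=description, skills=skills)


card = build_card(
    "demo-agent",
    "Example agent",
    {"get_weather": "Look up a forecast", "add": ""},
)
```

Sorting by function name keeps the generated card stable across runs, which helps when clients cache it.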
- create_agent_executor(
- workflow: nat.builder.workflow.Workflow,
- builder: nat.builder.workflow_builder.WorkflowBuilder,
Create agent executor adapter for the workflow.
This creates a SessionManager to handle concurrent A2A task requests, similar to how FastAPI handles multiple HTTP requests.
- Args:
workflow: The NAT workflow to expose
builder: The workflow builder used to create the workflow
- Returns:
NATWorkflowAgentExecutor that wraps the workflow with a SessionManager
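How concurrent task requests might be bounded can be sketched with an `asyncio.Semaphore`. This illustrates the general pattern only. It is not the `SessionManager` implementation, and `BoundedExecutor`, `echo_workflow`, and the way `max_concurrency` is applied are assumptions.

```python
import asyncio


class BoundedExecutor:
    """Hypothetical adapter that runs a coroutine workflow with bounded concurrency."""

    def __init__(self, workflow, max_concurrency: int):
        self._workflow = workflow
        self._semaphore = asyncio.Semaphore(max_concurrency)
        self.active = 0  # requests currently running
        self.peak = 0    # highest number of simultaneous requests observed

    async def execute(self, request: str) -> str:
        # Requests beyond the limit wait here until a slot frees up.
        async with self._semaphore:
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                return await self._workflow(request)
            finally:
                self.active -= 1


async def echo_workflow(request: str) -> str:
    """Placeholder workflow that simulates a little asynchronous work."""
    await asyncio.sleep(0.01)
    return f"done: {request}"


async def main():
    executor = BoundedExecutor(echo_workflow, max_concurrency=2)
    results = await asyncio.gather(
        *(executor.execute(f"task-{i}") for i in range(5))
    )
    return executor, results


executor, results = asyncio.run(main())
```

All five requests complete, but no more than two run at the same time.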
- create_a2a_server(
- agent_card: a2a.types.AgentCard,
- agent_executor: nat.plugins.a2a.server.agent_executor_adapter.NATWorkflowAgentExecutor,
Create A2A server with the agent executor.
- Args:
agent_card: The agent card describing the agent
agent_executor: The executor that handles task processing
- Returns:
Configured A2A Starlette application
- Note:
The httpx client is stored in self._httpx_client for lifecycle management. Call cleanup() during server shutdown to properly close the client.
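The lifecycle described in the note can be sketched as follows. `FakeAsyncClient` stands in for `httpx.AsyncClient` (which is closed with `aclose()`), so the example runs without third-party packages. The `Worker` class, the placeholder returned by `create_server`, and the assumption that `cleanup()` is a coroutine are illustrative and not taken from the plugin.

```python
import asyncio
from typing import Optional


class FakeAsyncClient:
    """Stand-in for httpx.AsyncClient, which is likewise closed via aclose()."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


class Worker:
    """Hypothetical worker that owns the client created during server setup."""

    def __init__(self):
        self._httpx_client: Optional[FakeAsyncClient] = None

    def create_server(self) -> dict:
        # Keep a reference so the client can be closed at shutdown.
        self._httpx_client = FakeAsyncClient()
        return {"client": self._httpx_client}  # placeholder for the application

    async def cleanup(self):
        # Safe to call more than once: the reference is cleared after closing.
        if self._httpx_client is not None:
            await self._httpx_client.aclose()
            self._httpx_client = None


async def serve_once():
    worker = Worker()
    app = worker.create_server()
    client = app["client"]
    try:
        pass  # the server would handle requests here
    finally:
        # Always release the client, even if serving raised an error.
        await worker.cleanup()
    return worker, client


worker, client = asyncio.run(serve_once())
```

Placing the `cleanup()` call in a `finally` block (or in a shutdown hook of the serving framework) ensures the client is closed on both normal and error exits.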