nat.plugins.a2a.client.client_base#
Attributes#
Classes#
Minimal A2A client for connecting to an A2A agent. |
Module Contents#
- logger#
- class A2ABaseClient(
- base_url: str,
- agent_card_path: str = '/.well-known/agent-card.json',
- task_timeout: datetime.timedelta = timedelta(seconds=300),
- streaming: bool = True,
- auth_provider: nat.authentication.interfaces.AuthProviderBase | None = None,
Minimal A2A client for connecting to an A2A agent.
- Args:
base_url: The base URL of the A2A agent agent_card_path: Path to agent card (default: /.well-known/agent-card.json) task_timeout: Timeout for task operations (default: 300 seconds) streaming: Enable streaming responses (default: True) auth_provider: Optional NAT authentication provider for securing requests
- _base_url#
- _agent_card_path = '/.well-known/agent-card.json'#
- _task_timeout#
- _streaming = True#
- _auth_provider = None#
- async _resolve_agent_card()#
Fetch the agent card from the A2A agent.
- async send_message( ) collections.abc.AsyncGenerator[a2a.client.ClientEvent | a2a.types.Message, None]#
Send a message to the agent and stream response events.
This is the low-level A2A protocol method that yields events as they arrive. For simpler usage, prefer the high-level agent function registered by this client.
- Args:
message_text: The message text to send task_id: Optional task ID to continue an existing conversation context_id: Optional context ID for the conversation
- Yields:
- ClientEvent | Message: The agent’s response events as they arrive.
ClientEvent is a tuple of (Task, UpdateEvent | None) Message is a direct message response
- async get_task( ) a2a.types.Task#
Get the status and details of a specific task.
This is an A2A protocol operation for retrieving task information.
- Args:
task_id: The unique identifier of the task history_length: Optional limit on the number of history messages to retrieve
- Returns:
Task: The task object with current status and history
- async cancel_task(task_id: str) a2a.types.Task#
Cancel a running task.
This is an A2A protocol operation for canceling tasks.
- Args:
task_id: The unique identifier of the task to cancel
- Returns:
Task: The task object with updated status
- async send_message_streaming( ) collections.abc.AsyncGenerator[a2a.client.ClientEvent | a2a.types.Message, None]#
Send a message to the agent and stream response events (alias for send_message).
This method provides an explicit streaming interface that mirrors the A2A SDK pattern. It is functionally identical to send_message(), which already streams events.
- Args:
message_text: The message text to send task_id: Optional task ID to continue an existing conversation context_id: Optional context ID for the conversation
- Yields:
ClientEvent | Message: The agent’s response events as they arrive.
- extract_text_from_parts(parts: list) list[str]#
Extract text content from A2A message parts.
- Args:
parts: List of A2A Part objects
- Returns:
List of text strings extracted from the parts
- extract_text_from_task(task) str#
Extract text response from an A2A Task object.
This method understands the A2A protocol structure and extracts the final text response from a completed task, prioritizing artifacts over history.
- Args:
task: A2A Task object
- Returns:
Extracted text response or status message
- Priority order:
Check task status (return error/progress if not completed)
Extract from task.artifacts (structured output)
Fallback to last agent message in task.history