aiq.registry_handlers.registry_handler_base#

Classes#

AbstractRegistryHandler

Base class outlining the interfaces for remote AgentIQ registry interactions.

Module Contents#

class AbstractRegistryHandler#

Bases: abc.ABC

Base class outlining the interfaces for remote AgentIQ registry interactions.

_discovery_metadata: dict[aiq.data_models.component.AIQComponentEnum, list[dict | aiq.data_models.discovery_metadata.DiscoveryMetadata]]#
_aiq_artifact: aiq.registry_handlers.schemas.publish.AIQArtifact | None = None#
_whl_bytes: bytes#
_whl_path: str#
_whl_base64: str#
abstractmethod publish(
artifact: aiq.registry_handlers.schemas.publish.AIQArtifact,
) collections.abc.AsyncGenerator[aiq.registry_handlers.schemas.publish.PublishResponse]#
Async:

Publishes an AgentIQ artifact to a remote registry.

Args:

artifact (AIQArtifact): An artifact that contains the AgentIQ plugin wheel and its corresponding discovery metadata.

Yields:

Iterator[AsyncGenerator[PublishResponse, None]]: A response message that includes a completion status message.

abstractmethod pull(
packages: aiq.registry_handlers.schemas.pull.PullRequestPackages,
) collections.abc.AsyncGenerator[aiq.registry_handlers.schemas.pull.PullResponse]#
Async:

Download and install AgentIQ artifacts from a remote registry.

Args:

packages (PullRequestPackages): Parameters used to pull the AgentIQ artifact.

Yields:
Iterator[AsyncGenerator[PullResponse]]: A response message that includes the pulled packages and a

completion status message.

abstractmethod search(
query: aiq.registry_handlers.schemas.search.SearchQuery,
) collections.abc.AsyncGenerator[aiq.registry_handlers.schemas.search.SearchResponse]#
Async:

Searches the local aiq registry for relevant AgentIQ components.

Args:

query (SearchQuery): Parameters of the search to be performed.

Yields:
Iterator[AsyncGenerator[SearchResponse]]: A response message that includes search

parameters and a completion status message.

abstractmethod remove(
packages: aiq.registry_handlers.schemas.package.PackageNameVersionList,
) collections.abc.AsyncGenerator[aiq.registry_handlers.schemas.remove.RemoveResponse]#
Async:

Removes packages from a remote registry.

Args:

packages (PackageNameVersionList): The list of packages to remove.

Yields:
Iterator[AsyncGenerator[RemoveResponse]]: A response message that includes the packages and a

completion status message.

static visualize_search_results(
search_response: aiq.registry_handlers.schemas.search.SearchResponse,
pager: bool = True,
) None#

Visualize search results in a system terminal.

Args:

search_response (SearchResponse): A response message that includes search parameters and a completion status message.

pager (bool, optional): Include a pageable terminal interface for large search results. Defaults to True.

static save_search_results(
search_response: aiq.registry_handlers.schemas.search.SearchResponse,
save_path: str,
) None#

Save search results to a local json file.

Args:

search_response (SearchResponse): A response message that includes search parameters and a completion status message.

save_path (str): The path to save the json search results.