aiq.registry_handlers.pypi.pypi_handler#

Attributes#

Classes#

PypiRegistryHandler

A registry handler for interactions with a remote PyPI registry.

Module Contents#

logger#
class PypiRegistryHandler(
endpoint: str,
token: str | None = None,
publish_route: str = '',
pull_route: str = '',
search_route: str = '',
)#

Bases: aiq.registry_handlers.registry_handler_base.AbstractRegistryHandler

A registry handler for interactions with a remote PyPI registry.

Built interfacing with this private PyPI server: pypiserver/pypiserver

_endpoint#
_token = None#
_publish_route = ''#
_pull_route = ''#
_search_route = ''#
async publish(
artifact: aiq.registry_handlers.schemas.publish.AIQArtifact,
) collections.abc.AsyncGenerator[aiq.registry_handlers.schemas.publish.PublishResponse]#

Publishes an AgentIQ artifact to a PyPI remote registry.

Args:

artifact (AIQArtifact): An artifact that contain AgentIQ plugin wheel and it’s corrosponding discovery metadata.

Yields:

Iterator[AsyncGenerator[PublishResponse, None]]: A response message that includes a completion status message.

_upload_to_pypi(wheel_path: str) None#
async pull(
packages: aiq.registry_handlers.schemas.pull.PullRequestPackages,
) collections.abc.AsyncGenerator[aiq.registry_handlers.schemas.pull.PullResponse]#

Download and install AgentIQ artifacts from a remote PyPI remote registry.

Args:

packages (PullRequestPackages): Parameters used to pull the AgentIQ artifact.

Yields:
Iterator[AsyncGenerator[PullResponse, None]]: A response message that includes a the pulled packages and a

completion status message.

async search(
query: aiq.registry_handlers.schemas.search.SearchQuery,
) collections.abc.AsyncGenerator[aiq.registry_handlers.schemas.search.SearchResponse]#

Searches a remote PyPI registry for relevant AgentIQ components.

Args:

query (SearchQuery): Parameters of the search to be performed.

Yields:
Iterator[AsyncGenerator[SearchResponse]]: A response message that includes search

parameters and a completion status message.

async remove(
packages: aiq.registry_handlers.schemas.package.PackageNameVersionList,
) collections.abc.AsyncGenerator[aiq.registry_handlers.schemas.search.SearchResponse]#

Removes packages from a remote registry.

Args:

packages (PackageNameVersionList): The list of packages to remove.

Yields:
Iterator[AsyncGenerator[RemoveResponse]]: A response message that includes the packages and a

completion status message.