aistore.sdk.etl.etl

View as Markdown

Module Contents

Classes

NameDescription
EtlA class containing ETL-related functions.

Functions

NameDescription
_get_runtimeDetermine the AIStore ETL runtime identifier for the current Python interpreter.
_validate_comm_type-

API

class aistore.sdk.etl.etl.Etl(
client: aistore.sdk.client.Client,
name: str
)

A class containing ETL-related functions.

Disclaimer: The init_code method has been removed as of version v1.14.0+. Please use init_class instead when registering ETL server classes.

_pipeline
List[str] = []
name
str

Name of the ETL

pipeline
List[str]

List of ETL names in the pipeline

aistore.sdk.etl.etl.Etl.__enter__()
aistore.sdk.etl.etl.Etl.__exit__(
_exc_type,
_exc_value,
_traceback
)

Combine multiple ETLs into a pipeline. The combined ETL will have the same name as the first ETL, with the rest of the ETL stages added to the pipeline property.

Parameters:

other
Etl

The ETL to combine with.

Returns: Etl

The combined ETL.

aistore.sdk.etl.etl.Etl.delete()

Delete ETL. Deletes pods created by Kubernetes for this ETL and specifications for this ETL in Kubernetes.

Note: Running ETLs cannot be deleted.

aistore.sdk.etl.etl.Etl.init(
image: str,
command: typing.Union[typing.List[str], str] = None,
comm_type: str = DEFAULT_ETL_COMM,
init_timeout: str = DEFAULT_ETL_TIMEOUT,
obj_timeout: str = DEFAULT_ETL_OBJ_TIMEOUT,
direct_put: bool = False,
direct_file_access: bool = False,
kwargs = {}
) -> str

Initializes an ETL based on a container image and optional command/env vars.

Parameters:

image
str

Docker image for the ETL.

command
Union[List[str], str]Defaults to None

Command to run in the container.

comm_type
strDefaults to DEFAULT_ETL_COMM

Communication type (hpull, hpush, ws).

init_timeout
strDefaults to DEFAULT_ETL_TIMEOUT

ETL job timeout (e.g., “5m”).

obj_timeout
strDefaults to DEFAULT_ETL_OBJ_TIMEOUT

Per-object transform timeout (e.g., ”45s”).

direct_put
boolDefaults to False

Enable direct-put optimization.

direct_file_access
boolDefaults to False

Pass the local file path to transform() as str instead of loading bytes. Requires hpush and a co-located pod.

**kwargs
Defaults to {}

Additional key-value pairs → env vars in the ETL container.

Returns: str

Job ID for this ETL.

aistore.sdk.etl.etl.Etl.init_class(
dependencies: typing.List[str] = None,
os_packages: typing.List[str] = None,
comm_type: str = DEFAULT_ETL_COMM,
init_timeout: str = DEFAULT_ETL_TIMEOUT,
obj_timeout: str = DEFAULT_ETL_OBJ_TIMEOUT,
direct_put: bool = True,
direct_file_access: bool = False,
kwargs = {}
)

Initialize an ETLServer subclass in AIS.

init_class realizes a special case of ETL initialization that allows to register custom Python class on the server side. This class must extend ETLServer and implement the transform method. The class will be serialized and passed to the ETL runtime as an environment variable. The runtime will deserialize the class and use it to handle incoming requests.

This method is a decorator that can be used to register an ETL server class.

Parameters:

dependencies
List[str]Defaults to None

A list of extra PyPI package names to install inside the ETL pod before running your server. Defaults to no extra packages.

os_packages
List[str]Defaults to None

Names of Linux packages to install inside the ETL container before the server starts. These must be available as Debian-based system packages installable via the apt package manager. (e.g. libsndfile-dev, ffmpeg). Defaults to no extra system packages.

comm_type
strDefaults to DEFAULT_ETL_COMM

How AIS should talk to your ETL pod. Set to "hpush://" or "hpull://" (and is forwarded into the init(...) call). Defaults to "hpush://".

init_timeout
strDefaults to DEFAULT_ETL_TIMEOUT

How long AIS waits for all ETL pods to become ready (e.g. "5m" for five minutes). Defaults to "5m".

obj_timeout
strDefaults to DEFAULT_ETL_OBJ_TIMEOUT

How long each individual object-transform call can run (e.g. "45s" for 45 seconds). Defaults to "45s".

direct_put
boolDefaults to True

When doing a bucket-to-bucket transform, set to True to enable “direct put” optimization. Defaults to True.

direct_file_access
boolDefaults to False

When True, sets ETL_DIRECT_FQN=true in the pod so that transform() receives the object’s local filesystem path as str instead of loading the file into memory. Defaults to False.

Pipeline caveat: FQN is only available for the first pipeline stage. Intermediate stages always receive bytes (the previous stage’s output) — there is no on-disk file for half-transformed data. If the ETL may run in both positions, transform() must handle both types::

def transform(self, data, path, etl_args): if isinstance(data, str):

first stage: filepath

… else:

intermediate stage: bytes

… return …

**kwargs
Defaults to {}

Any other keyword arguments become environment-variables inside the ETL pod. To configure concurrency, set env-var NUM_WORKERS to specify the number of worker processes (default: 4).

aistore.sdk.etl.etl.Etl.init_spec(
template: str,
communication_type: str = DEFAULT_ETL_COMM,
init_timeout: str = DEFAULT_ETL_TIMEOUT,
obj_timeout: str = DEFAULT_ETL_OBJ_TIMEOUT
) -> str

Initializes ETL based on Kubernetes pod spec template.

Parameters:

template
str

Kubernetes pod spec template Existing templates can be found at sdk.etl_templates For more information visit: https://github.com/NVIDIA/ais-etl/tree/main/transformers

communication_type
strDefaults to DEFAULT_ETL_COMM

Communication type of the ETL (options: hpull, hpush)

init_timeout
strDefaults to DEFAULT_ETL_TIMEOUT

Timeout of the ETL job (e.g., “5m” for 5 minutes). Default is “5m”.

obj_timeout
strDefaults to DEFAULT_ETL_OBJ_TIMEOUT

Timeout for transforming a single object (e.g., ”45s”). Default is ”45s”.

Returns: str

Job ID string associated with this ETL

aistore.sdk.etl.etl.Etl.logs(
target_id: str = ''
) -> typing.List[aistore.sdk.types.ETLNodeLogs]

Get logs from ETL pods.

Parameters:

target_id
strDefaults to ''

Target node ID to get logs from a specific pod. If empty, returns logs from all pods.

Returns: List[ETLNodeLogs]

List[ETLNodeLogs]: Logs from each target node. The logs field contains the decoded plaintext log output.

aistore.sdk.etl.etl.Etl.start()

Resumes a stopped ETL with given ETL name.

Note: Deleted ETLs cannot be started.

aistore.sdk.etl.etl.Etl.stop()

Stops ETL. Stops (but does not delete) all the pods created by Kubernetes for this ETL and terminates any transforms.

aistore.sdk.etl.etl.Etl.validate_etl_name(
name: str
)
staticmethod

Validate the ETL name based on specific criteria.

Parameters:

name
str

The name of the ETL to validate.

Raises:

  • ValueError: If the name is too short (less than 6 characters), too long (more than 32 characters), or contains invalid characters (anything other than lowercase letters, digits, or hyphens).
aistore.sdk.etl.etl.Etl.view(
job_id: str = ''
) -> aistore.sdk.types.ETLDetails

View ETL details

Parameters:

job_id
strDefaults to ''

Offline Transform job ID of the ETL to view details for. Default to view inline transform details.

Returns: ETLDetails

details of the ETL

aistore.sdk.etl.etl._get_runtime() -> str

Determine the AIStore ETL runtime identifier for the current Python interpreter.

Returns: str

A string like “3.10” when running under Python 3.10.

Raises:

  • ValueError: If the current Python version isn’t in ETL_SUPPORTED_PYTHON_VERSIONS.
aistore.sdk.etl.etl._validate_comm_type(
given: str,
valid: typing.List[str]
)