aistore.sdk.etl.webserver.base_etl_server


Module Contents

Classes

Name | Description
---- | -----------
CountingIterator | Wraps an iterator of bytes chunks to count total bytes yielded.
ETLServer | Abstract base class for all ETL servers.

Functions

Name | Description
---- | -----------
_handle_direct_put_transient_error | Handle a caught SYNC_DIRECT_PUT_TRANSIENT_ERRORS exception.
_is_connection_refused | Return True if exc represents a permanent ConnectionRefused error.

Data

RETRY_BACKOFF_BASE

RETRY_BACKOFF_MAX

SYNC_DIRECT_PUT_TRANSIENT_ERRORS

API

class aistore.sdk.etl.webserver.base_etl_server.CountingIterator(
inner: typing.Iterator[bytes]
)

Wraps an iterator of bytes chunks to count total bytes yielded.

bytes_sent = 0
aistore.sdk.etl.webserver.base_etl_server.CountingIterator.__iter__() -> typing.Iterator[bytes]
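Conceptually, the wrapper passes each chunk through unchanged and adds its length to bytes_sent on the way. The following is a standalone approximation, not the SDK's source; the class name CountingIteratorSketch is hypothetical. Its final total is the kind of value handle_direct_put_response accepts as data_length for streaming PUTs.

```python
from typing import Iterable, Iterator


class CountingIteratorSketch:
    """Hypothetical re-implementation: counts bytes as chunks pass through."""

    def __init__(self, inner: Iterable[bytes]) -> None:
        self.inner = iter(inner)
        self.bytes_sent = 0  # running total, updated as each chunk is yielded

    def __iter__(self) -> Iterator[bytes]:
        for chunk in self.inner:
            self.bytes_sent += len(chunk)
            yield chunk


# Consume the wrapper (e.g. as a request body), then read the total.
counter = CountingIteratorSketch([b"ab", b"cde", b""])
body = b"".join(counter)  # counter.bytes_sent is now 5
```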
class aistore.sdk.etl.webserver.base_etl_server.ETLServer()
Abstract

Abstract base class for all ETL servers.

Provides:

  • host_target: the AIS target URL, read from the AIS_TARGET_URL environment variable.
  • logger: a class-specific logger with INFO level (can be adjusted).
direct_fqn
direct_put = os.getenv('DIRECT_PUT', 'false').lower() == 'true'
direct_put_retries: int = int(os.getenv(AIS_DIRECT_PUT_RETRIES, '3'))
host_target = os.getenv('AIS_TARGET_URL')
logger = logging.getLogger(self.__class__.__name__)
session = self._build_session()
token = os.environ.get(AIS_AUTHN_TOKEN)
use_streaming = has_stream and not has_transform
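The use_streaming rule means streaming mode is chosen only when a subclass overrides transform_stream and not transform. How has_stream and has_transform are computed is not shown here; this sketch assumes "overridden" means the subclass's function object differs from the base class's. ServerSketch, StreamOnly, and Both are hypothetical stand-ins, not SDK classes.

```python
from typing import BinaryIO, Iterator, Union


class ServerSketch:
    """Hypothetical stand-in for ETLServer, used only to show mode selection."""

    def __init__(self) -> None:
        cls = type(self)
        # Assumption: a method counts as overridden when the subclass attribute
        # is a different function object than the one defined on this base.
        has_transform = cls.transform is not ServerSketch.transform
        has_stream = cls.transform_stream is not ServerSketch.transform_stream
        # Same rule as ETLServer.use_streaming: transform wins if both exist.
        self.use_streaming = has_stream and not has_transform

    def transform(self, data: Union[bytes, str], path: str, etl_args: str) -> bytes:
        raise NotImplementedError

    def transform_stream(self, reader: BinaryIO, path: str, etl_args: str) -> Iterator[bytes]:
        raise NotImplementedError


class StreamOnly(ServerSketch):
    def transform_stream(self, reader, path, etl_args):
        yield reader.read()


class Both(StreamOnly):
    def transform(self, data, path, etl_args):
        return b"unused"
```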
aistore.sdk.etl.webserver.base_etl_server.ETLServer._build_session() -> requests.Session

Build a requests.Session by delegating SSL/cert config to SessionManager.

aistore.sdk.etl.webserver.base_etl_server.ETLServer.client_put(
url: str,
data: bytes,
headers: dict,
timeout: int = None
) -> requests.Response

Send a PUT request using the pre-configured session.

aistore.sdk.etl.webserver.base_etl_server.ETLServer.close_reader(
reader
)
staticmethod

Close a BinaryIO reader if it has a close method.
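The "has a close method" check can be read as plain duck typing. This sketch assumes a getattr-based test; the function name close_reader_sketch is hypothetical.

```python
import io


def close_reader_sketch(reader) -> None:
    """Close reader only if it exposes a callable close(); otherwise do nothing."""
    close = getattr(reader, "close", None)
    if callable(close):
        close()


buf = io.BytesIO(b"payload")
close_reader_sketch(buf)         # closes the BytesIO
close_reader_sketch(b"payload")  # bytes has no close(), so this is a no-op
```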

aistore.sdk.etl.webserver.base_etl_server.ETLServer.get_mime_type() -> str

Optional override to specify MIME type of transformed response.

Returns: str

MIME type (e.g., “application/json”, “text/plain”).

aistore.sdk.etl.webserver.base_etl_server.ETLServer.handle_direct_put_response(
resp: requests.Response,
data: bytes,
data_length: int = -1
) -> typing.Tuple[int, bytes, int]

Handle the response from a direct PUT request.

Parameters:

resp
requests.Response

The HTTP response from the direct PUT.

data
bytes

The original data bytes (used to compute length for the 200-OK-empty-content case). Can be b"" for streaming.

data_length
int, defaults to -1

Explicit byte count override. When >= 0, it is used instead of len(data). For streaming pipeline PUTs, where data is empty, pass the bytes_sent count from a CountingIterator.
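The length rule for data and data_length fits in one expression. The helper name resolve_put_length is hypothetical. It only illustrates the documented precedence and says nothing about how the response itself is handled.

```python
def resolve_put_length(data: bytes, data_length: int = -1) -> int:
    """Hypothetical helper mirroring the documented data_length rule."""
    # An explicit count (>= 0) wins; otherwise fall back to len(data).
    return data_length if data_length >= 0 else len(data)


buffered = resolve_put_length(b"abc")      # buffered PUT: length taken from data
streamed = resolve_put_length(b"", 4096)   # streaming PUT: count supplied explicitly
```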

aistore.sdk.etl.webserver.base_etl_server.ETLServer.iter_and_close(
output_iter: typing.Iterator[bytes],
reader
) -> typing.Iterator[bytes]
staticmethod

Yield from output_iter and close reader when done.
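A generator with try/finally is a natural way to guarantee the reader is closed once the output has been fully consumed. The sketch below assumes that approach. The name iter_and_close_sketch is hypothetical.

```python
import io
from typing import Iterator


def iter_and_close_sketch(output_iter: Iterator[bytes], reader) -> Iterator[bytes]:
    """Yield every chunk from output_iter, then close reader."""
    try:
        yield from output_iter
    finally:
        # Runs once the generator finishes, raises, or is closed after it started.
        close = getattr(reader, "close", None)
        if callable(close):
            close()


reader = io.BytesIO(b"abcdef")
chunks = iter(lambda: reader.read(2), b"")  # read 2 bytes at a time until EOF
out = list(iter_and_close_sketch(chunks, reader))
```

Note that a generator that is never started never enters its try block, so its cleanup does not run. Callers should make sure the returned iterator is actually consumed or explicitly closed.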

aistore.sdk.etl.webserver.base_etl_server.ETLServer.make_direct_put_headers(
direct_put_length: int
) -> dict
staticmethod

Build response headers for a direct-put result.

aistore.sdk.etl.webserver.base_etl_server.ETLServer.sanitize_fqn(
fqn: str
) -> str

Normalize an FQN to a safe absolute path.
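The exact normalization rules are not documented here. One plausible reading, assumed in this sketch, is to force a leading slash and collapse ".", "..", and repeated separators with posixpath.normpath. The name sanitize_fqn_sketch is hypothetical.

```python
import posixpath


def sanitize_fqn_sketch(fqn: str) -> str:
    """Hypothetical normalization: absolute POSIX path with '.', '..', '//' collapsed."""
    # Strip leading slashes first so normpath never sees a POSIX "//" prefix,
    # which it would otherwise preserve.
    return posixpath.normpath("/" + fqn.lstrip("/"))


cleaned = sanitize_fqn_sketch("mnt//data/./obj")
```

Leading ".." segments are dropped at the root, so the result cannot climb above "/". It can still name any absolute path, though, so a stricter implementation might also confine it to an expected mount point.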

aistore.sdk.etl.webserver.base_etl_server.ETLServer.start()
abstract

Start the ETL server (blocking call). Typically binds and listens on a port.

aistore.sdk.etl.webserver.base_etl_server.ETLServer.transform(
data: typing.Union[bytes, str],
path: str,
etl_args: str
) -> bytes

Transform the data received from a request (buffered mode).

Override this OR transform_stream in your subclass. If both are overridden, transform takes priority (backward compatibility).

Parameters:

data
Union[bytes, str]

Object bytes by default. When ETL_DIRECT_FQN=true, the first pipeline stage receives a str filepath instead; intermediate stages always receive bytes. See Etl.init_class(direct_file_access=...) for full details.

path
str

The object path (e.g. "bucket/object-name").

etl_args
str

Optional per-request arguments.

Returns: bytes

Transformed data to return to the caller.
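A buffered transform has to accept both shapes of data: raw bytes, or a local filepath when direct file access is enabled. This sketch uppercases the object and reads the file itself in the str case. To stay runnable without aistore installed, UppercaseETL is a hypothetical standalone class. In real code the method would live on an ETLServer subclass.

```python
from typing import Union


class UppercaseETL:
    """Hypothetical transform; in real code this method lives on an ETLServer subclass."""

    def transform(self, data: Union[bytes, str], path: str, etl_args: str) -> bytes:
        if isinstance(data, str):
            # Direct file access: data is a local filepath, not the object bytes.
            with open(data, "rb") as f:
                data = f.read()
        return data.upper()


result = UppercaseETL().transform(b"hello", "bucket/object-name", "")
```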

aistore.sdk.etl.webserver.base_etl_server.ETLServer.transform_stream(
reader: typing.BinaryIO,
path: str,
etl_args: str
) -> typing.Iterator[bytes]

Transform data in streaming mode, keeping memory usage constant.

Override this OR transform in your subclass. If both are overridden, transform takes priority (backward compatibility).

The method receives a file-like input (reader) and yields output chunks as they become available. This avoids buffering the entire object in memory.

Parameters:

reader
BinaryIO

File-like object for reading input data.

path
str

The object path (e.g. "bucket/object-name").

etl_args
str

Optional per-request arguments.
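A streaming transform typically reads bounded chunks from reader and yields each transformed chunk immediately. In this sketch, the chunk size and the class name UppercaseStreamETL are assumptions. As above, the class stands alone so the example runs without aistore.

```python
import io
from typing import BinaryIO, Iterator

CHUNK_SIZE = 64 * 1024  # assumed chunk size; tune for your workload


class UppercaseStreamETL:
    """Hypothetical streaming transform; in real code it lives on an ETLServer subclass."""

    def transform_stream(self, reader: BinaryIO, path: str, etl_args: str) -> Iterator[bytes]:
        # Read fixed-size chunks until EOF so memory stays bounded by CHUNK_SIZE.
        while True:
            chunk = reader.read(CHUNK_SIZE)
            if not chunk:
                break
            yield chunk.upper()


etl = UppercaseStreamETL()
out = b"".join(etl.transform_stream(io.BytesIO(b"abc" * 10), "bucket/object-name", ""))
```

Uppercasing works chunk by chunk because each byte is transformed independently. A transformation whose input units can straddle chunk boundaries, such as multi-byte text decoding or record parsing, would need to carry leftover bytes between iterations.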

aistore.sdk.etl.webserver.base_etl_server._handle_direct_put_transient_error(
direct_put_url: str,
exc: Exception,
logger: logging.Logger
) -> typing.Tuple[int, bytes, int]

Handle a caught SYNC_DIRECT_PUT_TRANSIENT_ERRORS exception.

Returns a (STATUS_BAD_GATEWAY, error_bytes, 0) tuple for permanent ConnectionRefused errors. Re-raises all other transient errors as ETLDirectPutTransientError so the caller’s retry loop can act on them.

Parameters:

direct_put_url
str

The direct-put URL that was being contacted.

exc
Exception

The caught exception (one of SYNC_DIRECT_PUT_TRANSIENT_ERRORS).

logger
logging.Logger

Logger used to emit the permanent-error message.

Returns: Tuple[int, bytes, int]

(STATUS_BAD_GATEWAY, encoded_error_message, 0) for permanent errors.

Raises:

  • ETLDirectPutTransientError: For all other transient errors.
aistore.sdk.etl.webserver.base_etl_server._is_connection_refused(
exc: requests.ConnectionError
) -> bool

Return True if exc represents a permanent ConnectionRefused error.

Returns False for bare ConnectionError(“lost”) and other non-refused connection errors.

Note: urllib3 v1.x (Python 3.9) raises NewConnectionError without explicit "raise ... from" chaining, so __cause__ is None and the ConnectionRefusedError is reachable only via __context__. Both attributes are checked for cross-version compatibility.
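The __cause__/__context__ check can be illustrated with plain Python exceptions. This sketch makes two assumptions. First, the builtin ConnectionError stands in for requests.ConnectionError. Second, it walks the whole chain, where the real function may inspect fewer levels. is_connection_refused_sketch and _make_implicitly_chained are hypothetical names.

```python
def is_connection_refused_sketch(exc: BaseException) -> bool:
    """Search exc's __cause__/__context__ chain for a ConnectionRefusedError."""
    seen = set()
    stack = [exc]
    while stack:
        cur = stack.pop()
        if cur is None or id(cur) in seen:
            continue
        seen.add(id(cur))
        if isinstance(cur, ConnectionRefusedError):
            return True
        # Check both links: explicit "from" chaining and implicit context.
        stack.append(cur.__cause__)
        stack.append(cur.__context__)
    return False


def _make_implicitly_chained() -> ConnectionError:
    """Simulate urllib3 v1.x: re-raise inside except without 'from'."""
    try:
        try:
            raise ConnectionRefusedError(111, "Connection refused")
        except ConnectionRefusedError:
            raise ConnectionError("wrapped")  # no 'from': only __context__ is set
    except ConnectionError as exc:
        return exc


chained = _make_implicitly_chained()
```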

aistore.sdk.etl.webserver.base_etl_server.RETRY_BACKOFF_BASE = 2.0
aistore.sdk.etl.webserver.base_etl_server.RETRY_BACKOFF_MAX = 30.0
aistore.sdk.etl.webserver.base_etl_server.SYNC_DIRECT_PUT_TRANSIENT_ERRORS = (requests.ConnectionError, requests.exceptions.ChunkedEncodingError)
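RETRY_BACKOFF_BASE and RETRY_BACKOFF_MAX suggest an exponential backoff with a cap between direct-PUT retries. _handle_direct_put_transient_error re-raises transient errors so that the caller's retry loop can act on them. The exact schedule is not documented here, so the following are all assumptions:

  • Delays are in seconds and follow BASE ** attempt, capped at MAX.
  • retries counts extra attempts after the first, as with the default of 3 for direct_put_retries.
  • TransientPutError stands in for ETLDirectPutTransientError.
  • put_with_retries and flaky_put are hypothetical names.

```python
import time
from typing import Callable, List, TypeVar

RETRY_BACKOFF_BASE = 2.0
RETRY_BACKOFF_MAX = 30.0

T = TypeVar("T")


class TransientPutError(Exception):
    """Hypothetical stand-in for ETLDirectPutTransientError."""


def backoff_delay(attempt: int) -> float:
    # Assumed schedule: BASE ** attempt seconds, capped at MAX (1, 2, 4, 8, 16, 30, ...).
    return min(RETRY_BACKOFF_BASE ** attempt, RETRY_BACKOFF_MAX)


def put_with_retries(
    put_once: Callable[[], T],
    retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call put_once, retrying transient failures up to `retries` extra times."""
    attempt = 0
    while True:
        try:
            return put_once()
        except TransientPutError:
            if attempt >= retries:
                raise  # out of retries: surface the last transient error
            sleep(backoff_delay(attempt))
            attempt += 1


calls = {"n": 0}


def flaky_put():
    # Fails twice, then succeeds with a tuple shaped like Tuple[int, bytes, int].
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientPutError("connection reset")
    return (200, b"", 5)


delays: List[float] = []
result = put_with_retries(flaky_put, retries=3, sleep=delays.append)
```

The sleep function is injected so the schedule can be observed in tests without actually waiting. Permanent failures, such as the STATUS_BAD_GATEWAY tuple returned for a refused connection, are returned rather than raised, so they end the loop immediately.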