aistore.sdk.etl.webserver.base_etl_server
Module Contents
Classes
Functions
Data
SYNC_DIRECT_PUT_TRANSIENT_ERRORS
API
Wraps an iterator of byte chunks and counts the total number of bytes yielded.
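Below is a minimal sketch of such a wrapper. The class name ByteCountingIterator and its count attribute are hypothetical illustrations, not the SDK's own definitions. The total is only final once the consumer has exhausted the iterator.

```python
from typing import Iterable, Iterator


class ByteCountingIterator:
    """Hypothetical wrapper: passes byte chunks through unchanged and tallies their size."""

    def __init__(self, chunks: Iterable[bytes]) -> None:
        self._it = iter(chunks)
        self.count = 0  # total bytes yielded so far

    def __iter__(self) -> Iterator[bytes]:
        return self

    def __next__(self) -> bytes:
        chunk = next(self._it)  # StopIteration propagates and ends iteration
        self.count += len(chunk)
        return chunk
```

After a streaming PUT has consumed the wrapper, count holds the value you would pass as the explicit byte-count override that the direct-PUT response handler accepts.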
Abstract base class for all ETL servers.
Provides:
host_target: the AIS target URL, read from the environment variable AIS_TARGET_URL.
logger: a class-specific logger set to INFO level (adjustable).
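The following sketch shows how a base class could provide these two attributes. It is a hypothetical stand-in, not the SDK's code. The empty-string fallback when AIS_TARGET_URL is unset is an assumption, and so is naming the logger after the concrete subclass.

```python
import logging
import os
from abc import ABC, abstractmethod


class ETLServerSketch(ABC):
    """Hypothetical stand-in for the documented base class, not the SDK's code."""

    def __init__(self) -> None:
        # AIS_TARGET_URL is the variable named in the docs; the "" fallback is an assumption.
        self.host_target = os.environ.get("AIS_TARGET_URL", "")
        # One logger per concrete subclass, defaulting to INFO; callers may call setLevel later.
        self.logger = logging.getLogger(type(self).__name__)
        self.logger.setLevel(logging.INFO)

    @abstractmethod
    def start(self) -> None:
        """Start serving (blocking)."""
```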
Build a requests.Session by delegating SSL/cert config to SessionManager.
Send a PUT request using the pre-configured session.
Close a BinaryIO reader if it has a close method.
Optional override to specify the MIME type of the transformed response.
Returns: str
MIME type (e.g., “application/json”, “text/plain”).
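Here is a sketch of the override pattern. The method name get_mime_type and the application/octet-stream default are assumptions for illustration only. Check the SDK source for the exact name and default.

```python
class MimeBaseSketch:
    """Hypothetical base: binary output unless a subclass says otherwise."""

    def get_mime_type(self) -> str:
        return "application/octet-stream"  # assumed default


class JsonETL(MimeBaseSketch):
    """Hypothetical ETL whose transform emits JSON documents."""

    def get_mime_type(self) -> str:
        return "application/json"
```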
Handle the response from a direct PUT request.
Parameters:
The HTTP response from the direct PUT.
The original data bytes, used to compute the length in the 200 OK, empty-content case. Can be b"" for streaming.
Explicit byte-count override. When >= 0, it is used instead of len(data). Pass this from a CountingIterator for streaming pipeline PUTs, where data is empty.
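The length-selection rule above can be sketched as a small pure function. The helper name, its parameters, and the choice to forward failures unchanged are hypothetical. The (status, body, bytes_written) tuple shape is an assumption modeled on the tuple that the transient-error handler returns.

```python
from typing import Tuple

STATUS_OK = 200


def direct_put_result(
    status_code: int, resp_body: bytes, data: bytes = b"", byte_count: int = -1
) -> Tuple[int, bytes, int]:
    """Hypothetical helper returning (status, body_for_caller, bytes_written)."""
    if status_code == STATUS_OK:
        # Success: nothing to send back, but report how many bytes were written.
        # A non-negative byte_count (e.g. from a counting iterator) wins over len(data).
        written = byte_count if byte_count >= 0 else len(data)
        return STATUS_OK, b"", written
    # Assumption: failures are forwarded as-is with zero bytes written.
    return status_code, resp_body, 0
```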
Yield from output_iter and close the reader when done.
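A try/finally around yield from is one way to guarantee the close. The finally block runs when the output is exhausted, when it raises, and when the consumer abandons the generator early. The helper name below is hypothetical. It also applies the "close only if a close method exists" check.

```python
from typing import BinaryIO, Iterator


def stream_then_close(output_iter: Iterator[bytes], reader: BinaryIO) -> Iterator[bytes]:
    """Hypothetical helper: relay output chunks, always closing the input reader."""
    try:
        yield from output_iter
    finally:
        # Runs on normal exhaustion, on error, and when the consumer calls close() early.
        close = getattr(reader, "close", None)
        if callable(close):
            close()
```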
Build response headers for a direct-put result.
Normalize an FQN to a safe absolute path.
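The SDK does not spell out what "safe" covers here. The sketch below assumes purely lexical normalization: one leading slash, no empty or "." segments, and ".." that cannot climb above the root. It does not resolve symlinks, for which os.path.realpath would be needed. The function name is hypothetical.

```python
import posixpath


def normalize_fqn(fqn: str) -> str:
    """Hypothetical helper: collapse a path string into one absolute, normalized form."""
    # Force exactly one leading slash (normpath would otherwise keep a leading "//"),
    # then let normpath fold "//", "." and ".." segments. A ".." at the root stays at "/".
    return posixpath.normpath("/" + fqn.lstrip("/"))
```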
Start the ETL server (blocking call). Typically binds and listens on a port.
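For illustration only, here is a blocking start built on the standard library's http.server. The SDK's concrete servers may use a different framework. The handler, host, and port defaults are assumptions. Binding happens in the server constructor, and serve_forever is the blocking part.

```python
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    """Hypothetical handler answering every GET with a short text body."""

    def do_GET(self) -> None:
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def build_server(host: str = "0.0.0.0", port: int = 8000) -> ThreadingHTTPServer:
    # The constructor binds and listens; port=0 asks the OS for a free port.
    return ThreadingHTTPServer((host, port), HealthHandler)


def start(host: str = "0.0.0.0", port: int = 8000) -> None:
    server = build_server(host, port)
    try:
        server.serve_forever()  # blocks until shutdown() is called from another thread
    finally:
        server.server_close()
```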
Transform the data received from a request (buffered mode).
Override this OR transform_stream in your subclass. If both are overridden, transform takes priority (for backward compatibility).
Parameters:
Object bytes by default. When ETL_DIRECT_FQN=true, the first pipeline stage receives a str file path instead; intermediate stages always receive bytes. See Etl.init_class(direct_file_access=...) for full details.
The object path (e.g. "bucket/object-name").
Optional per-request arguments.
Returns: bytes
Transformed data to return to the caller.
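Below is a sketch of a buffered-mode transform that honors both input forms described above. The parameter names data, path, and etl_args are assumptions, because the rendered docs dropped them. The class is a hypothetical example, not an SDK class. In real code the method would live on a subclass of the ETL server base class.

```python
from typing import Optional, Union


class UpperCaseETL:
    """Hypothetical buffered-mode ETL; mirrors the documented transform contract."""

    def transform(self, data: Union[bytes, str], path: str, etl_args: Optional[str] = None) -> bytes:
        # With ETL_DIRECT_FQN=true, the first pipeline stage gets a local file path (str), not bytes.
        if isinstance(data, str):
            with open(data, "rb") as f:
                data = f.read()
        # path and etl_args are unused here; a real transform might branch on them.
        return data.upper()
```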
Transform data in streaming mode with constant memory usage.
Override this OR transform in your subclass. If both are overridden, transform takes priority (for backward compatibility).
The method receives a file-like input (reader) and yields output chunks as they become available, so the entire object is never buffered in memory.
Parameters:
File-like object for reading input data.
The object path (e.g. "bucket/object-name").
Optional per-request arguments.
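A streaming counterpart might look like the sketch below. The class name, parameter names, and the 64 KiB read size are assumptions. Only one chunk is held in memory at a time.

```python
from typing import BinaryIO, Iterator, Optional

CHUNK_SIZE = 64 * 1024  # assumed read size; tune for your workload


class UpperCaseStreamETL:
    """Hypothetical streaming-mode ETL (not an SDK class)."""

    def transform_stream(
        self, reader: BinaryIO, path: str, etl_args: Optional[str] = None
    ) -> Iterator[bytes]:
        # Memory use is bounded by CHUNK_SIZE, whatever the object size.
        while True:
            chunk = reader.read(CHUNK_SIZE)
            if not chunk:
                break
            # bytes.upper() works byte by byte, so chunk boundaries cannot split a match.
            yield chunk.upper()
```

Per-chunk processing is correct here only because the transform is byte-local. A transform that needs context across chunk boundaries, such as line-oriented parsing or multi-byte UTF-8 decoding, would have to carry a remainder from one read to the next.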
Handle a caught SYNC_DIRECT_PUT_TRANSIENT_ERRORS exception.
Returns a (STATUS_BAD_GATEWAY, error_bytes, 0) tuple for permanent ConnectionRefused errors. Re-raises all other transient errors as ETLDirectPutTransientError so that the caller's retry loop can act on them.
Parameters:
The direct-put URL that was being contacted.
The caught exception (one of SYNC_DIRECT_PUT_TRANSIENT_ERRORS).
Logger used to emit the permanent-error message.
Returns: Tuple[int, bytes, int]
(STATUS_BAD_GATEWAY, encoded_error_message, 0) for permanent errors.
Raises:
ETLDirectPutTransientError: For all other transient errors.
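The split between "report 502 now" and "raise so the caller retries" can be sketched as follows. Built-in ConnectionError and TimeoutError stand in for SYNC_DIRECT_PUT_TRANSIENT_ERRORS, whose members are not listed here. TransientPutError stands in for ETLDirectPutTransientError. The retry loop, its attempt count, and the plain isinstance refusal test are all hypothetical simplifications. The SDK's own refusal check also inspects the exception chain.

```python
from typing import Callable, Optional, Tuple

STATUS_BAD_GATEWAY = 502
TRANSIENT_ERRORS = (ConnectionError, TimeoutError)  # stand-in for SYNC_DIRECT_PUT_TRANSIENT_ERRORS


class TransientPutError(Exception):
    """Hypothetical stand-in for ETLDirectPutTransientError."""


def handle_transient(url: str, exc: Exception) -> Tuple[int, bytes, int]:
    if isinstance(exc, ConnectionRefusedError):
        # Permanent: nothing is listening at url, so report 502 instead of retrying.
        return STATUS_BAD_GATEWAY, f"direct put to {url} refused: {exc}".encode(), 0
    # Anything else may succeed on retry; hand it to the caller's loop.
    raise TransientPutError(f"direct put to {url} failed: {exc}") from exc


def put_with_retries(
    url: str, do_put: Callable[[], Tuple[int, bytes, int]], attempts: int = 3
) -> Tuple[int, bytes, int]:
    last_err: Optional[TransientPutError] = None
    for _ in range(attempts):
        try:
            try:
                return do_put()
            except TRANSIENT_ERRORS as exc:
                return handle_transient(url, exc)
        except TransientPutError as err:
            last_err = err  # transient: try again
    if last_err is None:
        raise ValueError("attempts must be >= 1")
    raise last_err
```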
Return True if exc represents a permanent ConnectionRefused error.
Returns False for a bare ConnectionError("lost") and for other non-refused connection errors.
Note: urllib3 v1.x (Python 3.9) raises NewConnectionError without explicit raise ... from chaining, so __cause__ is None and the ConnectionRefusedError is reachable only via __context__. Both attributes are checked for cross-version compatibility.
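A chain walk that follows both __cause__ (explicit raise ... from) and __context__ (implicit chaining) can be sketched as below. The function name is hypothetical. The seen set guards against cycles in the chain.

```python
from typing import List, Set


def is_connection_refused(exc: BaseException) -> bool:
    """Hypothetical helper: look for ConnectionRefusedError anywhere in the exception chain."""
    stack: List[BaseException] = [exc]
    seen: Set[int] = set()
    while stack:
        cur = stack.pop()
        if id(cur) in seen:
            continue
        seen.add(id(cur))
        if isinstance(cur, ConnectionRefusedError):
            return True
        # Check both links: explicit chaining sets __cause__, implicit chaining sets __context__.
        for nxt in (cur.__cause__, cur.__context__):
            if nxt is not None:
                stack.append(nxt)
    return False
```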