aistore.sdk.etl.webserver.base_etl_server
Module Contents
Classes
Functions
Data
SYNC_DIRECT_PUT_TRANSIENT_ERRORS
API
Wraps an iterator of bytes chunks to count total bytes yielded.
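A minimal sketch of such a byte-counting wrapper, assuming it only needs to expose a running total once iteration finishes. The class name follows the CountingIterator mentioned further down this page, but the `total` attribute and the constructor signature are hypothetical and are not taken from the aistore source.

```python
from typing import Iterable, Iterator


class CountingIterator:
    """Hypothetical sketch: wraps an iterable of bytes chunks and counts bytes yielded."""

    def __init__(self, chunks: Iterable[bytes]):
        self._it = iter(chunks)
        self.total = 0  # running count of bytes handed to the consumer (assumed attribute name)

    def __iter__(self) -> Iterator[bytes]:
        return self

    def __next__(self) -> bytes:
        chunk = next(self._it)  # StopIteration propagates and ends iteration
        self.total += len(chunk)
        return chunk
```

Because the count is updated as each chunk passes through, the total is only final after the consumer (for example, an HTTP client streaming a PUT body) has drained the iterator.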
Abstract base class for all ETL servers.
Provides:
host_target: the AIS target URL, read from the environment variable AIS_TARGET_URL.
logger: a class-specific logger with INFO level (can be adjusted).
Build a requests.Session by delegating SSL/cert config to SessionManager.
Send a PUT request using the pre-configured session.
Close a BinaryIO reader if it has a close method.
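The "close if it has a close method" check can be sketched with duck typing. The function name `close_reader` is hypothetical; the sketch only illustrates the described behavior.

```python
from typing import Any


def close_reader(reader: Any) -> None:
    """Hypothetical sketch: close reader if it exposes a callable close(), else do nothing."""
    close = getattr(reader, "close", None)
    if callable(close):
        close()
```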
Optional override to specify the MIME type of the transformed response.
Returns: str
MIME type (e.g., “application/json”, “text/plain”).
Handle the response from a direct PUT request.
Parameters:
The HTTP response from the direct PUT.
The original data bytes (used to compute the length in the
200-OK-with-empty-content case). Can be b"" for streaming.
Explicit byte count override. When >= 0, it is used instead
of len(data). Pass the count from a CountingIterator for
streaming pipeline PUTs, where data is empty.
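A sketch of how the reported length could be chosen from these two parameters. The helper name and the use of -1 as the "no override" default are assumptions; the page only states that a value >= 0 takes precedence over len(data).

```python
def effective_length(data: bytes, byte_count: int = -1) -> int:
    """Hypothetical helper: pick the byte count to report for a direct PUT.

    A non-negative byte_count (e.g. taken from a CountingIterator on a
    streaming PUT, where data is b"") overrides len(data). The -1 default
    meaning "no override" is an assumption.
    """
    return byte_count if byte_count >= 0 else len(data)
```

For a buffered PUT the caller can omit the override; for a streaming pipeline PUT it passes the counter's total together with b"".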
Yield from output_iter and close the reader when done.
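One common way to express "yield, then close" is a generator with try/finally. This is a sketch, not the SDK's code; the function name is hypothetical.

```python
from typing import BinaryIO, Iterator


def yield_and_close(output_iter: Iterator[bytes], reader: BinaryIO) -> Iterator[bytes]:
    """Hypothetical sketch: yield every chunk from output_iter, then close reader.

    The finally block also runs if the transform raises or if the consumer
    stops early with generator.close() after iteration has begun.
    """
    try:
        yield from output_iter
    finally:
        close = getattr(reader, "close", None)
        if callable(close):
            close()
```

Note that a generator that is never started does not run its finally block, so the caller should iterate it (or close the reader itself) to avoid leaking the input.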
Build response headers for a direct-put result.
Normalize an FQN to a safe absolute path.
Start the ETL server (blocking call). Typically binds and listens on a port.
Transform the data received from a request (buffered mode).
Override this OR transform_stream in your subclass. If both are
overridden, transform takes priority (backward compatibility).
Parameters:
Object bytes by default. When
ETL_DIRECT_FQN=true, the first pipeline stage receives a str
filepath instead; intermediate stages always receive bytes.
See Etl.init_class(direct_file_access=...) for full details.
The object path (e.g. "bucket/object-name").
Optional per-request arguments.
Returns: bytes
Transformed data to return to the caller.
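Because the first pipeline stage may receive a str filepath (with ETL_DIRECT_FQN=true) while other stages receive bytes, a buffered transform can normalize its input first. A minimal sketch of an upper-casing transform written as a free function; the argument name `etl_args` and the upper-casing logic are illustrative assumptions.

```python
from typing import Optional, Union


def transform(data: Union[bytes, str], path: str, etl_args: Optional[str] = None) -> bytes:
    """Hypothetical transform: upper-cases the object content.

    With direct file access the first stage may get a str filepath
    instead of bytes, so read the file before transforming.
    """
    if isinstance(data, str):
        with open(data, "rb") as f:
            data = f.read()
    return data.upper()
```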
Transform data in streaming mode with constant memory usage.
Override this OR transform in your subclass. If both are
overridden, transform takes priority (backward compatibility).
The method receives a file-like input (reader) and yields output
chunks as they become available. This avoids buffering the entire
object in memory.
Parameters:
File-like object for reading input data.
The object path (e.g. "bucket/object-name").
Optional per-request arguments.
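The "override one of the two, transform wins if both are overridden" rule can be sketched by comparing a subclass's methods against the base class's. `SketchETLServer`, `run`, and `UpperStream` are hypothetical stand-ins, not the real aistore base class or its dispatch code.

```python
from typing import BinaryIO, Iterator, Optional


class SketchETLServer:
    """Hypothetical stand-in for the abstract base; not the real aistore class."""

    def transform(self, data: bytes, path: str, args: Optional[str] = None) -> bytes:
        raise NotImplementedError

    def transform_stream(
        self, reader: BinaryIO, path: str, args: Optional[str] = None
    ) -> Iterator[bytes]:
        raise NotImplementedError

    def run(self, reader: BinaryIO, path: str) -> Iterator[bytes]:
        cls = type(self)
        # transform takes priority when overridden (backward compatibility)
        if cls.transform is not SketchETLServer.transform:
            yield self.transform(reader.read(), path)  # buffered: reads the whole object
        elif cls.transform_stream is not SketchETLServer.transform_stream:
            yield from self.transform_stream(reader, path)  # streaming: chunk by chunk
        else:
            raise NotImplementedError("override transform or transform_stream")


class UpperStream(SketchETLServer):
    CHUNK = 4  # tiny chunk size so the example yields several chunks

    def transform_stream(self, reader, path, args=None):
        while True:
            chunk = reader.read(self.CHUNK)
            if not chunk:
                break
            yield chunk.upper()  # only one chunk is held in memory at a time
```

Upper-casing per chunk is safe here because bytes.upper() only touches ASCII bytes; a transform whose output depends on chunk boundaries (for example, multi-byte text decoding) would need to carry state between chunks.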
Determine whether the streaming source is replayable and the effective retry budget.
Sources backed by a local FQN file or a GET stream can be reopened on retry. No-FQN PUT bodies are one-shot (consumed from the request socket) and cannot be replayed locally, so the retry budget is forced to zero.
Parameters:
Local FQN of the source object; empty for streaming PUT.
True for hpull GET, False for hpush PUT.
Configured retry count.
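Returns: Tuple[bool, int]
(replayable, effective_retries): replayable is True when the source can be reopened (a local FQN file or a GET stream); effective_retries is the configured retry count, or 0 when the source is not replayable.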
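The replay rule described above reduces to a small pure function. The function and parameter names here are hypothetical; the logic only encodes the stated rule that an FQN-backed or GET-backed source is replayable while a no-FQN PUT body gets a zero retry budget.

```python
from typing import Tuple


def replay_budget(fqn: str, is_get: bool, retries: int) -> Tuple[bool, int]:
    """Hypothetical sketch: return (replayable, effective_retries).

    A local FQN file or an hpull GET stream can be reopened on retry;
    a no-FQN hpush PUT body is read once from the socket, so its budget is 0.
    """
    replayable = bool(fqn) or is_get
    return replayable, (retries if replayable else 0)
```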
Handle a caught SYNC_DIRECT_PUT_TRANSIENT_ERRORS exception.
Returns a (STATUS_BAD_GATEWAY, error_bytes, 0) tuple for permanent
ConnectionRefused errors. Re-raises all other transient errors as
ETLDirectPutTransientError so the caller’s retry loop can act on them.
Parameters:
The direct-put URL that was being contacted.
The caught exception (one of SYNC_DIRECT_PUT_TRANSIENT_ERRORS).
Logger used to emit the permanent-error message.
Returns: Tuple[int, bytes, int]
(STATUS_BAD_GATEWAY, encoded_error_message, 0) for permanent errors.
Raises:
ETLDirectPutTransientError: For all other transient errors.
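A sketch of the split between "permanent: return 502" and "transient: re-raise for the retry loop". The exception class and STATUS_BAD_GATEWAY are local stand-ins for the names on this page, and the permanent-error test is deliberately simplified to a direct ConnectionRefusedError (the real check also inspects exception chaining).

```python
import logging
from http import HTTPStatus
from typing import Tuple

STATUS_BAD_GATEWAY = int(HTTPStatus.BAD_GATEWAY)  # 502; local stand-in constant


class ETLDirectPutTransientError(Exception):
    """Local stand-in for the exception named above."""


def handle_transient(url: str, exc: Exception, logger: logging.Logger) -> Tuple[int, bytes, int]:
    # Simplified permanent-error test (assumption): only a direct ConnectionRefusedError.
    if isinstance(exc, ConnectionRefusedError):
        msg = f"direct put to {url} refused: {exc}"
        logger.error(msg)
        return STATUS_BAD_GATEWAY, msg.encode(), 0
    # Anything else is retryable: wrap it so the caller's retry loop can catch it.
    raise ETLDirectPutTransientError(f"transient error on {url}: {exc}") from exc
```

Keeping the original exception as __cause__ (via raise ... from) preserves the underlying failure for logging after retries are exhausted.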
Return True if exc represents a permanent ConnectionRefused error.
Returns False for bare ConnectionError(“lost”) and other non-refused connection errors.
Note: urllib3 v1.x (Python 3.9) raises NewConnectionError without explicit raise ... from
chaining, so __cause__ is None and the ConnectionRefusedError is only reachable
via __context__. Both attributes are checked for cross-version compatibility.
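A sketch of a chain walk that checks both __cause__ (explicit chaining) and __context__ (implicit chaining). The function name is hypothetical, and walking the full chain rather than a single level is an assumption about how deep the refused error may be buried; the visited set guards against cyclic chains.

```python
def is_permanent_refused(exc: BaseException) -> bool:
    """Hypothetical sketch: True if a ConnectionRefusedError appears in exc's chain."""
    seen = set()
    stack = [exc]
    while stack:
        cur = stack.pop()
        if cur is None or id(cur) in seen:
            continue
        seen.add(id(cur))
        if isinstance(cur, ConnectionRefusedError):
            return True
        # __cause__: set by "raise ... from"; __context__: set implicitly in an except block
        stack.extend((cur.__cause__, cur.__context__))
    return False
```

A bare ConnectionError("lost") has no refused error in its chain, so it is treated as transient and stays eligible for retry.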