aistore.sdk.etl.webserver.fastapi_server

Module Contents

Classes

Name | Description
FastAPIServer | FastAPI server implementation for ETL transformations.

Data

HTTP_LIMITS

_DIRECT_PUT_TRANSIENT_ERRORS

API

class aistore.sdk.etl.webserver.fastapi_server.FastAPIServer(
host: str = '0.0.0.0',
port: int = 8000
)

Bases: ETLServer

FastAPI server implementation for ETL transformations.

active_connections: List[WebSocket] = []
app = FastAPI()
chunk_size: int
client: Optional[AsyncClient] = None

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._build_response(
content: bytes,
mime_type: str
) -> fastapi.Response

Construct standardized response with appropriate headers.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._direct_put(
direct_put_url: str,
data: bytes,
remaining_pipeline: str = '',
path: str = ''
) -> typing.Tuple[int, bytes, int]
async

Sends the transformed object directly to the specified AIS node (direct_put_url), eliminating the additional network hop through the original target. Used only in bucket-to-bucket offline transforms.

Returns: status code, transformed data, and length of the transformed data (if any).

Raises: ETLDirectPutTransientError on ReadError/ConnectError/RemoteProtocolError, so the caller can retry without re-fetching data.

Parameters:

direct_put_url
str

The first URL in the ETL pipeline

data
bytes

The transformed data to send

remaining_pipeline
str, defaults to ''

Comma-separated remaining pipeline stages to pass as header

path
str, defaults to ''

The path of the object.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._direct_put_stream(
direct_put_url: str,
data_iter: typing.Iterator[bytes],
remaining_pipeline: str = '',
path: str = ''
) -> typing.Tuple[int, bytes, int]
async

Stream transformed output directly to the next pipeline stage.

Returns: Tuple[int, bytes, int]

(status_code, body, length) where:

  • status_code: HTTP status of the PUT (200/204 on success, 500 on error).
  • body: response bytes forwarded back to the AIS target (empty on success).
  • length: bytes sent to the destination, from CountingIterator.
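
The length element is obtained by counting bytes as the body is consumed. The following is a minimal sketch of that idea only; `ByteCounter` is a hypothetical stand-in and assumes nothing about the SDK's actual CountingIterator beyond its name.

```python
from typing import Iterable, Iterator


class ByteCounter:
    """Hypothetical stand-in for a counting iterator; not the SDK class."""

    def __init__(self, source: Iterable[bytes]) -> None:
        self._source = iter(source)
        self.count = 0  # total bytes handed to the consumer so far

    def __iter__(self) -> Iterator[bytes]:
        return self

    def __next__(self) -> bytes:
        chunk = next(self._source)  # StopIteration propagates at the end
        self.count += len(chunk)
        return chunk


counter = ByteCounter([b"abc", b"", b"defgh"])
# Joining the chunks stands in for the streaming PUT consuming the body.
sent = b"".join(counter)
```

Because the count is updated as chunks are pulled, it reflects what the consumer actually read, which is why it is only meaningful after the PUT has finished.
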
aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._direct_put_stream_with_retry(
fqn: str,
path: str,
request: fastapi.Request,
is_get: bool,
etl_args: str,
first_url: str,
remaining: str
) -> typing.Tuple[int, bytes, int]
async

Stream-put with exponential-backoff retry on transient network errors.

Each retry reopens the source and rebuilds the transform generator from scratch. Two kinds of source can be reopened: FQN-backed (re-open the file) and GET (re-issue the upstream stream). A streaming PUT without an FQN is not replayable because request.stream() is one-shot, so retries are skipped for that case and AIS retries the whole PUT instead.

Parameters:

fqn
str

Local FQN of the source object on the target’s filesystem. Empty string when the body comes from request.stream().

path
str

Object path (e.g. "bucket/object-name") forwarded to the next pipeline stage and passed to transform_stream.

request
Request

Incoming Request; its body is the source when fqn is empty and is_get is False.

is_get
bool

True for hpull GET, False for hpush PUT.

etl_args
str

Per-request transformation arguments (may be empty).

first_url
str

First URL in the direct-put pipeline (next stage).

remaining
str

Comma-separated remaining pipeline stages, forwarded to the next stage via the AIS-Node-Url header.

Returns: Tuple[int, bytes, int]

(status_code, body, length); see _direct_put_stream for semantics.

Raises:

  • ETLDirectPutTransientError: if all retry attempts are exhausted.
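
The retry behaviour described above (reopen the source, then back off exponentially) can be sketched as follows. This is an illustration under stated assumptions, not the SDK's implementation: `TransientError`, `put_with_retry`, `open_source`, `send`, the attempt count, and the delays are all hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Iterator, Tuple


class TransientError(Exception):
    """Hypothetical stand-in for a transient network failure."""


async def put_with_retry(
    open_source: Callable[[], Iterator[bytes]],
    send: Callable[[Iterator[bytes]], Awaitable[Tuple[int, bytes, int]]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> Tuple[int, bytes, int]:
    """Retry `send`, reopening the source before every attempt."""
    for attempt in range(attempts):
        try:
            # A fresh iterator per attempt: a half-consumed one cannot be replayed.
            return await send(open_source())
        except TransientError:
            if attempt == attempts - 1:
                raise  # retries exhausted; surface the error to the caller
            await asyncio.sleep(base_delay * 2 ** attempt)  # 1x, 2x, 4x, ...
    raise ValueError("attempts must be at least 1")


calls = []


async def flaky_send(chunks: Iterator[bytes]) -> Tuple[int, bytes, int]:
    """Fails twice with a transient error, then succeeds."""
    body = b"".join(chunks)
    calls.append(body)
    if len(calls) < 3:
        raise TransientError("connection reset")
    return 200, b"", len(body)


result = asyncio.run(put_with_retry(lambda: iter([b"ab", b"cd"]), flaky_send))
```

Passing a factory (`open_source`) rather than an iterator is the design point: every attempt sees the full body, which is exactly what a one-shot request stream cannot provide.
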
aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._direct_put_with_retry(
direct_put_url: str,
data: bytes,
remaining_pipeline: str = '',
path: str = ''
) -> typing.Tuple[int, bytes, int]
async

Buffered direct-put with exponential-backoff retry on transient network errors.

Raises: ETLDirectPutTransientError: if all retry attempts are exhausted.

Returns: Tuple[int, bytes, int]

(status_code, body, length) — see _direct_put for semantics.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._get_fqn_content(
path: str
) -> bytes
async

Safely read local file content with path normalization.
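
One plausible reading of "path normalization" is sketched below, assuming a POSIX filesystem: the path is anchored at the root and lexically collapsed before the file is read in a worker thread. The helper names (`normalize_fqn`, `read_fqn`) are hypothetical, and the SDK may normalize or read differently.

```python
import asyncio
import os
import tempfile


def normalize_fqn(path: str) -> str:
    """Anchor the path at '/' and collapse '.', '..' and repeated slashes."""
    return os.path.normpath(os.path.join("/", path.lstrip("/")))


def _read_file(fqn: str) -> bytes:
    with open(fqn, "rb") as f:
        return f.read()


async def read_fqn(path: str) -> bytes:
    # Blocking file I/O runs in a worker thread so the event loop stays free.
    return await asyncio.to_thread(_read_file, normalize_fqn(path))


with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "obj.bin"), "wb") as f:
        f.write(b"payload")
    # A deliberately messy spelling of the same file.
    messy = tmp + "/./sub/../obj.bin"
    content = asyncio.run(read_fqn(messy))
```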

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._get_network_content(
path: str
) -> bytes
async

Retrieve content from AIS target with async HTTP client.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._get_stream_reader(
fqn,
path,
request,
is_get
) -> typing.BinaryIO
async

Get a BinaryIO reader for the request source data.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._handle_request(
path: str,
request: fastapi.Request,
is_get: bool
)
async

Unified request handler for GET/PUT operations.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._handle_request_buffered(
path: str,
request: fastapi.Request,
is_get: bool
)
async

Buffered request handler — loads entire object, transforms, returns.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._handle_request_streaming(
path: str,
request: fastapi.Request,
is_get: bool
)
async

Streaming request handler — yields output chunks without buffering.
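
The difference between the buffered and streaming handlers can be illustrated with a toy transform. This is a sketch only: `transform_buffered`, `transform_streaming`, and the upper-casing transform are hypothetical and do not reflect the handlers' actual code.

```python
import io
from typing import BinaryIO, Iterator


def transform_buffered(data: bytes) -> bytes:
    """Whole object in memory: simple, but peak memory is the object size."""
    return data.upper()


def transform_streaming(reader: BinaryIO, chunk_size: int = 4) -> Iterator[bytes]:
    """Read and emit one chunk at a time; peak memory is about chunk_size."""
    while chunk := reader.read(chunk_size):
        yield chunk.upper()


buffered = transform_buffered(b"hello etl world")
streamed = list(transform_streaming(io.BytesIO(b"hello etl world")))
```

Both paths produce the same bytes; the streaming one simply never holds more than one chunk at a time, which matters for objects larger than available memory.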

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._handle_ws_message(
websocket: fastapi.WebSocket
)
async

Handle a single WebSocket message. Always uses buffered transform().

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._iter_chunks(
data: bytes,
chunk_size: int = MIB
)
async staticmethod

Yield data in chunk_size pieces as an async bytes generator.

Recommended approach for large PUT bodies with httpx.AsyncClient. See https://www.python-httpx.org/async/#streaming-requests.
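
A minimal sketch of such a chunking generator is shown below. The function name `iter_chunks`, the helper `collect`, and the value assigned to `MIB` are assumptions made for illustration; only the slicing behaviour is taken from the description above.

```python
import asyncio
from typing import AsyncIterator, List

MIB = 1024 * 1024  # assumed value of the MIB constant in the signature


async def iter_chunks(data: bytes, chunk_size: int = MIB) -> AsyncIterator[bytes]:
    """Yield successive chunk_size slices of data; the last may be shorter."""
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]


async def collect(data: bytes, chunk_size: int) -> List[bytes]:
    """Drain the generator into a list so the result is easy to inspect."""
    return [chunk async for chunk in iter_chunks(data, chunk_size)]


chunks = asyncio.run(collect(b"abcdefghij", 4))
empty = asyncio.run(collect(b"", 4))
```

httpx accepts an async generator of bytes as request content, so a generator of this shape can be passed as the body of a PUT instead of one large bytes object.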

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._open_sync_get_stream(
target_url: str
) -> typing.BinaryIO

Open a streaming GET against the AIS target using the shared sync session.

This call blocks, so it must be invoked via asyncio.to_thread from an async context. Returns a _ResponseRawReader wrapping the response so close() releases the connection back to the urllib3 keep-alive pool.
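
The calling pattern for a blocking opener from async code might look like the sketch below. `open_blocking_stream` is a hypothetical stand-in that returns an in-memory reader instead of issuing a real GET, and `fetch` is likewise illustrative.

```python
import asyncio
import io
from typing import BinaryIO


def open_blocking_stream(target_url: str) -> BinaryIO:
    """Hypothetical blocking opener; a real one would issue a streaming GET."""
    return io.BytesIO(f"payload from {target_url}".encode())


async def fetch(target_url: str) -> bytes:
    # Run the blocking open in a worker thread; the event loop keeps serving.
    reader = await asyncio.to_thread(open_blocking_stream, target_url)
    try:
        # Reads on a sync reader also block, so they go to a thread as well.
        return await asyncio.to_thread(reader.read)
    finally:
        reader.close()  # always release the underlying resource


data = asyncio.run(fetch("http://target/obj"))
```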

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer._setup_app()

Configure FastAPI routes and event handlers.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer.shutdown_event()
async

Cleanup resources on server shutdown.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer.start()

Start the server with production-optimized settings.

aistore.sdk.etl.webserver.fastapi_server.FastAPIServer.startup_event()
async

Initialize resources on server startup.

aistore.sdk.etl.webserver.fastapi_server.HTTP_LIMITS = httpx.Limits(max_connections=(int(os.getenv('MAX_CONN', '256'))), max_keepalive_...
aistore.sdk.etl.webserver.fastapi_server._DIRECT_PUT_TRANSIENT_ERRORS = (httpx.ReadError, httpx.WriteError, httpx.ConnectError, httpx.RemoteProtocolErro...