aistore.sdk.etl.webserver.fastapi_streaming
aistore.sdk.etl.webserver.fastapi_streaming
FastAPI streaming helpers for ETL servers.
Bridges Starlette’s async request stream into a sync BinaryIO and
implements an ASGI streaming response that defers http.response.start
until the first body chunk is produced. Both classes are FastAPI-internal
adapters used by fastapi_server.FastAPIServer; they live here to keep
that module focused on routing and request handling.
Module Contents
Classes
Data
API
Bases: StreamingResponse
Streaming response that defers http.response.start until the body
iterator yields its first chunk, so iterators consuming
request.stream() can still pull request body chunks via receive().
ASGI request/response phasing: an ASGI server delivers request body
chunks via receive() only while the application is in the
request-reading phase. Sending http.response.start transitions the
application to the response-sending phase; subsequent receive() calls
return http.disconnect (uvicorn) or block indefinitely
(TestClient). In short: the request body tap closes the moment the
response is announced.
Starlette’s built-in StreamingResponse sends response.start before
iterating the body, so any body iterator that reads request.stream()
(e.g. via _RequestStreamReader) hits ClientDisconnect on the first
chunk. This class reorders the two send() calls: the first body chunk
is pulled first (still in the request-reading phase, so receive()
succeeds), then response.start is sent, then subsequent chunks.
Empirically, once delivery has begun the server keeps returning body
chunks via receive() for the lifetime of the request on both real
uvicorn and TestClient.
Inherits __init__ from StreamingResponse (same signature, same
body_iterator setup) and only overrides __call__ to swap the
ordering. We override __call__ rather than stream_response because
StreamingResponse.__call__ on ASGI spec_version < 2.4 spawns a
listen_for_disconnect task that competes with our body iterator for
receive() messages — the very conflict we are avoiding.
See starlette discussion #1830 for context. Used only in the inline
streaming path; the direct-put path consumes the body inside the handler
before constructing any response, so it can use a regular Response.
ASGI entry point.
Parameters:
ASGI scope (unused; required by ASGI signature).
ASGI receive callable (unused; the body
iterator may drive receive itself via request.stream()).
ASGI send callable used to emit
http.response.start and http.response.body messages.
Bases: RawIOBase
Make async request.stream() look like a sync file so the existing
transform_stream(reader: BinaryIO, ...) API can consume the request
body without buffering or an async refactor.
transform_stream runs in an anyio worker thread (Starlette dispatches
sync iteration there via iterate_in_threadpool; httpx.AsyncClient
does the same when iterating a sync content iterator). Each read(n)
blocks the worker thread while anyio.from_thread.run awaits the next
chunk on the request’s event loop. Same loop, same threadpool, no extra
deps (anyio is already a transitive dep of FastAPI).
request.stream() cannot be replayed; _direct_put_stream_with_retry
skips local retries on this path so AIS retries the whole PUT instead.
Errors raised by request.stream() (client disconnect, framing) surface
from read() to the transform.
Pull the next request body chunk from the event loop.
Returns: Optional[bytes]
Optional[bytes]: Next chunk from request.stream(), or None on EOF.
Read up to size bytes from the request body stream. Bytes pulled
but not returned (when a chunk straddles size) stay in the
internal buffer for the next read() call.
Parameters:
Maximum number of bytes to read. -1 (or
None) reads to EOF. Defaults to -1.
Returns: bytes
Bytes read from the stream. May be shorter than size on EOF.