Writing a Python Unified Backend


New — Dynamo’s unified backend. This guide covers the new unified backend infrastructure in dynamo.common.backend: a shared LLMEngine ABC that vLLM, SGLang, TRT-LLM, and a sample engine already implement, and that any custom Python engine can implement to plug in the same way. For the Rust version of the same contract, see Writing a Rust Unified Backend. For the older, lower-level Python worker path (register_model + serve_endpoint) — still the right choice for features the unified backend does not yet cover — see Writing Python Workers.

Beta — actively under development. The unified backend surface is beta quality and may change without backwards compatibility between releases. See Feature gaps below for what the unified path covers today versus the existing (non-unified) backend paths.

This guide walks through building a Python backend for an inference engine that plugs into Dynamo’s distributed runtime via dynamo.common.backend. A “unified backend” is a Python entry point that implements the shared LLMEngine ABC and lets the framework own runtime lifecycle (signal handling, model registration, graceful shutdown, cancellation monitoring) — your code just owns inference.

Your backend lives in its own package and does not need to be part of the dynamo repository. It depends on ai-dynamo from PyPI (or the git source) and imports dynamo.common.backend. The steps below assume you’re starting a fresh package in your own repo.

The reference example is the sample engine at sample_engine.py — a complete, runnable implementation under 120 lines. Read it alongside this guide.

Where to look for what:

  • This guide — step-by-step walkthrough for someone starting a new backend from scratch.
  • LLMEngine ABC docstrings — authoritative method-by-method contract.
  • Package README — in-tree reference: GenerateRequest / GenerateChunk field definitions, per-engine cancellation cookbook (vLLM / SGLang / TRT-LLM), full DynamoException table, file index, and the per-engine feature-gap matrix.

Feature gaps

The unified backend is in beta and does not yet cover the full feature set of Dynamo’s existing (non-unified) backend paths. The summary below is the common contract — what every engine on the unified path gets. Per-engine gaps (vLLM, SGLang, TRT-LLM specifics like LoRA, diffusion, attention DP scheduling) live in the package README.

Supported today

  • Aggregated token-in-token-out inference
  • Disaggregated serving (agg / prefill / decode) with bootstrap (SGLang) or internal KV transport (vLLM, TRT-LLM)
  • Model registration with discovery and endpoint types
  • Request cancellation via abort() + context.is_stopped()
  • DynamoException error chain wrapping
  • Graceful shutdown with signal handling
  • Finish reason normalization (handled by the Rust layer)

Not yet on the unified path

| Feature | What’s missing |
|---|---|
| Metrics & Prometheus | Engine-level metrics, KV utilization gauges, multiprocess registry |
| KV event publishing | Prefix cache events (BlockStored / Removed) to router via ZMQ or NATS |
| Health check payloads | Per-engine custom health probes (BOS token probe, etc.) |
| Logprobs | Selected + top-k logprob extraction and streaming |
| Guided decoding | JSON schema, regex, grammar, choice constraints |
| OpenTelemetry tracing | Trace headers, request perf metrics, OTEL propagation |
| Engine routes | Profiling, memory release/resume, weight updates (disk/tensor/distributed/IPC) |
| Data-parallel routing | DP rank extraction, DP-aware scheduling |
| Text-in-text-out mode | OpenAI-compatible chat/completion with engine-side tokenization |
| Custom Jinja chat templates | --custom-jinja-template for model-specific prompt formatting |
| Snapshot/checkpoint | CRIU-based engine state save/restore |
| Multimodal | Images, video, embeddings, separate encode workers |
| LoRA adapters | Dynamic load/unload, ModelDeploymentCard publishing, per-adapter serialization |

If you need one of these features today, keep that workload on the existing per-engine entry point (dynamo.<backend>.main) until the unified path catches up.

What you’re building

A backend is two things:

  1. An engine class that subclasses LLMEngine — owns the model, accepts preprocessed token requests, streams output chunks.
  2. A main.py entry point — a three-line shim that hands the engine class to run() from dynamo.common.backend.run, which drives the lifecycle.

The dynamo.common.backend package handles everything else: signal handling, distributed runtime setup, model registration with discovery, the serving loop, graceful shutdown, cancellation monitoring, and error chain wrapping. (The lifecycle state machine actually lives in Rust; dynamo.common.backend.Worker is a thin Python shim over it.)

```
from_args  →  start()   →  generate() / abort()  →  drain()      →  cleanup()
    |            |                  |                   |                |
parse argv,  start engine,  serve requests        pre-cleanup      release
return       return         (concurrent)          drain            resources
engine       metadata
```

Prerequisites

  • Python 3.11 or newer. dynamo uses typing.Required, which is 3.11+.
  • NATS and etcd reachable for end-to-end runs. The dynamo repo’s deploy/docker-compose.yml brings up both in one command if you don’t already have them running (see the sketch after this list).
  • uv or pip for installing dependencies.
  • Familiarity with async Python (asyncio, async generators) and argparse.
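
If you use the repo’s compose file, bringing both up looks like this (paths assume you’re at the root of a dynamo checkout; adjust to your environment):

```bash
$ # deploy/docker-compose.yml defines the NATS and etcd services this
$ # guide expects.
$ docker compose -f deploy/docker-compose.yml up -d
```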

Step 1: Create the package

```
my-backend/
├── pyproject.toml
└── src/
    └── my_backend/
        ├── __init__.py
        ├── engine.py
        └── main.py
```

Minimal pyproject.toml:

```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "my-backend"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    # ai-dynamo bundles dynamo.common.backend. Pin to the release whose
    # LLMEngine contract you tested against — the surface is still beta
    # and may change between releases.
    "ai-dynamo>=1.2.0",
]

[project.optional-dependencies]
dev = ["pytest>=8", "pytest-asyncio>=0.23"]

[project.scripts]
my-backend = "my_backend.main:main"
```

For a bleeding-edge dependency on the dynamo source tree, install the runtime wheel from a clone:

```bash
$ git clone https://github.com/ai-dynamo/dynamo.git
$ pip install maturin
$ cd dynamo/lib/bindings/python && maturin build --release --out /tmp/wheels
$ pip install /tmp/wheels/*.whl   # ai-dynamo-runtime
$ pip install /path/to/dynamo     # ai-dynamo (components/ tree)
```

Building the wheel needs a Rust toolchain plus clang, cmake, protobuf-compiler, and libssl-dev.
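
On Debian/Ubuntu, installing those native build dependencies looks roughly like this (package names assume current Debian/Ubuntu repos; rustup is the standard Rust installer):

```bash
$ sudo apt-get install -y clang cmake protobuf-compiler libssl-dev
$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
```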

Step 2: Subclass LLMEngine

In src/my_backend/engine.py, declare a class that subclasses LLMEngine and owns whatever state your engine needs. Construction must be cheap and side-effect-free — heavy work goes in start().

```python
# src/my_backend/engine.py
from __future__ import annotations

import argparse
import asyncio
from collections.abc import AsyncGenerator

from dynamo._core import Context
from dynamo.common.backend import (
    EngineConfig,
    GenerateChunk,
    GenerateRequest,
    LLMEngine,
    WorkerConfig,
)


class MyBackend(LLMEngine):
    def __init__(self, model_name: str, max_tokens: int = 16):
        self.model_name = model_name
        self.max_tokens = max_tokens
        # Heavy state (engine handles, schedulers, KV allocators) is
        # left None here and initialized in start().
        self._engine = None
```

GenerateRequest and GenerateChunk are TypedDicts describing the shared shape — see Step 4 for the fields.
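
For orientation, here is an illustrative sketch of the two shapes as this guide uses them. It is not the real definition — the authoritative TypedDicts live in dynamo.common.backend, and the package README’s field reference is canonical:

```python
# Illustrative only. Field names are the ones used throughout this
# guide; see the package README for the authoritative definitions.
from typing import TypedDict


class GenerateRequest(TypedDict, total=False):
    token_ids: list[int]    # preprocessed prompt tokens
    stop_conditions: dict   # e.g. {"max_tokens": 32}


class GenerateChunk(TypedDict, total=False):
    token_ids: list[int]    # tokens emitted in this chunk
    index: int              # choice index; 0 for single choice
    finish_reason: str      # terminal chunk only
    completion_usage: dict  # terminal chunk only
```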

Step 3: Implement from_args

from_args is a classmethod factory that parses CLI args and returns (engine, WorkerConfig). The engine is constructed but not started.

```python
@classmethod
async def from_args(
    cls, argv: list[str] | None = None
) -> tuple[MyBackend, WorkerConfig]:
    parser = argparse.ArgumentParser(prog="my-backend")
    parser.add_argument("--model-name", default="my-model")
    parser.add_argument("--max-tokens", type=int, default=16)
    # Runtime / discovery flags — every unified backend needs these.
    parser.add_argument("--namespace", default="dynamo")
    parser.add_argument("--component", default="backend")
    parser.add_argument("--endpoint", default="generate")
    parser.add_argument("--endpoint-types", default="chat,completions")
    parser.add_argument("--discovery-backend", default="etcd")
    parser.add_argument("--request-plane", default="tcp")
    parser.add_argument("--event-plane", default=None)
    args = parser.parse_args(argv)

    engine = cls(model_name=args.model_name, max_tokens=args.max_tokens)
    worker_config = WorkerConfig(
        namespace=args.namespace,
        component=args.component,
        endpoint=args.endpoint,
        model_name=args.model_name,
        served_model_name=args.model_name,
        endpoint_types=args.endpoint_types,
        discovery_backend=args.discovery_backend,
        request_plane=args.request_plane,
        event_plane=args.event_plane,
    )
    return engine, worker_config
```

from_args is async to match the ABC; you can await from it if your CLI parsing reads config from a file or hits an API. Most backends don’t need to.

For backends that already have a DynamoRuntimeConfig-shaped config object (e.g. ones derived from vLLM’s, SGLang’s, or TRT-LLM’s existing config), prefer the WorkerConfig.from_runtime_config(runtime_cfg, model_name=...) helper — it pulls the shared discovery / request-plane / parser fields off the config in one line.
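
A sketch of that path, assuming runtime_cfg is such a config object that your engine’s own parser already produced:

```python
# Hedged sketch: runtime_cfg is assumed to be a DynamoRuntimeConfig-shaped
# object built by your engine's existing argument parsing.
engine = cls(model_name=args.model_name, max_tokens=args.max_tokens)
worker_config = WorkerConfig.from_runtime_config(
    runtime_cfg, model_name=args.model_name
)
return engine, worker_config
```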

Step 4: Implement LLMEngine methods

The ABC has three required methods (start, generate, cleanup) plus two with default no-op implementations (abort, drain).

start()

Start the engine and return EngineConfig metadata. After this returns, generate() MUST be ready for concurrent calls.

```python
async def start(self, worker_id: int) -> EngineConfig:
    # ... load weights, build scheduler, warm up CUDA, etc.
    # Heavy: may take minutes. Emit logger.info checkpoints so
    # operators see progress (Worker logs around start() but not
    # inside it).
    self._engine = await heavy_init(self.model_name)

    return EngineConfig(
        model=self.model_name,
        served_model_name=self.model_name,
        context_length=8192,
        kv_cache_block_size=16,  # None if no block-structured KV
        total_kv_blocks=1024,
        max_num_seqs=64,
        max_num_batched_tokens=8192,
    )
```

worker_id is an opaque per-worker identifier; most engines ignore it. Backends that need a stable cluster-wide key (e.g. TRT-LLM’s disagg_machine_id snowflake) should derive that key from worker_id instead of hashing host/pid or asking operators for a CLI override.

Every EngineConfig field except model is optional. None means “don’t advertise”; KV-aware routing falls back to round-robin when KV fields are unset.
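
The floor is therefore a single field. A config like this is valid; it just opts the worker out of KV-aware routing:

```python
# Minimal valid metadata: only `model` is required. With KV fields
# unset, the router falls back to round-robin for this worker.
return EngineConfig(model=self.model_name)
```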

generate()

An async generator that yields GenerateChunk dicts for a single request. Called concurrently for multiple in-flight requests.

Contract (chunk shape is defined by the GenerateChunk TypedDict — see Request / Response Types in the package README for the field reference):

  • Every chunk carries token_ids and index (use 0 for single choice).
  • The final chunk additionally carries finish_reason and completion_usage.
  • The framework’s cancellation monitor calls engine.abort(context) when the client disconnects or cancels; your loop should also poll context.is_stopped() between yields and exit cleanly with a finish_reason="cancelled" chunk.

```python
async def generate(
    self, request: GenerateRequest, context: Context
) -> AsyncGenerator[GenerateChunk, None]:
    prompt_tokens = list(request.get("token_ids", []))
    prompt_len = len(prompt_tokens)

    stop_conditions = request.get("stop_conditions") or {}
    max_new = stop_conditions.get("max_tokens") or self.max_tokens

    def _usage(completion_tokens: int) -> dict[str, int]:
        return {
            "prompt_tokens": prompt_len,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_len + completion_tokens,
        }

    for i in range(max_new):
        if context.is_stopped():
            yield {
                "token_ids": [],
                "index": 0,
                "finish_reason": "cancelled",
                "completion_usage": _usage(i),
            }
            return

        token_id = await self._next_token(prompt_tokens)

        chunk: GenerateChunk = {"token_ids": [token_id], "index": 0}
        if i == max_new - 1:
            chunk["finish_reason"] = "length"
            chunk["completion_usage"] = _usage(max_new)
        yield chunk
```

Finish reason normalization ("abort" → "cancelled", etc.) is handled by the Rust layer — emit whatever your engine uses natively.

abort(context) — optional

Called by the framework only when the client disconnects or the request is cancelled. NOT called on silent stream drops. Override to release engine-side resources (KV slots, scheduler entries, remote schedulers):

```python
async def abort(self, context: Context) -> None:
    request_id = context.id()
    await self._engine.cancel(request_id)
```

For cleanup that must run on every drop path — including silent drops — use a try/finally or a context manager inside generate, not abort. The sample engine doesn’t override abort because it has no engine-side state to release; the default is a no-op.
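
A sketch of that pattern — acquire_slot, release_slot, and _stream are hypothetical engine-side helpers standing in for whatever your engine allocates per request:

```python
async def generate(self, request, context):
    # Hypothetical per-request resource.
    slot = await self._engine.acquire_slot()
    try:
        async for chunk in self._stream(request, slot, context):
            yield chunk
    finally:
        # Runs on normal completion, cancellation, errors, and silent
        # stream drops (GeneratorExit) alike.
        await self._engine.release_slot(slot)
```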

drain() — optional

Runs once before shutdown, after the discovery unregister + grace-period sleep, while NATS/etcd are still alive. Use it for backend-side draining that must complete before transport teardown (e.g. in-flight NIXL KV transfers on prefill workers). Default is no-op.
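
For example — wait_for_pending_transfers is a hypothetical engine API standing in for whatever drain hook your backend exposes:

```python
async def drain(self) -> None:
    # Transport (NATS/etcd) is still up here, so cross-worker KV
    # transfers can finish before teardown begins.
    if self._engine is not None:
        await self._engine.wait_for_pending_transfers()
```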

cleanup()

Two real requirements, both pinned by the Rust-side conformance kit:

  • Null-safe against partial start() failure. If start() raises partway through, fields you allocate incrementally may still be None. cleanup() must guard each resource (if self._engine is not None: …) so the post-failure call doesn’t crash on half-initialized state.
  • Idempotent. A second call after a successful first must return cleanly without re-entering teardown.

The Rust Worker drives both: it calls cleanup() after start() returns Ok on shutdown, and the conformance kit (run_conformance) additionally calls cleanup() on a never-started engine and twice in a row, failing your tests with CleanupWithoutStartFailed / SecondCleanupFailed if either invariant breaks. The guarded single-shot pattern below covers both:

```python
async def cleanup(self) -> None:
    if self._engine is not None:
        await self._engine.shutdown()
        self._engine = None
```

Step 5: Write main.py

Three lines.

```python
# src/my_backend/main.py
from dynamo.common.backend.run import run

from .engine import MyBackend


def main() -> None:
    run(MyBackend)


if __name__ == "__main__":
    main()
```

run installs signal handlers, builds the distributed runtime, calls engine.start(worker_id) with a runtime-allocated identifier, registers the model with discovery, serves the endpoint, and runs the graceful-shutdown orchestrator on SIGTERM/SIGINT.

Pair this with the [project.scripts] entry from Step 1’s pyproject.toml so my-backend ... works as a console command.

Step 6: Errors and logging

Errors: the framework wraps non-DynamoException errors raised from generate() (or lifecycle methods) as Unknown. For typed error reporting, raise a DynamoException subclass directly from dynamo.llm.exceptions — it propagates unchanged through the Rust bridge:

```python
from dynamo.llm.exceptions import InvalidArgument


async def generate(self, request, context):
    if not request.get("token_ids"):
        raise InvalidArgument("empty prompt")
    ...
```

The package README has the full table of exception types and which lifecycle phase raises which one. Engine-init failures should raise EngineShutdown from start(). Cleanup shouldn’t normally raise — log and swallow if a subsystem fails.
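
A sketch of surfacing a typed init failure, reusing heavy_init from Step 4 and assuming EngineShutdown is exported from the same dynamo.llm.exceptions module as InvalidArgument:

```python
from dynamo.llm.exceptions import EngineShutdown


async def start(self, worker_id: int) -> EngineConfig:
    try:
        self._engine = await heavy_init(self.model_name)
    except Exception as exc:
        # Typed so the framework reports an engine-init failure rather
        # than wrapping it as a generic Unknown.
        raise EngineShutdown(f"engine init failed: {exc}") from exc
    ...
```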

Logging: keep levels consistent across unified backends so operators see the same surface regardless of which engine they’re running:

  • logger.info — lifecycle milestones (engine init complete, serving started, engine shutdown).
  • logger.debug — per-request events (request abort, cancellation).
  • logger.warning — recoverable problems (empty outputs, unexpected finish reasons).
  • logger.error — unrecoverable failures only.

The framework also configures dynamo.runtime.logging for you; you just call logger = logging.getLogger(__name__) at the top of your module and use it.
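
For instance:

```python
import logging

logger = logging.getLogger(__name__)  # framework configures handlers/levels

logger.info("engine init complete: model=%s", self.model_name)  # lifecycle
logger.debug("request %s cancelled", context.id())              # per-request
```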

Step 7: Test your engine

Install the dev extras (pytest, pytest-asyncio) declared in Step 1:

```bash
$ pip install -e ".[dev]"
```

The sample engine has a unit-test suite that you can copy as a starting point. The shape of a useful test:

```python
import pytest

from my_backend import MyBackend


class _StubContext:
    def __init__(self, stopped: bool = False) -> None:
        self._stopped = stopped

    def is_stopped(self) -> bool:
        return self._stopped

    def stop(self) -> None:
        self._stopped = True


@pytest.mark.asyncio
async def test_generate_emits_terminal_chunk():
    engine = MyBackend(model_name="m", max_tokens=3)
    await engine.start(worker_id=0)
    try:
        chunks = [
            chunk
            async for chunk in engine.generate(
                {"token_ids": [1, 2, 3]}, _StubContext()
            )
        ]
        assert chunks[-1]["finish_reason"] in ("stop", "length")
        assert chunks[-1]["completion_usage"]["completion_tokens"] == 3
    finally:
        await engine.cleanup()


@pytest.mark.asyncio
async def test_generate_observes_cancellation():
    engine = MyBackend(model_name="m", max_tokens=1000)
    await engine.start(worker_id=0)
    try:
        ctx = _StubContext()
        collected = []
        async for chunk in engine.generate({"token_ids": [1]}, ctx):
            collected.append(chunk)
            if len(collected) >= 2:
                ctx.stop()
        assert collected[-1]["finish_reason"] == "cancelled"
    finally:
        await engine.cleanup()
```

Cover the happy path, cancellation, and any backend-specific edge cases (stop tokens, max-tokens cap, empty prompt). Three to five focused tests is plenty — the framework already pins the lifecycle state machine and cancellation contract with Rust-side tests in lib/backend-common.

Step 8: Run it locally

Three moving parts need to come up: NATS + etcd (discovery and the event/request planes), the Dynamo frontend (HTTP → backend discovery), and your backend.

```bash
$ pip install -e .

$ # Ensure NATS + etcd are reachable (NATS_SERVER, ETCD_ENDPOINTS).
$ # --model-name must be a valid HuggingFace repo (or local path); the
$ # framework fetches the tokenizer + chat template from it on startup.
$ # Pick a small public repo for smoke tests.
$ my-backend --model-name Qwen/Qwen3-0.6B --namespace dynamo

$ # In another shell, start the Dynamo frontend:
$ python -m dynamo.frontend --http-port 8000
```

Then send a request:

```bash
$ curl http://localhost:8000/v1/chat/completions \
>   -H 'Content-Type: application/json' \
>   -d '{
>     "model": "Qwen/Qwen3-0.6B",
>     "messages": [{"role": "user", "content": "hello"}],
>     "max_tokens": 32
>   }'
```

A successful response has non-empty choices[0].message.content and a finish_reason of stop or length. jq -e '.choices[0].finish_reason' is a good one-liner for a CI smoke test.
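
As a self-contained CI sketch built on that one-liner:

```bash
$ # One-shot smoke test: exits non-zero unless a finish_reason is present.
$ curl -sf http://localhost:8000/v1/chat/completions \
>   -H 'Content-Type: application/json' \
>   -d '{"model": "Qwen/Qwen3-0.6B", "messages": [{"role": "user", "content": "hi"}], "max_tokens": 8}' \
>   | jq -e '.choices[0].finish_reason'
```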

If your backend looks silent, set DYN_LOG=info (or DYN_LOG=debug,dynamo=debug for finer scoping) before launching — the framework configures tracing from DYN_LOG.

Reference: the sample engine

sample_engine.py is the canonical minimal reference. Run it as-is:

```bash
$ python -m dynamo.common.backend.sample_main --model-name test-model
```

It generates rotating token IDs with no ML dependencies, so it’s a useful stand-in for AIPerf / end-to-end pipeline smoke tests. Lift these patterns:

  • from_args parses CLI args and returns (engine, WorkerConfig) with no awaits.
  • start() returns an EngineConfig whose KV fields are illustrative but not load-bearing (no real KV cache).
  • generate() polls context.is_stopped() between yields and emits a cancelled terminal on observation.
  • cleanup() is a no-op because the engine holds no resources.

Checklist

Before shipping:

  • LLMEngine subclassed; from_args returns (engine, WorkerConfig).
  • start() returns EngineConfig with at least a non-empty model.
  • generate() polls context.is_stopped() between yields and emits a "cancelled" terminal on observation.
  • Final chunk has finish_reason and completion_usage.
  • Typed DynamoException subclasses used for error reporting where the category matters.
  • cleanup() releases all engine resources.
  • Logging levels match the standards in Step 6.

See also