core.inference.apis._llm_base#

Internal building blocks for the Megatron inference high-level API.

This module hosts private helpers shared by MegatronLLM and MegatronAsyncLLM: _EventLoopManager, _CoordinatorRuntime, and _MegatronLLMBase. The public sync/async wrappers live on the subclasses; this base only exposes shared engine state, runtime spawn, validation helpers, and the private _<method>_impl coroutines.

Module Contents#

Classes#

_EventLoopManager

Per-instance background daemon thread + persistent asyncio event loop.

_CoordinatorRuntime

Owns the dynamic-inference coordinator and InferenceClient lifecycle.

_MegatronLLMBase

Private base shared by MegatronLLM and MegatronAsyncLLM.

API#

class core.inference.apis._llm_base._EventLoopManager#

Per-instance background daemon thread + persistent asyncio event loop.

Bridges sync and async user-thread callers to coroutines that run on the background loop via asyncio.run_coroutine_threadsafe.
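The pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the module's implementation: `BackgroundLoop` and `add` are hypothetical names, and only `asyncio.run_coroutine_threadsafe` is taken from the description.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical stand-in for the pattern; not the real _EventLoopManager."""

    def __init__(self):
        self._loop = None
        self._thread = None

    def start(self):
        if self._thread is not None:  # idempotent: a second call does nothing
            return
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def submit(self, coro):
        # Thread-safe hand-off; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self):
        if self._thread is None:  # idempotent: nothing to stop
            return
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
        self._loop = None
        self._thread = None


async def add(a, b):
    await asyncio.sleep(0)
    return a + b


bg = BackgroundLoop()
bg.start()
bg.start()  # no-op
result = bg.submit(add(2, 3)).result(timeout=5)
bg.stop()
bg.stop()  # no-op
```

The loop is created on the calling thread but only ever run on the daemon thread, so callers interact with it solely through thread-safe entry points.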

Initialization

start() None#

Spawn the daemon thread and start the event loop. Idempotent.

property loop: asyncio.AbstractEventLoop#

The background asyncio loop. Raises if start() has not been called.

submit(coro: Coroutine) concurrent.futures.Future#

Schedule coro on the background loop and return its future.

The caller decides how to wait on the returned future (e.g. .result() for blocking sync, asyncio.wrap_future(...) for awaiting from another loop).
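The two waiting styles mentioned above can be sketched as follows. The helper `start_background_loop` and the coroutine `double` are hypothetical; the example uses plain asyncio rather than the private manager class.

```python
import asyncio
import threading


def start_background_loop():
    """Hypothetical helper: run a fresh event loop on a daemon thread."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


async def double(x):
    await asyncio.sleep(0)
    return 2 * x


background = start_background_loop()

# Style 1: block the calling thread until the result is ready.
fut = asyncio.run_coroutine_threadsafe(double(21), background)
sync_result = fut.result(timeout=5)


# Style 2: await from a different event loop without blocking that loop.
async def caller():
    fut = asyncio.run_coroutine_threadsafe(double(4), background)
    return await asyncio.wrap_future(fut)


async_result = asyncio.run(caller())
background.call_soon_threadsafe(background.stop)
```

Style 2 keeps the caller's loop responsive, because `asyncio.wrap_future` turns the thread-level future into one the caller's loop can await.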

run_sync(coro: Coroutine)#

Schedule coro on the background loop and block on its result.

Must not be called from a coroutine running on self._loop itself – that would deadlock, since the only loop that could dispatch coro would be the one already blocked waiting for the caller. Calling from a different loop (e.g., the user’s main-thread asyncio loop) is allowed: coro runs on the background loop while the caller’s loop is stalled until .result() returns.
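One way to make the deadlock case fail fast is to compare the running loop with the target loop before blocking. Whether the real helper performs such a check is not stated here; the guard below is an assumption, and `run_sync`, `answer`, and `bad_caller` are hypothetical free functions.

```python
import asyncio
import threading


def run_sync(loop, coro, timeout=None):
    """Block on coro scheduled on `loop`; refuse if already running on `loop`."""
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None  # plain synchronous caller
    if running is loop:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise RuntimeError("run_sync called from the background loop; would deadlock")
    return asyncio.run_coroutine_threadsafe(coro, loop).result(timeout)


async def answer():
    return 42


async def bad_caller(loop):
    # Runs on the background loop and tries to block on that same loop.
    try:
        run_sync(loop, answer())
    except RuntimeError as exc:
        return str(exc)


loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

ok = run_sync(loop, answer(), timeout=5)  # called from the main thread: fine
err = asyncio.run_coroutine_threadsafe(bad_caller(loop), loop).result(timeout=5)
loop.call_soon_threadsafe(loop.stop)
```

Without the guard, `bad_caller` would block the only thread able to run `answer()`, so the call would never return.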

async run_async(coro: Coroutine)#

Schedule coro on the background loop and await it from any loop.

stop() None#

Stop the event loop and join the background thread. Idempotent.

class core.inference.apis._llm_base._CoordinatorRuntime(
engine: megatron.core.inference.engines.dynamic_engine.DynamicInferenceEngine,
*,
is_primary: bool,
coordinator_host: Optional[str],
coordinator_port: Optional[int],
)#

Owns the dynamic-inference coordinator and InferenceClient lifecycle.

Async-native: setup and teardown are coroutines meant to run on a background loop owned by _EventLoopManager. The primary rank additionally holds an InferenceClient used by the high-level API to submit requests and send control signals.

Initialization

async setup(*, loop: asyncio.AbstractEventLoop) None#

Bring the coordinator and (on primary) the InferenceClient up.

Calls engine.start_listening_to_data_parallel_coordinator(loop=loop) on every rank. Only host/port kwargs that the caller actually supplied are forwarded so the engine can auto-bind when both are None.
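Forwarding only the supplied arguments could look like the sketch below. The function `build_listen_kwargs` and the keyword names `host` and `port` are placeholders chosen for illustration; they are not confirmed to be the engine's real parameter names.

```python
from typing import Any, Dict, Optional


def build_listen_kwargs(
    loop: Any, host: Optional[str], port: Optional[int]
) -> Dict[str, Any]:
    """Collect kwargs, omitting host/port when the caller passed None."""
    kwargs: Dict[str, Any] = {"loop": loop}
    if host is not None:
        kwargs["host"] = host  # placeholder keyword name
    if port is not None:
        kwargs["port"] = port  # placeholder keyword name
    return kwargs


auto_bind = build_listen_kwargs("loop", None, None)
explicit = build_listen_kwargs("loop", "127.0.0.1", 5555)
```

Omitting a key, rather than passing `None` explicitly, lets the callee's own defaults apply.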

async teardown() None#

Idempotent best-effort shutdown of the coordinator + client.

Safe to call from partial-setup state (e.g., when setup raised after the coordinator subprocess spawned but before the client opened). Worker ranks are always no-op; their engine_loop_task is awaited by _MegatronLLMBase._shutdown_impl after the primary has issued the STOP signal.
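A teardown that tolerates partial setup and repeated calls can be sketched as below. `RuntimeSketch` and `FakeHandle` are hypothetical stand-ins; the real coordinator and client handles and their shutdown calls are not shown in this reference.

```python
import asyncio


class FakeHandle:
    """Hypothetical resource that counts how often it is closed."""

    def __init__(self):
        self.close_calls = 0

    def close(self):
        self.close_calls += 1


class RuntimeSketch:
    def __init__(self, *, is_primary):
        self.is_primary = is_primary
        self.coordinator = None  # set once the coordinator has spawned
        self.client = None       # set once the client has opened

    async def teardown(self):
        if not self.is_primary:
            return  # worker ranks: nothing to release here
        # Detach each handle before closing it so a repeat call sees None.
        for name in ("client", "coordinator"):
            handle = getattr(self, name)
            setattr(self, name, None)
            if handle is not None:
                try:
                    handle.close()
                except Exception:
                    pass  # best effort: keep releasing the remaining handles


async def demo():
    runtime = RuntimeSketch(is_primary=True)
    coordinator = FakeHandle()
    runtime.coordinator = coordinator  # partial setup: client never opened
    await runtime.teardown()
    await runtime.teardown()  # second call is a no-op
    return coordinator.close_calls, runtime.coordinator


close_calls, leftover = asyncio.run(demo())
```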

property client: InferenceClient | None#

The InferenceClient on the primary rank; None on workers.

property coord_addr: Optional[str]#

Address returned by start_listening_to_data_parallel_coordinator.

class core.inference.apis._llm_base._MegatronLLMBase(
*,
model,
tokenizer,
inference_config: Optional[megatron.core.inference.config.InferenceConfig] = None,
use_coordinator: bool = False,
coordinator_host: Optional[str] = None,
coordinator_port: Optional[int] = None,
)#

Private base shared by MegatronLLM and MegatronAsyncLLM.

This base intentionally exposes no public generate / lifecycle methods – those live on the subclasses, which call into the private _<method>_impl coroutines defined here. The base owns:

  • the engine pipeline (engine, context, controller),

  • the per-instance background runtime (_loop_manager, _coord_runtime) when use_coordinator=True,

  • validation helpers (_assert_primary, _assert_coordinator) and the input shape helper (_normalize_prompts).

Two execution modes are supported:

  • Direct mode (use_coordinator=False): every rank is treated as primary and generate runs the engine synchronously (offloaded to a thread when called from an event loop). Lifecycle methods are invalid and raise RuntimeError via _assert_coordinator.

  • Coordinator mode (use_coordinator=True): a background event loop hosts the engine pipeline and an InferenceClient (on global rank 0). Only the primary rank may submit requests via generate.
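The rank and mode rules above can be condensed into a small sketch. `ModeGuards` is a hypothetical class; the `RuntimeError` from `_assert_coordinator` follows the description, while the exception type raised by `_assert_primary` and both messages are assumptions.

```python
class ModeGuards:
    """Hypothetical illustration of the two validation helpers."""

    def __init__(self, *, use_coordinator: bool, global_rank: int):
        self.use_coordinator = use_coordinator
        # Direct mode: every rank is primary. Coordinator mode: rank 0 only.
        self.is_primary_rank = (not use_coordinator) or global_rank == 0

    def _assert_coordinator(self) -> None:
        if not self.use_coordinator:
            raise RuntimeError("lifecycle methods require use_coordinator=True")

    def _assert_primary(self) -> None:
        if not self.is_primary_rank:
            # Assumed exception type; not specified in this reference.
            raise RuntimeError("generate may only be called on the primary rank")


direct = ModeGuards(use_coordinator=False, global_rank=3)
worker = ModeGuards(use_coordinator=True, global_rank=3)


def raises_runtime_error(fn) -> bool:
    try:
        fn()
    except RuntimeError:
        return True
    return False
```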

model must be in eval mode before construction; this class does not modify the model state.

Initialization

property is_primary_rank: bool#

Whether generate may be called on this rank.

property engine: megatron.core.inference.engines.dynamic_engine.DynamicInferenceEngine#

The underlying DynamicInferenceEngine.

property context: megatron.core.inference.contexts.dynamic_context.DynamicInferenceContext#

The underlying DynamicInferenceContext.

property controller: megatron.core.inference.text_generation_controllers.text_generation_controller.TextGenerationController#

The underlying TextGenerationController.

_assert_primary() None#
_assert_coordinator() None#
_normalize_prompts(
prompts: Union[str, List[int], List[str], List[List[int]]],
) Tuple[Union[List[str], List[List[int]]], bool]#

Return (normalized_list, is_batch_input).

  • "abc" -> (["abc"], False)

  • [1, 2, 3] -> ([[1, 2, 3]], False) (single token-id prompt)

  • ["abc", "def"] -> (["abc", "def"], True)

  • [[1, 2], [3, 4]] -> ([[1, 2], [3, 4]], True)

  • [] -> ([], True)

Only the first element is inspected to distinguish single vs batch; per-element type validation is left to the engine.
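A function that reproduces the five documented cases might look like the following. This is a reimplementation written from the examples above, named `normalize_prompts` here, and it may differ from the real helper in details such as error handling.

```python
from typing import List, Tuple, Union

Prompts = Union[str, List[int], List[str], List[List[int]]]
Normalized = Union[List[str], List[List[int]]]


def normalize_prompts(prompts: Prompts) -> Tuple[Normalized, bool]:
    """Return (normalized_list, is_batch_input), inspecting only element 0."""
    if isinstance(prompts, str):
        return [prompts], False  # single text prompt
    if len(prompts) == 0:
        return [], True          # empty input counts as a batch
    if isinstance(prompts[0], int):
        return [prompts], False  # single token-id prompt
    return prompts, True         # list of strings or list of token-id lists


single_text = normalize_prompts("abc")
single_ids = normalize_prompts([1, 2, 3])
batch_text = normalize_prompts(["abc", "def"])
batch_ids = normalize_prompts([[1, 2], [3, 4]])
empty = normalize_prompts([])
```

The `is_batch_input` flag lets a caller unwrap a single-prompt result so that the return shape mirrors the input shape.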

async _generate_impl(
prompts: Union[List[str], List[List[int]]],
sp: megatron.core.inference.sampling_params.SamplingParams,
) List[megatron.core.inference.inference_request.DynamicInferenceRequest]#

Run inference for a non-empty list of prompts; returns input-ordered list.

  • Coordinator mode: must run on the runtime loop (via _loop_manager.run_async); enqueues requests through client.add_request and gathers all futures.

  • Direct mode: runs on the caller’s event loop; offloads the synchronous engine.generate to a thread.
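Both paths can be illustrated with stand-ins. In this sketch `FakeClient`, its `delay` parameter, and `blocking_generate` are hypothetical, and `asyncio.to_thread` (Python 3.9+) is an assumed offloading mechanism; the reference only says the synchronous call is offloaded to a thread.

```python
import asyncio
import threading
import time


class FakeClient:
    """Hypothetical client: add_request returns a future resolved later."""

    def add_request(self, prompt, delay):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        loop.call_later(delay, fut.set_result, f"done:{prompt}")
        return fut


async def generate_via_client(client, prompts):
    # Later prompts finish first here, yet gather returns input order.
    n = len(prompts)
    futures = [
        client.add_request(p, delay=0.01 * (n - i)) for i, p in enumerate(prompts)
    ]
    return await asyncio.gather(*futures)


def blocking_generate(prompts):
    """Hypothetical stand-in for a synchronous engine call."""
    time.sleep(0.01)
    return [p.upper() for p in prompts], threading.get_ident()


async def generate_direct(prompts):
    # Run the blocking call on a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_generate, prompts)


caller_thread = threading.get_ident()
ordered = asyncio.run(generate_via_client(FakeClient(), ["x", "y", "z"]))
outputs, worker_thread = asyncio.run(generate_direct(["a", "b"]))
```

`asyncio.gather` returns results in the order its awaitables were passed, which is what keeps the output aligned with the input prompts regardless of completion order.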

async _pause_impl() None#
async _unpause_impl() None#
async _suspend_impl() None#
async _resume_impl() None#
async _shutdown_impl() None#
async _wait_for_shutdown_impl() None#