nat.middleware.function_middleware

Function-specific middleware for the NeMo Agent toolkit.

This module provides middleware implementations that extend the base Middleware class. FunctionMiddleware is a middleware type for wrapping function calls, with dedicated methods for preprocessing inputs and postprocessing outputs.

Middleware is configured at registration time and is bound to instances when they are constructed by the workflow builder.

Middleware executes in the order provided and can optionally be marked as final. A final middleware terminates the chain, preventing subsequent middleware or the wrapped target from running unless the final middleware explicitly delegates to the next callable.
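As a rough illustration of that last point, the sketch below uses plain async functions rather than the toolkit's classes. The names `canned_reply_middleware` and `target` are hypothetical, and the `(value, call_next)` calling convention is a simplification of the real signature. It shows a chain-terminating middleware that answers on its own for some inputs and explicitly delegates for others.

```python
import asyncio
from collections.abc import Awaitable, Callable

CallNext = Callable[[str], Awaitable[str]]  # simplified stand-in, not the toolkit type


async def target(value: str) -> str:
    """The wrapped function that the chain would normally reach."""
    return f"computed:{value}"


async def canned_reply_middleware(value: str, call_next: CallNext) -> str:
    """Hypothetical final middleware: short-circuits on empty input."""
    if not value:
        # Terminates the chain: call_next (and therefore target) never runs.
        return "empty input"
    # Explicit delegation: the rest of the chain runs as usual.
    return await call_next(value)


def run(value: str) -> str:
    """Drive the sketch from synchronous code."""
    return asyncio.run(canned_reply_middleware(value, target))
```

Calling `run("")` never reaches `target`, while `run("x")` delegates to it.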

Classes

FunctionMiddleware

Base class for function middleware with pre/post-invoke hooks.

FunctionMiddlewareChain

Composes middleware into an execution chain.

Functions

validate_middleware(...)

Validate a sequence of middleware, enforcing ordering guarantees.

Module Contents

class FunctionMiddleware(*, is_final: bool = False)

Bases: nat.middleware.middleware.Middleware

Base class for function middleware with pre/post-invoke hooks.

Middleware intercepts function calls and can:

- Transform inputs before execution (pre_invoke)
- Transform outputs after execution (post_invoke)
- Override function_middleware_invoke for full control

Lifecycle:

- Framework checks enabled property before calling any methods
- If disabled, middleware is skipped entirely (no methods called)
- Users do NOT need to check enabled in their implementations

Inherited abstract members that must be implemented:

- enabled: Property that returns whether middleware should run
- pre_invoke: Transform inputs before function execution
- post_invoke: Transform outputs after function execution

Context Flow:

- FunctionMiddlewareContext (frozen): Static function metadata only
- InvocationContext: Unified context for both pre and post invoke phases
    - Pre-invoke: output is None, modify modified_args/modified_kwargs
    - Post-invoke: output has the result, modify output to transform
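The context flow above can be sketched with stand-in types. `SketchContext`, `StripWhitespace`, and `run_once` are hypothetical names used only for illustration; the field names (`original_args`, `modified_args`, `modified_kwargs`, `output`) follow the description above, but the real InvocationContext may differ in construction and mutability. The wrapped function is replaced by a simple `upper()` call.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchContext:
    """Stand-in for InvocationContext; not the toolkit's class."""

    original_args: tuple[Any, ...]
    modified_args: tuple[Any, ...]
    modified_kwargs: dict[str, Any] = field(default_factory=dict)
    output: Any = None  # None during pre-invoke, set before post-invoke


class StripWhitespace:
    """Hypothetical middleware: trims string inputs and brackets the output."""

    async def pre_invoke(self, context: SketchContext) -> SketchContext | None:
        context.modified_args = tuple(
            a.strip() if isinstance(a, str) else a for a in context.modified_args
        )
        return context  # returned because the args were changed

    async def post_invoke(self, context: SketchContext) -> SketchContext | None:
        if context.output is None:
            return None  # nothing to transform; pass through unchanged
        context.output = f"[{context.output}]"
        return context


async def run_once(mw: StripWhitespace, *args: Any) -> SketchContext:
    """Drive one call by hand: pre_invoke, the function, then post_invoke."""
    ctx = SketchContext(original_args=args, modified_args=args)
    updated = await mw.pre_invoke(ctx)
    if updated is not None:
        ctx = updated
    ctx.output = ctx.modified_args[0].upper()  # stand-in for the wrapped function
    updated = await mw.post_invoke(ctx)
    if updated is not None:
        ctx = updated
    return ctx
```

After `asyncio.run(run_once(StripWhitespace(), "  hi  "))`, the returned context still holds the untouched input in `original_args`, while `modified_args` and `output` reflect the two hooks.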

Example:

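import logging

from nat.middleware.function_middleware import FunctionMiddleware
from nat.middleware.middleware import InvocationContext

logger = logging.getLogger(__name__)


# LoggingConfig is a user-defined config object with an `enabled` flag;
# define or import it before this class.
class LoggingMiddleware(FunctionMiddleware):
    def __init__(self, config: LoggingConfig):
        super().__init__()
        self._config = config

    @property
    def enabled(self) -> bool:
        return self._config.enabled

    async def pre_invoke(self, context: InvocationContext) -> InvocationContext | None:
        # Lazy %-style arguments avoid formatting when the log level is disabled.
        logger.info("Calling %s with %s", context.function_context.name, context.modified_args)
        logger.info("Original args: %s", context.original_args)
        return None  # Pass through unchanged

    async def post_invoke(self, context: InvocationContext) -> InvocationContext | None:
        logger.info("Result: %s", context.output)
        return None  # Pass through unchanged

property enabled: bool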

Check if this middleware is enabled.

Returns:

True if the middleware should be applied, False otherwise. Default implementation always returns True.

async pre_invoke(
    context: nat.middleware.middleware.InvocationContext,
) → nat.middleware.middleware.InvocationContext | None

Pre-invocation hook called before the function is invoked.

Args:

context: Invocation context containing function metadata and args

Returns:

InvocationContext if modified, or None to pass through unchanged. Default implementation does nothing.

async post_invoke(
    context: nat.middleware.middleware.InvocationContext,
) → nat.middleware.middleware.InvocationContext | None

Post-invocation hook called after the function returns.

Args:

context: Invocation context containing function metadata, args, and output

Returns:

InvocationContext if modified, or None to pass through unchanged. Default implementation does nothing.

async middleware_invoke(
    *args: Any,
    call_next: nat.middleware.middleware.CallNext,
    context: nat.middleware.middleware.FunctionMiddlewareContext,
    **kwargs: Any,
) → Any

Delegate to function_middleware_invoke for function-specific handling.

async middleware_stream(
    *args: Any,
    call_next: nat.middleware.middleware.CallNextStream,
    context: nat.middleware.middleware.FunctionMiddlewareContext,
    **kwargs: Any,
) → collections.abc.AsyncIterator[Any]

Delegate to function_middleware_stream for function-specific handling.

async function_middleware_invoke(
    *args: Any,
    call_next: nat.middleware.middleware.CallNext,
    context: nat.middleware.middleware.FunctionMiddlewareContext,
    **kwargs: Any,
) → Any

Execute middleware hooks around function call.

Default implementation orchestrates: pre_invoke → call_next → post_invoke

Override for full control over execution flow (e.g., caching, retry logic, conditional execution).
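A minimal sketch of the caching case, written as a standalone class so it runs without the toolkit installed. `CachingSketch` and `slow_square` are hypothetical; only the method name and keyword-only parameters mirror the signature shown above. It assumes `call_next` accepts the same positional and keyword arguments as the wrapped function and that all arguments are hashable.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

CallNext = Callable[..., Awaitable[Any]]  # simplified stand-in, not the toolkit type


class CachingSketch:
    """Hypothetical stand-in mirroring function_middleware_invoke's signature."""

    def __init__(self) -> None:
        self._cache: dict[tuple[Any, ...], Any] = {}

    async def function_middleware_invoke(
        self, *args: Any, call_next: CallNext, context: Any, **kwargs: Any
    ) -> Any:
        # Assumes hashable arguments; real inputs may need a custom cache key.
        key = (args, tuple(sorted(kwargs.items())))
        if key in self._cache:
            return self._cache[key]  # call_next is skipped on a cache hit
        result = await call_next(*args, **kwargs)
        self._cache[key] = result
        return result


def run_demo() -> tuple[list[Any], int]:
    """Call the same input twice and count how often the target actually runs."""
    calls = 0

    async def slow_square(x: int) -> int:
        nonlocal calls
        calls += 1
        return x * x

    async def main() -> list[Any]:
        mw = CachingSketch()
        return [
            await mw.function_middleware_invoke(3, call_next=slow_square, context=None)
            for _ in range(2)
        ]

    return asyncio.run(main()), calls
```

In a real subclass of FunctionMiddleware the same override would replace the default pre_invoke → call_next → post_invoke flow, so any hook behavior that is still wanted has to be invoked explicitly.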

Note: Framework checks enabled before calling this method. You do NOT need to check enabled yourself.

Args:

- args: Positional arguments for the function (first arg is typically the input value).
- call_next: Callable to invoke next middleware or target function.
- context: Static function metadata.
- kwargs: Keyword arguments for the function.

Returns:

The (potentially transformed) function output.

async function_middleware_stream(
    *args: Any,
    call_next: nat.middleware.middleware.CallNextStream,
    context: nat.middleware.middleware.FunctionMiddlewareContext,
    **kwargs: Any,
) → collections.abc.AsyncIterator[Any]

Execute middleware hooks around streaming function call.

Pre-invoke runs once before streaming starts. Post-invoke runs once per chunk as chunks stream through.
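The timing of the two hooks can be sketched with a plain async generator. This is an assumption-laden illustration, not the toolkit's implementation: `number_stream` stands in for the wrapped streaming function, `stream_with_hooks` is a hypothetical wrapper, and the hooks are reduced to appending to an event list and multiplying each chunk.

```python
import asyncio
from collections.abc import AsyncIterator


async def number_stream(limit: int) -> AsyncIterator[int]:
    """Stand-in for the wrapped streaming function."""
    for i in range(limit):
        yield i


async def stream_with_hooks(limit: int, events: list[str]) -> AsyncIterator[int]:
    """Hypothetical flow: one pre step, then one post step per chunk."""
    events.append("pre")  # runs once, before the first chunk is requested
    async for chunk in number_stream(limit):
        events.append(f"post:{chunk}")  # runs for every chunk
        yield chunk * 10  # the transformed chunk handed to the consumer


def run_demo() -> tuple[list[int], list[str]]:
    """Consume the stream and return the chunks plus the recorded hook events."""
    events: list[str] = []

    async def main() -> list[int]:
        return [chunk async for chunk in stream_with_hooks(2, events)]

    return asyncio.run(main()), events
```

The recorded events contain a single "pre" entry followed by one "post" entry per chunk, which is the ordering described above.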

Override for custom streaming behavior (e.g., buffering, aggregation, chunk filtering).

Note: Framework checks enabled before calling this method. You do NOT need to check enabled yourself.

Args:

- args: Positional arguments for the function (first arg is typically the input value).
- call_next: Callable to invoke next middleware or target stream.
- context: Static function metadata.
- kwargs: Keyword arguments for the function.

Yields:

Stream chunks (potentially transformed by post_invoke).

class FunctionMiddlewareChain(
    *,
    middleware: collections.abc.Sequence[FunctionMiddleware],
    context: nat.middleware.middleware.FunctionMiddlewareContext,
)

Composes middleware into an execution chain.

The chain builder checks each middleware's enabled property. Disabled middleware is skipped entirely; none of its methods are called.

Execution order:

- Pre-invoke: first middleware → last middleware → function
- Post-invoke: function → last middleware → first middleware
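The ordering and the skipping of disabled middleware can be sketched as a fold over the reversed middleware list, so that the first middleware ends up outermost. Everything here is a hypothetical stand-in (`TraceMiddleware`, `build_chain`, the single-argument `CallNext`); it illustrates the general wrapping technique and is not the toolkit's chain builder.

```python
import asyncio
from collections.abc import Awaitable, Callable

CallNext = Callable[[str], Awaitable[str]]  # simplified stand-in, not the toolkit type


class TraceMiddleware:
    """Hypothetical stand-in; records when its pre and post steps run."""

    def __init__(self, name: str, log: list[str], enabled: bool = True) -> None:
        self.name = name
        self.log = log
        self.enabled = enabled

    async def invoke(self, value: str, call_next: CallNext) -> str:
        self.log.append(f"pre:{self.name}")
        result = await call_next(value)
        self.log.append(f"post:{self.name}")
        return result


def build_chain(middleware: list[TraceMiddleware], final_call: CallNext) -> CallNext:
    """Wrap final_call so that the first enabled middleware is outermost."""
    chain = final_call
    for mw in reversed(middleware):
        if not mw.enabled:
            continue  # disabled middleware never enters the chain

        def make_link(current: TraceMiddleware, nxt: CallNext) -> CallNext:
            async def link(value: str) -> str:
                return await current.invoke(value, nxt)

            return link

        chain = make_link(mw, chain)
    return chain


def run_demo() -> list[str]:
    """Run a three-middleware chain with the middle one disabled."""
    log: list[str] = []

    async def target(value: str) -> str:
        log.append("function")
        return value.upper()

    middleware = [
        TraceMiddleware("a", log),
        TraceMiddleware("b", log, enabled=False),
        TraceMiddleware("c", log),
    ]
    asyncio.run(build_chain(middleware, target)("hi"))
    return log
```

The log from `run_demo()` shows the pre steps in list order, the function call, and then the post steps in reverse order, with no entries for the disabled middleware.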

Context:

- FunctionMiddlewareContext contains only static function metadata
- Original args/kwargs are captured by the orchestration layer
- Middleware receives InvocationContext with frozen originals and mutable args/output

Initialize the middleware chain.

Args:

- middleware: Sequence of middleware to chain (order matters)
- context: Static function metadata

_middleware

_context

build_single(
    final_call: nat.middleware.middleware.CallNext,
) → nat.middleware.middleware.CallNext

Build the middleware chain for single-output invocations.

Disabled middleware (enabled=False) is skipped entirely.

Args:

final_call: The final function to call (the actual function implementation)

Returns:

A callable that executes the entire middleware chain

build_stream(
    final_call: nat.middleware.middleware.CallNextStream,
) → nat.middleware.middleware.CallNextStream

Build the middleware chain for streaming invocations.

Disabled middleware (enabled=False) is skipped entirely.

Args:

final_call: The final function to call (the actual function implementation)

Returns:

A callable that executes the entire middleware chain

validate_middleware(
    middleware: collections.abc.Sequence[nat.middleware.middleware.Middleware] | None,
) → tuple[nat.middleware.middleware.Middleware, ...]

Validate a sequence of middleware, enforcing ordering guarantees.