morpheus.pipeline.stage_decorator
Functions
Classes
PreAllocatedWrappedFunctionStage (config, *, ...) |
Source stage that wraps a generator function as the method for generating messages. |
WrappedFunctionSourceStage (config, *, name, ...) |
Source stage that wraps a generator function as the method for generating messages. |
WrappedFunctionStage (config, *[, name, ...]) |
Stage that wraps a function to be used for processing messages. |
- source(gen_fn=None, *, name=None, compute_schema_fn=None, execution_modes=(<ExecutionMode.GPU: 'GPU'>, ))[source]
Decorator for wrapping a function as a source stage. The function must be a generator method, and provide a provide a return type annotation.
When
compute_schema_fn
isNone
, the return type annotation will be used by the stage as the output type.When invoked the wrapped function will return a source stage, any additional keyword arguments passed in aside from the config, will be bound to the wrapped function via
functools.partial
.Examples
>>> @source ... def source_gen(*, dataframes: list[cudf.DataFrame]) -> collections.abc.Iterator[MessageMeta]: ... for df in dataframes: ... yield MessageMeta(df) ... >>>
>>> pipe.set_source(source_gen(config, dataframes=[df]))
- stage(on_data_fn=None, *, name=None, accept_type=None, compute_schema_fn=None, needed_columns=None, execution_modes=(<ExecutionMode.GPU: 'GPU'>, ))[source]
Decorator for wrapping a function as a stage. The function must receive at least one argument, the first argument must be the incoming message, and must return a value.
It is required to use type annotations for the function parameters and return type, as this will be used by the stage as the accept and output types. If the incoming message parameter has no type annotation, the stage will be use
typing.Any
as the input type. If the return type has no type annotation, the stage will be set to return the same type as the input type.When invoked the wrapped function will return a stage, any additional arguments passed in aside from the config, will be bound to the wrapped function via
functools.partial
.Examples
>>> @stage ... def multiplier(message: MessageMeta, *, column: str, value: int | float) -> MessageMeta: ... with message.mutable_dataframe() as df: ... df[column] = df[column] * value ... ... return message ... >>>
>>> pipe.add_stage(multiplier(config, column='v2', value=5))
>>> # This will fail since `column` is required but no default value is provided: >>> pipe.add_stage(multiplier(config, value=5))