morpheus.messages

Message classes, which contain data being transferred between pipeline stages.

class ControlMessage

Methods

add_task(self, task_type, task)

config(*args, **kwargs) Overloaded function.
copy(self)

filter_timestamp(self, regex_filter) Retrieve timestamps matching a regex filter within a given group.
get_metadata(self[, key, default_value])

get_tasks(self)

get_timestamp(self, key[, fail_if_nonexist]) Retrieve the timestamp for a given group and key.
has_metadata(self, key)

has_task(self, task_type)

list_metadata(self)

payload(*args, **kwargs) Overloaded function.
remove_task(self, task_type)

set_metadata(self, key, value)

set_timestamp(self, key, timestamp) Set a timestamp for a given key and group.
task_type(*args, **kwargs) Overloaded function.
tensors(*args, **kwargs) Overloaded function.
add_task(self: morpheus._lib.messages.ControlMessage, task_type: str, task: dict) → None

config(*args, **kwargs)

Overloaded function.

  1. config(self: morpheus._lib.messages.ControlMessage, config: dict) -> None

  2. config(self: morpheus._lib.messages.ControlMessage) -> dict

copy(self: morpheus._lib.messages.ControlMessage) → morpheus._lib.messages.ControlMessage

filter_timestamp(self: morpheus._lib.messages.ControlMessage, regex_filter: str) → dict

Retrieve timestamps matching a regex filter within a given group.

get_metadata(self: morpheus._lib.messages.ControlMessage, key: object = None, default_value: object = None) → object

get_tasks(self: morpheus._lib.messages.ControlMessage) → dict

get_timestamp(self: morpheus._lib.messages.ControlMessage, key: str, fail_if_nonexist: bool = False) → object

Retrieve the timestamp for a given group and key. Returns None if the timestamp does not exist and fail_if_nonexist is False.

has_metadata(self: morpheus._lib.messages.ControlMessage, key: str) → bool

has_task(self: morpheus._lib.messages.ControlMessage, task_type: str) → bool

list_metadata(self: morpheus._lib.messages.ControlMessage) → list

payload(*args, **kwargs)

Overloaded function.

  1. payload(self: morpheus._lib.messages.ControlMessage) -> morpheus._lib.messages.MessageMeta

  2. payload(self: morpheus._lib.messages.ControlMessage, arg0: morpheus._lib.messages.MessageMeta) -> None

  3. payload(self: morpheus._lib.messages.ControlMessage, meta: object) -> None

remove_task(self: morpheus._lib.messages.ControlMessage, task_type: str) → dict

set_metadata(self: morpheus._lib.messages.ControlMessage, key: str, value: object) → None

set_timestamp(self: morpheus._lib.messages.ControlMessage, key: str, timestamp: object) → None

Set a timestamp for a given key and group.
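
The three timestamp methods (set_timestamp, get_timestamp, filter_timestamp) can be pictured as operations on a plain dictionary. The sketch below is a standalone illustration and does not import Morpheus. The TimestampStore class is hypothetical, raising KeyError for a missing key is an assumption (this reference does not name the exception), and applying the filter with re.search is also an assumption.

```python
import re
from datetime import datetime
from typing import Dict, Optional


class TimestampStore:
    """Hypothetical stand-in for the timestamp methods of ControlMessage."""

    def __init__(self) -> None:
        self._timestamps: Dict[str, datetime] = {}

    def set_timestamp(self, key: str, timestamp: datetime) -> None:
        self._timestamps[key] = timestamp

    def get_timestamp(self, key: str, fail_if_nonexist: bool = False) -> Optional[datetime]:
        if key not in self._timestamps:
            if fail_if_nonexist:
                # Assumption: the real exception type is not stated in this reference.
                raise KeyError(key)
            return None
        return self._timestamps[key]

    def filter_timestamp(self, regex_filter: str) -> Dict[str, datetime]:
        # Assumption: a key is kept when the pattern matches anywhere in it.
        pattern = re.compile(regex_filter)
        return {k: v for k, v in self._timestamps.items() if pattern.search(k)}


store = TimestampStore()
store.set_timestamp("stage_a::start", datetime(2024, 1, 1, 12, 0, 0))
store.set_timestamp("stage_a::end", datetime(2024, 1, 1, 12, 0, 5))
store.set_timestamp("stage_b::start", datetime(2024, 1, 1, 12, 0, 6))

stage_a = store.filter_timestamp(r"^stage_a::")   # both stage_a entries
missing = store.get_timestamp("stage_c::start")   # None, no exception
```

The key names used here ("stage_a::start" and so on) are illustrative only; the reference does not prescribe a naming scheme.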

task_type(*args, **kwargs)

Overloaded function.

  1. task_type(self: morpheus._lib.messages.ControlMessage) -> morpheus._lib.messages.ControlMessageType

  2. task_type(self: morpheus._lib.messages.ControlMessage, task_type: morpheus._lib.messages.ControlMessageType) -> None

tensors(*args, **kwargs)

Overloaded function.

  1. tensors(self: morpheus._lib.messages.ControlMessage) -> morpheus._lib.messages.TensorMemory

  2. tensors(self: morpheus._lib.messages.ControlMessage, arg0: morpheus._lib.messages.TensorMemory) -> None

class DataLoaderRegistry

Methods

contains(name)

list()

register_loader(name, loader[, throw_if_exists])

unregister_loader(name[, throw_if_not_exists])

static contains(name: str) → bool

static list() → List[str]

static register_loader(name: str, loader: Callable[[morpheus._lib.messages.ControlMessage, dict], morpheus._lib.messages.ControlMessage], throw_if_exists: bool = True) → None

static unregister_loader(name: str, throw_if_not_exists: bool = True) → None
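
The four static methods above follow a common registry pattern. The following standalone sketch mirrors their signatures with a hypothetical LoaderRegistry class. It does not import Morpheus, it uses plain dicts in place of ControlMessage, and both the RuntimeError type and the overwrite behaviour when throw_if_exists is False are assumptions.

```python
from typing import Callable, Dict, List

# Simplified loader type: (message, task) -> message, with dicts standing in
# for ControlMessage.
Loader = Callable[[dict, dict], dict]


class LoaderRegistry:
    """Hypothetical stand-in mirroring the four static methods listed above."""

    _loaders: Dict[str, Loader] = {}

    @staticmethod
    def contains(name: str) -> bool:
        return name in LoaderRegistry._loaders

    @staticmethod
    def list() -> List[str]:
        return sorted(LoaderRegistry._loaders)

    @staticmethod
    def register_loader(name: str, loader: Loader, throw_if_exists: bool = True) -> None:
        if name in LoaderRegistry._loaders and throw_if_exists:
            # Assumption: the exception type is not given in this reference.
            raise RuntimeError(f"loader '{name}' is already registered")
        # Assumption: with throw_if_exists=False the existing loader is replaced.
        LoaderRegistry._loaders[name] = loader

    @staticmethod
    def unregister_loader(name: str, throw_if_not_exists: bool = True) -> None:
        if name not in LoaderRegistry._loaders:
            if throw_if_not_exists:
                raise RuntimeError(f"loader '{name}' is not registered")
            return
        del LoaderRegistry._loaders[name]


def passthrough_loader(message: dict, task: dict) -> dict:
    """A loader that returns the message unchanged."""
    return message


LoaderRegistry.register_loader("passthrough", passthrough_loader)
registered = LoaderRegistry.contains("passthrough")
LoaderRegistry.unregister_loader("passthrough")
# Removing a missing loader is silent when throw_if_not_exists is False.
LoaderRegistry.unregister_loader("passthrough", throw_if_not_exists=False)
names_after = LoaderRegistry.list()
```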

class InferenceMemory(*, count=None, tensors=None)[source]

This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in cupy arrays.

Attributes
tensor_names

Methods

get_input(name) Get the tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_input(name, tensor) Update the input tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
count: int

get_input(name)[source]

Get the tensor stored in the container identified by name. Alias for InferenceMemory.get_tensor.

Parameters
name

Key used to do lookup in inputs dict of the container.

Returns
cupy.ndarray

Inputs corresponding to name.

Raises
KeyError

If input name does not exist in the container.

set_input(name, tensor)[source]

Update the input tensor identified by name. Alias for InferenceMemory.set_tensor

Parameters
name

Key used to do lookup in inputs dict of the container.

tensor

Tensor as a CuPy array.

tensors: Dict[str, cupy.ndarray]

class InferenceMemoryAE(*, count, inputs, seq_ids)[source]

This is a container class for data that needs to be submitted to the inference server for autoencoder use cases.

Parameters
inputs

Inference input.

seq_ids

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

Attributes
input

seq_ids

tensor_names

Methods

get_input(name) Get the tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_input(name, tensor) Update the input tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
input: dataclasses.InitVar[cupy.ndarray] = None

seq_ids: dataclasses.InitVar[cupy.ndarray] = None

class InferenceMemoryFIL(*, count, input__0, seq_ids)[source]

This is a container class for data that needs to be submitted to the inference server for FIL category use cases.

Parameters
input__0

Inference input.

seq_ids

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

Attributes
input__0

seq_ids

tensor_names

Methods

get_input(name) Get the tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_input(name, tensor) Update the input tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
input__0: dataclasses.InitVar[cupy.ndarray] = None

seq_ids: dataclasses.InitVar[cupy.ndarray] = None

class InferenceMemoryNLP(*, count, input_ids, input_mask, seq_ids)[source]

This is a container class for data that needs to be submitted to the inference server for NLP category use cases.

Parameters
input_ids

The token-ids for each string padded with 0s to max_length.

input_mask

The mask for token-ids result where corresponding positions identify valid token-id values.

seq_ids

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).
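
The relationship between input_ids, input_mask, and seq_ids described above can be sketched in plain Python. This is an illustration under stated assumptions, not the Morpheus preprocessing code: build_inputs is a hypothetical helper, lists stand in for cupy arrays, and representing seq_ids as a flat list of message indices is an assumption about layout.

```python
from typing import List, Tuple

Rows = List[List[int]]


def build_inputs(token_ids_per_message: Rows, max_length: int) -> Tuple[Rows, Rows, List[int]]:
    """Pad token ids with 0s, build the validity mask, and record which
    message each inference row came from."""
    input_ids: Rows = []
    input_mask: Rows = []
    seq_ids: List[int] = []
    for msg_idx, tokens in enumerate(token_ids_per_message):
        # A message longer than max_length is broken into several rows.
        chunks = [tokens[i:i + max_length] for i in range(0, len(tokens), max_length)] or [[]]
        for chunk in chunks:
            pad = max_length - len(chunk)
            input_ids.append(chunk + [0] * pad)
            input_mask.append([1] * len(chunk) + [0] * pad)
            seq_ids.append(msg_idx)
    return input_ids, input_mask, seq_ids


# Two messages: the second is too long for one row and yields two rows.
input_ids, input_mask, seq_ids = build_inputs(
    [[101, 7, 8, 102], [101, 5, 6, 9, 4, 3, 102]], max_length=4)
```

Here two messages produce three inference rows, which is why seq_ids is needed to map each row back to its message.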

Attributes
input_ids

input_mask

seq_ids

tensor_names

Methods

get_input(name) Get the tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_input(name, tensor) Update the input tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
input_ids: dataclasses.InitVar[cupy.ndarray] = None

input_mask: dataclasses.InitVar[cupy.ndarray] = None

seq_ids: dataclasses.InitVar[cupy.ndarray] = None

class MessageBase[source]

Base class for all messages. Returns a C++ implementation if CppConfig.get_should_use_cpp() is True and the class has an associated C++ implementation (cpp_class), returns the Python implementation for all others.
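
The selection rule described above can be sketched as follows. This reference does not say how Morpheus performs the dispatch, so overriding __new__ is an assumption, and FakeConfig, Base, FastMeta, Meta, and PlainMessage are all hypothetical names used only for illustration.

```python
class FakeConfig:
    """Hypothetical stand-in for CppConfig."""

    _use_cpp = True

    @classmethod
    def get_should_use_cpp(cls) -> bool:
        return cls._use_cpp


class FastMeta:
    """Pretend C++-backed implementation."""

    def __init__(self, df):
        self.df = df
        self.backend = "cpp"


class Base:
    # Subclasses with an alternative implementation set this attribute.
    cpp_class = None

    def __new__(cls, *args, **kwargs):
        if FakeConfig.get_should_use_cpp() and cls.cpp_class is not None:
            # Returning an unrelated object skips cls.__init__ entirely.
            return cls.cpp_class(*args, **kwargs)
        return super().__new__(cls)


class Meta(Base):
    cpp_class = FastMeta

    def __init__(self, df):
        self.df = df
        self.backend = "python"


class PlainMessage(Base):
    # No cpp_class, so the Python implementation is always used.
    def __init__(self, text):
        self.text = text
        self.backend = "python"


with_cpp = Meta([1, 2])            # FastMeta instance
plain = PlainMessage("hello")      # Python implementation, no cpp_class
FakeConfig._use_cpp = False
without_cpp = Meta([1, 2])         # Python implementation
```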

class MessageMeta(df)[source]

This is a container class that holds the metadata for a batch of deserialized messages.

Parameters
df

Input rows in dataframe.

Attributes
count

Returns the number of messages in the batch.

df

Methods

ensure_sliceable_index() Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.
get_meta_range(mess_offset, message_count[, ...]) Return column values from morpheus.messages.MessageMeta.df from the specified start offset until the message count.
has_sliceable_index() Returns True if the underlying DataFrame's index is unique and monotonic.
copy_dataframe
get_column_names
mutable_dataframe
copy_dataframe()[source]

property count: int

Returns the number of messages in the batch.

Returns
int

number of messages in the MessageMeta.df.

property df: Union[pandas.DataFrame, cudf.DataFrame]

ensure_sliceable_index()[source]

Replaces the index in the underlying dataframe if the existing one is not unique and monotonic. The old index will be preserved in a column named _index_{old_index.name}. If has_sliceable_index() returns True, this is a no-op.

Returns
str

The name of the column with the old index or None if no changes were made

get_column_names()[source]

get_meta_range(mess_offset, message_count, columns=None)[source]

Return column values from morpheus.messages.MessageMeta.df from the specified start offset until the message count.

Parameters
mess_offset

Offset into the metadata batch.

message_count

Messages count.

columns

Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise a Dataframe is returned.

Returns
Series or Dataframe

Column values from the dataframe.

has_sliceable_index()[source]

Returns True if the underlying DataFrame’s index is unique and monotonic. Sliceable indices have better performance since a range of rows can be specified by a start and stop index instead of requiring boolean masks.

Returns
bool
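
The "unique and monotonic" condition, and the reason it allows start/stop slicing, can be sketched without a dataframe library. The helper below is a hypothetical standalone illustration in which lists stand in for a DataFrame index and its rows. Accepting either increasing or decreasing order as "monotonic" is an assumption.

```python
from typing import List, Sequence


def has_sliceable_index(index: Sequence[int]) -> bool:
    """True when the index values are unique and monotonic."""
    unique = len(set(index)) == len(index)
    increasing = all(a <= b for a, b in zip(index, index[1:]))
    decreasing = all(a >= b for a, b in zip(index, index[1:]))
    # Assumption: monotonic in either direction counts as sliceable.
    return unique and (increasing or decreasing)


rows: List[str] = ["r0", "r1", "r2", "r3", "r4", "r5"]

# With a sliceable index, a range of rows needs only two integers.
by_slice = rows[2:5]

# Otherwise one boolean per row is required to select the same rows.
mask = [False, False, True, True, True, False]
by_mask = [row for row, keep in zip(rows, mask) if keep]
```

Both selections return the same rows, but the slice describes them with a start and a stop rather than a full-length mask.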

mutable_dataframe()[source]

class MultiAEMessage(*, meta, mess_offset=0, mess_count=-1, model, train_scores_mean=0.0, train_scores_std=1.0)[source]

Subclass of MultiMessage specific to the AutoEncoder pipeline, which contains the model.

Attributes
id

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message instance for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_meta([columns]) Return column values from morpheus.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_slice(start, stop) Returns sliced batches based on offsets supplied.
set_meta(columns, value) Set column values to morpheus.messages.MessageMeta.df.
model: morpheus.models.dfencoder.autoencoder.AutoEncoder

train_scores_mean: float

train_scores_std: float

class MultiInferenceFILMessage(*, meta, mess_offset=0, mess_count=-1, memory=None, offset=0, count=-1)[source]

A stronger typed version of MultiInferenceMessage that is used for FIL workloads. Helps ensure the proper inputs are set and eases debugging.

Attributes
id

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

input__0

Input to FIL model inference.

inputs

Get inputs stored in the InferenceMemory container.

seq_ids

Returns sequence ids, which are used to keep track of messages in a multi-threaded environment.

tensors

Get tensors stored in the TensorMemory container sliced according to offset and count.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message, dataframe and tensors for the given ranges of rows.
copy_tensor_ranges(ranges[, mask]) Perform a copy of the underlying tensors for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_id_tensor() Get the tensor that holds message ID information.
get_input(name) Get input stored in the InferenceMemory container.
get_meta([columns]) Return column values from morpheus.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_slice(start, stop) Perform a slice of the current message from start:stop (excluding stop)
get_tensor(name) Get tensor stored in the TensorMemory container.
set_meta(columns, value) Set column values to morpheus.messages.MessageMeta.df.
property input__0

Input to FIL model inference.

Returns
cupy.ndarray

Input data.

required_tensors: ClassVar[List[str]] = ['input__0', 'seq_ids']

The tensor names that are required for instantiation

property seq_ids

Returns sequence ids, which are used to keep track of messages in a multi-threaded environment.

Returns
cupy.ndarray

Sequence ids.

class MultiInferenceMessage(*, meta, mess_offset=0, mess_count=-1, memory=None, offset=0, count=-1)[source]

This is a container class that holds the InferenceMemory container and the metadata of the data contained within it. Builds on top of the MultiTensorMessage class to add additional data for inferencing.

This class requires two separate memory blocks for a batch: one for the message metadata (e.g., start time, IP address) and another for the raw inference inputs (e.g., input_ids, seq_ids). Because some messages get broken into multiple inference requests, there can be more inference inputs than messages, so this class stores two different offset and count values. mess_offset and mess_count refer to the offset and count in the message metadata batch, while offset and count index into the inference batch data.
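
The two offset/count pairs can be illustrated with a small standalone sketch. TwoLevelView is a hypothetical class, plain lists stand in for the dataframe and the cupy tensors, and the example data is invented for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TwoLevelView:
    """Hypothetical view over a metadata batch and an inference batch."""

    meta_rows: List[dict]
    tensor_rows: Dict[str, List[List[int]]]
    mess_offset: int
    mess_count: int
    offset: int
    count: int

    def messages(self) -> List[dict]:
        # Window into the message metadata batch.
        return self.meta_rows[self.mess_offset:self.mess_offset + self.mess_count]

    def tensors(self) -> Dict[str, List[List[int]]]:
        # Window into the inference batch, sliced independently.
        return {name: rows[self.offset:self.offset + self.count]
                for name, rows in self.tensor_rows.items()}


# Three messages; message 1 was broken into two inference rows, so the
# inference batch has four rows.
meta_rows = [{"src": "a"}, {"src": "b"}, {"src": "c"}]
tensor_rows = {"input_ids": [[1, 2], [3, 4], [5, 0], [6, 7]]}

# Messages 1..2 correspond to inference rows 1..3.
view = TwoLevelView(meta_rows, tensor_rows, mess_offset=1, mess_count=2, offset=1, count=3)
selected_messages = view.messages()
selected_tensors = view.tensors()
```

A single offset/count pair could not describe this window, because two messages map to three inference rows.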

Attributes
id

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

inputs

Get inputs stored in the InferenceMemory container.

tensors

Get tensors stored in the TensorMemory container sliced according to offset and count.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message, dataframe and tensors for the given ranges of rows.
copy_tensor_ranges(ranges[, mask]) Perform a copy of the underlying tensors for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_id_tensor() Get the tensor that holds message ID information.
get_input(name) Get input stored in the InferenceMemory container.
get_meta([columns]) Return column values from morpheus.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_slice(start, stop) Perform a slice of the current message from start:stop (excluding stop)
get_tensor(name) Get tensor stored in the TensorMemory container.
set_meta(columns, value) Set column values to morpheus.messages.MessageMeta.df.
count: int

get_input(name)[source]

Get input stored in the InferenceMemory container. Alias for MultiInferenceMessage.get_tensor.

Parameters
name

Input key name.

Returns
cupy.ndarray

Inference input.

Raises
KeyError

When no matching input tensor exists.

property inputs

Get inputs stored in the InferenceMemory container. Alias for MultiInferenceMessage.tensors.

Returns
cupy.ndarray

Inference inputs.

memory: morpheus.messages.memory.tensor_memory.TensorMemory

offset: int

class MultiInferenceNLPMessage(*, meta, mess_offset=0, mess_count=-1, memory=None, offset=0, count=-1)[source]

A stronger typed version of MultiInferenceMessage that is used for NLP workloads. Helps ensure the proper inputs are set and eases debugging.

Attributes
id

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

input_ids

Returns token-ids for each string padded with 0s to max_length.

input_mask

Returns mask for token-ids result where corresponding positions identify valid token-id values.

inputs

Get inputs stored in the InferenceMemory container.

seq_ids

Returns sequence ids, which are used to keep track of which inference requests belong to each message.

tensors

Get tensors stored in the TensorMemory container sliced according to offset and count.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message, dataframe and tensors for the given ranges of rows.
copy_tensor_ranges(ranges[, mask]) Perform a copy of the underlying tensors for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_id_tensor() Get the tensor that holds message ID information.
get_input(name) Get input stored in the InferenceMemory container.
get_meta([columns]) Return column values from morpheus.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_slice(start, stop) Perform a slice of the current message from start:stop (excluding stop)
get_tensor(name) Get tensor stored in the TensorMemory container.
set_meta(columns, value) Set column values to morpheus.messages.MessageMeta.df.
property input_ids

Returns token-ids for each string padded with 0s to max_length.

Returns
cupy.ndarray

The token-ids for each string padded with 0s to max_length.

property input_mask

Returns mask for token-ids result where corresponding positions identify valid token-id values.

Returns
cupy.ndarray

The mask for token-ids result where corresponding positions identify valid token-id values.

required_tensors: ClassVar[List[str]] = ['input_ids', 'input_mask', 'seq_ids']

The tensor names that are required for instantiation

property seq_ids

Returns sequence ids, which are used to keep track of which inference requests belong to each message.

Returns
cupy.ndarray

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

class MultiMessage(*, meta, mess_offset=0, mess_count=-1)[source]

This class holds data for multiple messages at a time. To avoid copying data for slicing operations, it holds a reference to a batched metadata object and stores the offset and count into that batch.

Parameters
meta : MessageMeta

Metadata for a large batch of deserialized messages.

mess_offset

Offset into the metadata batch.

mess_count

Messages count.

Attributes
id

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message instance for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_meta() Return column values from morpheus.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_slice(start, stop) Returns sliced batches based on offsets supplied.
set_meta(columns, value) Set column values to morpheus.messages.MessageMeta.df.
copy_meta_ranges(ranges, mask=None)[source]

Perform a copy of the underlying dataframe for the given ranges of rows.

Parameters
ranges

Rows to include in the copy in the form of [(start_row, stop_row), …]. The stop_row is not included. For example, to copy rows 1-2 and 5-7, use ranges=[(1, 3), (5, 8)].

mask

Optionally specify rows as a cupy array (when using cudf Dataframes) or a numpy array (when using pandas Dataframes) of booleans. When not None, ranges will be ignored. This is useful as an optimization, since it avoids the need to generate the mask internally.

Returns
Dataframe
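
The relationship between ranges and mask can be sketched in plain Python. The helpers ranges_to_mask and copy_rows are hypothetical and operate on lists rather than dataframes. They only illustrate how half-open (start_row, stop_row) pairs translate into a boolean mask, and that a supplied mask takes precedence over ranges.

```python
from typing import List, Optional, Sequence, Tuple


def ranges_to_mask(ranges: Sequence[Tuple[int, int]], row_count: int) -> List[bool]:
    """Turn half-open (start_row, stop_row) pairs into one boolean per row."""
    mask = [False] * row_count
    for start, stop in ranges:
        for row in range(start, stop):  # stop_row is excluded
            mask[row] = True
    return mask


def copy_rows(rows: List[str],
              ranges: Sequence[Tuple[int, int]],
              mask: Optional[List[bool]] = None) -> List[str]:
    """Copy the selected rows; a supplied mask makes ranges irrelevant."""
    if mask is None:
        mask = ranges_to_mask(ranges, len(rows))
    return [row for row, keep in zip(rows, mask) if keep]


rows = list("abcdefghij")
# Rows 1-2 and 5-7, as in the example for the ranges parameter.
copied = copy_rows(rows, [(1, 3), (5, 8)])

# Passing a precomputed mask skips the mask generation step.
precomputed = ranges_to_mask([(1, 3), (5, 8)], len(rows))
copied_with_mask = copy_rows(rows, [(0, 1)], mask=precomputed)
```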

copy_ranges(ranges)[source]

Perform a copy of the current message instance for the given ranges of rows.

Parameters
ranges

Rows to include in the copy in the form of [(start_row, stop_row), …]. The stop_row is not included. For example, to copy rows 1-2 and 5-7, use ranges=[(1, 3), (5, 8)].

Returns
MultiMessage

classmethod from_message(message, *, meta=None, mess_offset=-1, mess_count=-1, **kwargs)[source]

Creates a new instance of a derived class from MultiMessage using an existing message as the template. This is very useful when a new message needs to be created with a single change to an existing MessageMeta.

When creating the new message, all required arguments for the class specified by cls will be pulled from message unless otherwise specified in the kwargs. Special handling is performed depending on whether or not a new meta object is supplied. If one is supplied, the offset and count defaults will be 0 and meta.count respectively. Otherwise offset and count will be pulled from the input message.

Parameters
cls

The class to create

message

An existing message to use as a template. Can be a base or derived from cls as long as all arguments can be pulled from message or provided in kwargs.

meta

A new MessageMeta to use, by default None

mess_offset

A new mess_offset to use, by default -1

mess_count

A new mess_count to use, by default -1

**kwargs : dict

Keyword arguments to use when creating the new instance.

Returns
Self

A new instance of type cls

Raises
ValueError

If the incoming message is None

AttributeError

If some required arguments were not supplied by kwargs and could not be pulled from message
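
The defaulting rules described for from_message can be sketched with ordinary dataclasses. This is a standalone illustration of the documented behaviour, not the Morpheus source: Batch, View, and ScoredView are hypothetical stand-ins for MessageMeta, MultiMessage, and a derived message class, and using dataclasses.fields to discover required arguments is an assumption.

```python
from dataclasses import dataclass, fields
from typing import List


@dataclass
class Batch:
    """Hypothetical stand-in for MessageMeta."""
    rows: List[dict]

    @property
    def count(self) -> int:
        return len(self.rows)


@dataclass
class View:
    """Hypothetical stand-in for MultiMessage."""
    meta: Batch
    mess_offset: int = 0
    mess_count: int = -1

    @classmethod
    def from_message(cls, message, *, meta=None, mess_offset=-1, mess_count=-1, **kwargs):
        if message is None:
            raise ValueError("message must not be None")
        if meta is None:
            # No new meta: reuse the template's meta, offset, and count.
            meta = message.meta
            mess_offset = message.mess_offset if mess_offset == -1 else mess_offset
            mess_count = message.mess_count if mess_count == -1 else mess_count
        else:
            # New meta supplied: default to covering the whole new batch.
            mess_offset = 0 if mess_offset == -1 else mess_offset
            mess_count = meta.count if mess_count == -1 else mess_count
        kwargs.update(meta=meta, mess_offset=mess_offset, mess_count=mess_count)
        for f in fields(cls):
            if f.name not in kwargs:
                # getattr raises AttributeError if the template lacks the field.
                kwargs[f.name] = getattr(message, f.name)
        return cls(**kwargs)


@dataclass
class ScoredView(View):
    threshold: float = 0.5


base = View(meta=Batch(rows=[{"id": i} for i in range(3)]), mess_offset=1, mess_count=2)
same_window = View.from_message(base)
new_batch = View.from_message(base, meta=Batch(rows=[{"id": i} for i in range(5)]))
scored = ScoredView.from_message(base, threshold=0.9)
```

Calling ScoredView.from_message(base) without threshold raises AttributeError in this sketch, because base has no threshold attribute to pull the value from.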

get_meta() → cudf.DataFrame[source]
get_meta(columns: str) → cudf.Series
get_meta(columns: List[str]) → cudf.DataFrame

Return column values from morpheus.messages.MessageMeta.df.

Parameters
columns

Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a Dataframe is returned.

Returns
Series or Dataframe

Column values from the dataframe.
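
How the columns argument changes the return shape can be sketched with a dict of lists. In this hypothetical standalone helper, a list stands in for a Series and a dict of lists stands in for a DataFrame. The explicit offset and count parameters are added here for illustration, since the real method takes them from the message instance.

```python
from typing import Dict, List, Optional, Union

Table = Dict[str, List[object]]


def get_meta(table: Table, offset: int, count: int,
             columns: Union[None, str, List[str]] = None) -> Union[Table, List[object]]:
    """Return the rows in [offset, offset + count) for the requested columns."""
    window = slice(offset, offset + count)
    if columns is None:
        # No columns given: every column is returned.
        return {name: values[window] for name, values in table.items()}
    if isinstance(columns, str):
        # A single name yields a single column (the Series case).
        return table[columns][window]
    # A list of names yields a table (the DataFrame case).
    return {name: table[name][window] for name in columns}


table: Table = {
    "src_ip": ["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
    "bytes": [120, 64, 900, 33],
}

everything = get_meta(table, offset=1, count=2)
one_column = get_meta(table, offset=1, count=2, columns="bytes")
sub_table = get_meta(table, offset=1, count=2, columns=["bytes"])
```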

get_meta_column_names()[source]

Return column names available in the underlying DataFrame.

Returns
list[str]

Column names from the dataframe.

get_meta_list(col_name=None)[source]

Return column values from morpheus.messages.MessageMeta.df as a list.

Parameters
col_name

Column name in the dataframe.

Returns
List[str]

Column values from the dataframe.

get_slice(start, stop)[source]

Returns sliced batches based on offsets supplied. Automatically calculates the correct mess_offset and mess_count.

Parameters
start

Start offset address.

stop

Stop offset address.

Returns
MultiInferenceMessage

A new MultiInferenceMessage with sliced offset and count.

property id: List[int]

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

Returns
List[int]

ID column values from the dataframe as list.

property id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

Returns
pandas.Series

ID column values from the dataframe.

mess_count: int

mess_offset: int

meta: morpheus.messages.message_meta.MessageMeta

set_meta(columns, value)[source]

Set column values to morpheus.messages.MessageMeta.df.

Parameters
columns

Input column names. Sets the value for the corresponding column names. If None is specified, all columns will be used. If the column does not exist, a new one will be created.

value

Value to apply to the specified columns. If a single value is passed, it will be broadcast to all rows. If a Series or Dataframe is passed, rows will be matched by index.
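
The scalar-broadcast and column-creation behaviour can be sketched on a dict of lists. This hypothetical helper covers only the single-value case. Index-matched Series or Dataframe values are not modelled, and filling rows outside the message window with None when a new column is created is an assumption.

```python
from typing import Dict, List, Union

Table = Dict[str, List[object]]


def set_meta(table: Table, offset: int, count: int,
             columns: Union[None, str, List[str]], value: object) -> None:
    """Broadcast a single value to rows [offset, offset + count) of the
    chosen columns, creating columns that do not exist yet."""
    if columns is None:
        names = list(table)          # all existing columns
    elif isinstance(columns, str):
        names = [columns]
    else:
        names = list(columns)
    # Assumes the table already has at least one column.
    total_rows = len(next(iter(table.values())))
    for name in names:
        if name not in table:
            # Assumption: rows outside the window start out as None.
            table[name] = [None] * total_rows
        table[name][offset:offset + count] = [value] * count


table: Table = {"src_ip": ["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]}
set_meta(table, offset=1, count=2, columns="is_anomaly", value=True)
```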

property timestamp: List[int]

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Returns
List[int]

Timestamp column values from the dataframe as list.

class MultiResponseAEMessage(*, meta, mess_offset=0, mess_count=-1, memory=None, offset=0, count=-1, id_tensor_name='seq_ids', probs_tensor_name='probs', user_id=None)[source]

A stronger typed version of MultiResponseProbsMessage that is used for inference workloads that return a probability array. Helps ensure the proper outputs are set and eases debugging.

Attributes
id

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

outputs

Get outputs stored in the TensorMemory container.

tensors

Get tensors stored in the TensorMemory container sliced according to offset and count.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

user_id

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message, dataframe and tensors for the given ranges of rows.
copy_tensor_ranges(ranges[, mask]) Perform a copy of the underlying tensors for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_id_tensor() Get the tensor that holds message ID information.
get_meta([columns]) Return column values from morpheus.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_output(name) Get output stored in the TensorMemory container.
get_probs_tensor() Get the tensor that holds output probabilities.
get_slice(start, stop) Perform a slice of the current message from start:stop (excluding stop)
get_tensor(name) Get tensor stored in the TensorMemory container.
set_meta(columns, value) Set column values to morpheus.messages.MessageMeta.df.
user_id: str = None

class MultiResponseMessage(*, meta, mess_offset=0, mess_count=-1, memory=None, offset=0, count=-1, id_tensor_name='seq_ids', probs_tensor_name='probs')[source]

This class contains several inference responses as well as the corresponding message metadata.

Attributes
id

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

outputs

Get outputs stored in the TensorMemory container.

tensors

Get tensors stored in the TensorMemory container sliced according to offset and count.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message, dataframe and tensors for the given ranges of rows.
copy_tensor_ranges(ranges[, mask]) Perform a copy of the underlying tensors for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_id_tensor() Get the tensor that holds message ID information.
get_meta([columns]) Return column values from morpheus.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_output(name) Get output stored in the TensorMemory container.
get_probs_tensor() Get the tensor that holds output probabilities.
get_slice(start, stop) Perform a slice of the current message from start:stop (excluding stop)
get_tensor(name) Get tensor stored in the TensorMemory container.
set_meta(columns, value) Set column values to morpheus.messages.MessageMeta.df.
get_output(name)[source]

Get output stored in the TensorMemory container. Alias for MultiResponseMessage.get_tensor.

Parameters
name

Output key name.

Returns
cupy.ndarray

Inference output.

get_probs_tensor()[source]

Get the tensor that holds output probabilities. Equivalent to get_tensor(probs_tensor_name)

Returns
cupy.ndarray

The probabilities tensor

Raises
KeyError

If self.probs_tensor_name is not found in the tensors
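
The alias relationships among get_tensor, get_output, and get_probs_tensor can be sketched as follows. ResponseView is a hypothetical standalone class, and nested lists stand in for cupy arrays. It only illustrates that the two convenience methods delegate to get_tensor, so a missing name surfaces as KeyError.

```python
from typing import ClassVar, Dict, List

Tensor = List[List[float]]


class ResponseView:
    """Hypothetical stand-in for the tensor accessors of a response message."""

    # Name of the tensor that holds output probabilities.
    probs_tensor_name: ClassVar[str] = "probs"

    def __init__(self, tensors: Dict[str, Tensor]) -> None:
        self._tensors = tensors

    def get_tensor(self, name: str) -> Tensor:
        # A plain dict lookup raises KeyError for an unknown name.
        return self._tensors[name]

    def get_output(self, name: str) -> Tensor:
        # Alias for get_tensor.
        return self.get_tensor(name)

    def get_probs_tensor(self) -> Tensor:
        # Equivalent to get_tensor(probs_tensor_name).
        return self.get_tensor(self.probs_tensor_name)


response = ResponseView({"probs": [[0.1, 0.9], [0.8, 0.2]]})
probs = response.get_probs_tensor()
same = response.get_output("probs")
```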

property outputs

Get outputs stored in the TensorMemory container. Alias for MultiResponseMessage.tensors.

Returns
cupy.ndarray

Inference outputs.

probs_tensor_name: ClassVar[str] = 'probs'

Name of the tensor that holds output probabilities

class MultiResponseProbsMessage(*args, **kwargs)[source]

A stronger typed version of MultiResponseMessage that is used for inference workloads that return a probability array. Helps ensure the proper outputs are set and eases debugging.

Attributes
id

Returns ID column values from morpheus.messages.MessageMeta.df as a list.

id_col

Returns ID column values from morpheus.messages.MessageMeta.df.

outputs

Get outputs stored in the TensorMemory container.

probs

Probabilities of prediction.

tensors

Get tensors stored in the TensorMemory container sliced according to offset and count.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message, dataframe and tensors for the given ranges of rows.
copy_tensor_ranges(ranges[, mask]) Perform a copy of the underlying tensor tensors for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_id_tensor() Get the tensor that holds message ID information.
get_meta([columns]) Return column values from morpheus.pipeline.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_output(name) Get output stored in the TensorMemory container.
get_probs_tensor() Get the tensor that holds output probabilities.
get_slice(start, stop) Perform a slice of the current message from start:stop (excluding stop)
get_tensor(name) Get tensor stored in the TensorMemory container.
set_meta(columns, value) Set column values to morpheus.pipelines.messages.MessageMeta.df.
property probs

Probabilities of prediction.

Returns
cupy.ndarray

probabilities

required_tensors: ClassVar[List[str]] = ['probs']

The tensor names that are required for instantiation

class MultiTensorMessage(*, meta, mess_offset=0, mess_count=-1, memory, offset=0, count=-1, id_tensor_name='seq_ids')[source]

This class contains several inference responses as well as the corresponding message metadata.

Parameters
memory : TensorMemory

Container holding generic tensor data in cupy arrays

offset

Offset of each message into the TensorMemory block.

count

Number of rows in the TensorMemory block.

Attributes
id

Returns ID column values from morpheus.pipeline.messages.MessageMeta.df as list.

id_col

Returns ID column values from morpheus.pipeline.messages.MessageMeta.df.

tensors

Get tensors stored in the TensorMemory container sliced according to offset and count.

timestamp

Returns timestamp column values from morpheus.messages.MessageMeta.df as list.

Methods

copy_meta_ranges(ranges[, mask]) Perform a copy of the underlying dataframe for the given ranges of rows.
copy_ranges(ranges) Perform a copy of the current message, dataframe and tensors for the given ranges of rows.
copy_tensor_ranges(ranges[, mask]) Perform a copy of the underlying tensors for the given ranges of rows.
from_message(message, *[, meta, ...]) Creates a new instance of a derived class from MultiMessage using an existing message as the template.
get_id_tensor() Get the tensor that holds message ID information.
get_meta([columns]) Return column values from morpheus.pipeline.messages.MessageMeta.df.
get_meta_column_names() Return column names available in the underlying DataFrame.
get_meta_list([col_name]) Return column values from morpheus.messages.MessageMeta.df as a list.
get_slice(start, stop) Perform a slice of the current message from start:stop (excluding stop)
get_tensor(name) Get tensor stored in the TensorMemory container.
set_meta(columns, value) Set column values to morpheus.pipelines.messages.MessageMeta.df.
copy_ranges(ranges)[source]

Perform a copy of the current message, dataframe and tensors for the given ranges of rows.

Parameters
ranges

Rows to include in the copy, in the form [(start_row, stop_row), …]. The stop_row is not included. For example, to copy rows 1-2 and 5-7, use ranges=[(1, 3), (5, 8)].

Returns
MultiTensorMessage
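The half-open range convention described above can be illustrated without Morpheus installed. The following is a minimal sketch that uses a plain Python list as a stand-in for dataframe rows; `select_rows` is a hypothetical helper introduced here for illustration and is not part of the Morpheus API.

```python
from typing import List, Sequence, Tuple


def select_rows(rows: Sequence, ranges: List[Tuple[int, int]]) -> list:
    """Return the rows covered by half-open (start_row, stop_row) ranges."""
    selected = []
    for start_row, stop_row in ranges:
        # stop_row is excluded, mirroring Python slice semantics
        selected.extend(rows[start_row:stop_row])
    return selected


rows = list(range(10))  # stand-in for ten dataframe rows
picked = select_rows(rows, [(1, 3), (5, 8)])
print(picked)  # rows 1-2 and 5-7
```

With a real message, the same `ranges` value would be passed directly to `copy_ranges`.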

copy_tensor_ranges(ranges, mask=None)[source]

Perform a copy of the underlying tensors for the given ranges of rows.

Parameters
ranges

Rows to include in the copy, in the form [(start_row, stop_row), …]. The stop_row is not included. For example, to copy rows 1-2 and 5-7, use ranges=[(1, 3), (5, 8)].

mask

Optionally specify rows as a boolean cupy array (when using cudf Dataframes) or a boolean numpy array (when using pandas Dataframes). When mask is not None, ranges is ignored. This is useful as an optimization, since it avoids having to generate the mask internally.

Returns
typing.Dict[str, cupy.ndarray]
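To show why a precomputed mask can save work, here is a rough sketch of how a list of ranges might be expanded into one boolean row mask and then reused across several tensors. It assumes plain Python lists in place of cupy or numpy arrays; `ranges_to_mask` and `apply_mask` are hypothetical helpers, not Morpheus functions.

```python
from typing import List, Tuple


def ranges_to_mask(num_rows: int, ranges: List[Tuple[int, int]]) -> List[bool]:
    """Build a boolean row mask from half-open (start_row, stop_row) ranges."""
    mask = [False] * num_rows
    for start_row, stop_row in ranges:
        for row in range(start_row, min(stop_row, num_rows)):
            mask[row] = True
    return mask


def apply_mask(tensor_rows: list, mask: List[bool]) -> list:
    """Keep only the rows whose mask entry is True."""
    return [row for row, keep in zip(tensor_rows, mask) if keep]


mask = ranges_to_mask(10, [(1, 3), (5, 8)])
tensors = {
    "probs": [[i / 10] for i in range(10)],
    "seq_ids": [[i] for i in range(10)],
}
# The mask is computed once and reused for every tensor
copied = {name: apply_mask(rows, mask) for name, rows in tensors.items()}
print(copied["seq_ids"])
```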

count: int

classmethod from_message(message, *, meta=None, mess_offset=-1, mess_count=-1, memory=None, offset=-1, count=-1, **kwargs)[source]

Creates a new instance of a derived class from MultiMessage using an existing message as the template. This is very useful when a new message needs to be created with a single change to an existing MessageMeta.

When creating the new message, all required arguments for the class specified by cls will be pulled from message unless otherwise specified in the kwargs. Special handling is performed depending on whether or not a new meta object is supplied. If one is supplied, the offset and count defaults will be 0 and meta.count respectively. Otherwise offset and count will be pulled from the input message.

Parameters
cls

The class to create

message

An existing message to use as a template. Can be a base or derived from cls as long as all arguments can be pulled from message or provided in kwargs.

meta

A new MessageMeta to use, by default None

mess_offset

A new mess_offset to use, by default -1

mess_count

A new mess_count to use, by default -1

memory

A new TensorMemory to use. If supplied, offset and count default to 0 and memory.count respectively. By default None

offset

A new offset to use, by default -1

count

A new count to use, by default -1

**kwargs : dict

Keyword arguments to use when creating the new instance.

Returns
Self

A new instance of type cls

Raises
ValueError

If the incoming message is None
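The defaulting rules for meta, mess_offset and mess_count can be modeled roughly as follows. This is a simplified sketch, not the Morpheus implementation: `ToyMeta`, `ToyMessage` and `resolve_meta_window` are hypothetical names, and treating -1 as "not specified" is an assumption based on the default values in the signature.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyMeta:
    count: int  # number of rows held by the metadata


@dataclass
class ToyMessage:
    meta: ToyMeta
    mess_offset: int
    mess_count: int


def resolve_meta_window(message: Optional[ToyMessage],
                        meta: Optional[ToyMeta] = None,
                        mess_offset: int = -1,
                        mess_count: int = -1) -> ToyMessage:
    """Build a new message, filling unspecified values from the template."""
    if message is None:
        raise ValueError("message must not be None")

    if meta is not None:
        # A new meta was supplied: the window defaults to the whole meta
        default_offset, default_count = 0, meta.count
    else:
        # No new meta: reuse the template's meta and window
        meta = message.meta
        default_offset, default_count = message.mess_offset, message.mess_count

    # Assumption: -1 means "not specified", so the default applies
    return ToyMessage(
        meta=meta,
        mess_offset=default_offset if mess_offset == -1 else mess_offset,
        mess_count=default_count if mess_count == -1 else mess_count,
    )


template = ToyMessage(ToyMeta(count=100), mess_offset=10, mess_count=20)
same_window = resolve_meta_window(template)
new_meta = resolve_meta_window(template, meta=ToyMeta(count=5))
print(same_window.mess_offset, same_window.mess_count)
print(new_meta.mess_offset, new_meta.mess_count)
```

The same pattern is described for memory, offset and count, with memory.count taking the place of meta.count.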

get_id_tensor()[source]

Get the tensor that holds message ID information. Equivalent to get_tensor(id_tensor_name).

Returns
cupy.ndarray

Array containing the ID information

Raises
KeyError

If self.id_tensor_name is not found in the tensors

get_slice(start, stop)[source]

Perform a slice of the current message from start:stop (excluding stop)

For example, to slice rows 1-3, use m.get_slice(1, 4). The returned MultiTensorMessage will contain references to the same underlying Dataframe and tensors, so calling this method is relatively low cost compared to MultiTensorMessage.copy_ranges.

Parameters
start

Starting row of the slice

stop

Stop of the slice

Returns
MultiTensorMessage
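The difference between a slice that shares storage and a copy that allocates new storage can be sketched with a toy class. `ToySlice` below is a hypothetical stand-in that tracks only an offset and count over a shared Python list; it is meant to illustrate the cost difference, not to reproduce how Morpheus stores data.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToySlice:
    rows: List[int]  # shared reference to the underlying storage
    offset: int
    count: int

    def get_slice(self, start: int, stop: int) -> "ToySlice":
        # Only the window changes; the storage object is reused
        return ToySlice(self.rows, self.offset + start, stop - start)

    def copy_range(self, start: int, stop: int) -> "ToySlice":
        # New storage is allocated and filled with the selected rows
        data = self.rows[self.offset + start:self.offset + stop]
        return ToySlice(data, 0, len(data))

    def view(self) -> List[int]:
        return self.rows[self.offset:self.offset + self.count]


storage = list(range(10))
message = ToySlice(storage, 0, 10)
sliced = message.get_slice(1, 4)    # rows 1-3, shares storage
copied = message.copy_range(1, 4)   # rows 1-3, independent storage

storage[2] = 99  # a later change to the shared storage
print(sliced.view(), copied.view())
```

Because the slice holds a reference, it observes the later change, while the copy does not.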

get_tensor(name)[source]

Get tensor stored in the TensorMemory container.

Parameters
name

Tensor key name.

Returns
cupy.ndarray

Inference tensor.

id_tensor_name: ClassVar[str] = 'seq_ids'

Name of the tensor that correlates tensor rows to message IDs

memory: morpheus.messages.memory.tensor_memory.TensorMemory

offset: int

required_tensors: ClassVar[List[str]] = []

The tensor names that are required for instantiation

property tensors

Get tensors stored in the TensorMemory container sliced according to offset and count.

Returns
cupy.ndarray

Inference tensors.

class ResponseMemory(*args, **kwargs)[source]

Output memory block holding the results of inference.

Attributes
tensor_names

Methods

get_output(name) Get the Tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_output(name, tensor) Update the output tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
count: int

get_output(name)[source]

Get the Tensor stored in the container identified by name. Alias for ResponseMemory.get_tensor.

Parameters
name

Key used to do lookup in tensors dict of message container.

Returns
cupy.ndarray

Tensors corresponding to name.

Raises
KeyError

If output name does not exist in message container.

set_output(name, tensor)[source]

Update the output tensor identified by name. Alias for ResponseMemory.set_tensor

Parameters
name

Key used to do lookup in tensors dict of the container.

tensor

Tensor as a CuPy array.

Raises
ValueError

If the number of rows in tensor does not match count

tensors: Dict[str, cupy.ndarray]

class ResponseMemoryAE(*args, **kwargs)[source]

Subclass of ResponseMemory specific to the AutoEncoder pipeline.

Parameters
probs

Probabilities tensor

user_id

User id the inference was performed against.

explain_df

Explainability Dataframe, for each feature a column will exist with a name in the form of: {feature}_z_loss containing the loss z-score along with max_abs_z and mean_abs_z columns
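As a small illustration of the naming scheme described for explain_df, the sketch below derives the expected column names from a list of feature names. `explain_columns` is a hypothetical helper, the feature names are made up, and the column order is an assumption.

```python
from typing import List


def explain_columns(features: List[str]) -> List[str]:
    """List the column names expected for the given features."""
    per_feature = [f"{feature}_z_loss" for feature in features]
    # Two summary columns accompany the per-feature columns
    return per_feature + ["max_abs_z", "mean_abs_z"]


columns = explain_columns(["logcount", "locincrement"])
print(columns)
```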

Attributes
explain_df

probs

tensor_names

Methods

get_output(name) Get the Tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_output(name, tensor) Update the output tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
explain_df = None

probs: dataclasses.InitVar[cupy.ndarray] = None

user_id = ''

class ResponseMemoryProbs(*args, **kwargs)[source]

Subclass of ResponseMemory containing an output tensor named ‘probs’.

Parameters
probs

Probabilities tensor

Attributes
probs

tensor_names

Methods

get_output(name) Get the Tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_output(name, tensor) Update the output tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
probs: dataclasses.InitVar[cupy.ndarray] = None

class TensorMemory(*, count=None, tensors=None)[source]

This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in cupy arrays.

Parameters
count

Length of each tensor contained in tensors.

tensors

Collection of tensors uniquely identified by a name.

Attributes
tensor_names

Methods

get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
count: int

get_tensor(name)[source]

Get the Tensor stored in the container identified by name.

Parameters
name

Tensor key name.

Returns
cupy.ndarray

Tensor.

Raises
KeyError

If tensor name does not exist in the container.

get_tensors()[source]

Get the tensors contained by this instance. Note that when C++ execution is enabled, the returned tensors are a Python copy of the tensors stored in the C++ object. Any changes made to them must therefore be written back with a call to set_tensors.

Returns
typing.Dict[str, cp.ndarray]
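The read-modify-write pattern implied by this copy behavior can be sketched with a toy container. `ToyCopyingMemory` is a hypothetical class that always returns deep copies, which is an assumption used here to mimic the behavior described for C++ execution; it is not the Morpheus implementation.

```python
import copy
from typing import Dict, List


class ToyCopyingMemory:
    """Hypothetical container whose getters always return copies."""

    def __init__(self, tensors: Dict[str, List[list]]):
        self._tensors = copy.deepcopy(tensors)

    def get_tensors(self) -> Dict[str, List[list]]:
        # The caller receives a copy, never the stored object
        return copy.deepcopy(self._tensors)

    def set_tensors(self, tensors: Dict[str, List[list]]) -> None:
        self._tensors = copy.deepcopy(tensors)


memory = ToyCopyingMemory({"probs": [[0.1], [0.9]]})

local = memory.get_tensors()
local["probs"][0][0] = 0.5  # modifies only the local copy

before = memory.get_tensors()["probs"][0][0]
memory.set_tensors(local)   # write the change back
after = memory.get_tensors()["probs"][0][0]
print(before, after)
```

Until set_tensors is called, the stored value is unaffected by edits to the local copy.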

has_tensor(name)[source]

Returns True if a tensor with the requested name exists in the tensors object

Parameters
name

Name to lookup

Returns
bool

True if the tensor was found

set_tensor(name, tensor)[source]

Update the tensor identified by name.

Parameters
name

Tensor key name.

tensor

Tensor as a CuPy array.

Raises
ValueError

If the number of rows in tensor does not match count
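The row-count check and the KeyError on a missing name can be modeled with a small stand-in class. This is a minimal sketch, assuming nested Python lists in place of cupy arrays; `ToyTensorMemory` is a hypothetical name and the error message text is invented.

```python
from typing import Dict, List, Optional


class ToyTensorMemory:
    """Hypothetical stand-in for TensorMemory using nested lists."""

    def __init__(self, count: int, tensors: Optional[Dict[str, List[list]]] = None):
        self.count = count
        self._tensors: Dict[str, List[list]] = {}
        for name, tensor in (tensors or {}).items():
            self.set_tensor(name, tensor)

    def has_tensor(self, name: str) -> bool:
        return name in self._tensors

    def get_tensor(self, name: str) -> List[list]:
        # A missing name surfaces as KeyError from the dict lookup
        return self._tensors[name]

    def set_tensor(self, name: str, tensor: List[list]) -> None:
        # Every tensor must have exactly `count` rows
        if len(tensor) != self.count:
            raise ValueError(
                f"Tensor '{name}' has {len(tensor)} rows, expected {self.count}")
        self._tensors[name] = tensor


memory = ToyTensorMemory(count=3)
memory.set_tensor("probs", [[0.1], [0.7], [0.2]])

try:
    memory.set_tensor("seq_ids", [[0], [1]])  # only 2 rows
    rejected = False
except ValueError:
    rejected = True

print(memory.has_tensor("probs"), memory.has_tensor("seq_ids"), rejected)
```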

set_tensors(tensors)[source]

Overwrite the tensors stored by this instance. If the length of the tensors has changed, then the count property should also be updated.

Parameters
tensors

Collection of tensors uniquely identified by a name.

property tensor_names: List[str]

tensors: Dict[str, cupy.ndarray]

class UserMessageMeta(df, user_id)[source]

This class extends MessageMeta to also hold the user ID corresponding to the batched metadata.

Parameters
df

Input rows in dataframe.

user_id

User id.

Attributes
count

Returns the number of messages in the batch.

df

Methods

ensure_sliceable_index() Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.
get_meta_range(mess_offset, message_count[, ...]) Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
has_sliceable_index() Returns True if the underlying DataFrame's index is unique and monotonic.
copy_dataframe
get_column_names
mutable_dataframe
user_id: str
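The "unique and monotonic" condition behind has_sliceable_index can be expressed as a short check over index values. The sketch below works on a plain sequence rather than a dataframe index; `is_sliceable` is a hypothetical helper, and accepting either increasing or decreasing order as monotonic is an assumption.

```python
from typing import Sequence


def is_sliceable(index: Sequence[int]) -> bool:
    """Return True if the index values are unique and monotonic."""
    unique = len(set(index)) == len(index)
    increasing = all(a <= b for a, b in zip(index, index[1:]))
    decreasing = all(a >= b for a, b in zip(index, index[1:]))
    return unique and (increasing or decreasing)


print(is_sliceable([0, 1, 2, 3]))  # ordered and unique
print(is_sliceable([0, 2, 1, 3]))  # out of order
print(is_sliceable([0, 0, 1, 2]))  # duplicate value
```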

Modules

morpheus.messages.data_class_prop

morpheus.messages.memory Memory classes
morpheus.messages.message_base

morpheus.messages.message_meta

morpheus.messages.multi_ae_message

morpheus.messages.multi_inference_ae_message

morpheus.messages.multi_inference_message

morpheus.messages.multi_message

morpheus.messages.multi_response_message

morpheus.messages.multi_tensor_message

© Copyright 2024, NVIDIA. Last updated on Apr 11, 2024.