morpheus.messages#

Message classes, which contain data being transferred between pipeline stages.

class ControlMessage(*args, **kwargs)[source]#

Methods

get_metadata([key, default_value])

Return a given piece of metadata; if key is None, return the entire metadata dictionary.

has_task(task_type)

Return True if the control message has at least one task of the given type

add_task

config

copy

filter_timestamp

get_tasks

get_timestamp

get_timestamps

has_metadata

list_metadata

payload

remove_task

set_metadata

set_timestamp

task_type

tensor_count

tensors

add_task(task_type, task)[source]#
config(config=None)[source]#
copy()[source]#
filter_timestamp(regex_filter)[source]#
get_metadata(key=None, default_value=None)[source]#

Return a given piece of metadata; if key is None, return the entire metadata dictionary. If key is not found, default_value is returned.

Parameters:
  • key (str) – The key of the metadata to retrieve, or None for all metadata

  • default_value (Any) – The value to return if the key is not found, ignored if key is None

Returns:

The value of the metadata key, or the entire metadata dictionary if key is None

Return type:

Any
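
The lookup rules above can be sketched in plain Python. The snippet below is only a stand-in that mirrors the documented behaviour: `MetadataStore` is a hypothetical name, it is not part of Morpheus, and returning a copy of the dictionary is an assumption of this sketch.

```python
from typing import Any, Optional


class MetadataStore:
    """Hypothetical stand-in mirroring the documented get_metadata rules."""

    def __init__(self) -> None:
        self._metadata: dict = {}

    def set_metadata(self, key: str, value: Any) -> None:
        self._metadata[key] = value

    def get_metadata(self, key: Optional[str] = None, default_value: Any = None) -> Any:
        # key is None: return the whole dictionary; default_value is ignored.
        # (Returning a copy is an assumption made for this sketch.)
        if key is None:
            return dict(self._metadata)
        # Otherwise return the stored value, or default_value if the key is missing.
        return self._metadata.get(key, default_value)


store = MetadataStore()
store.set_metadata("source", "kafka")

all_metadata = store.get_metadata()                         # entire dictionary
known = store.get_metadata("source")                        # stored value
missing = store.get_metadata("batch_id", default_value=-1)  # falls back to default
```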

get_tasks()[source]#
get_timestamp(key, fail_if_nonexist=False)[source]#
get_timestamps()[source]#
has_metadata(key)[source]#
has_task(task_type)[source]#

Return True if the control message has at least one task of the given type
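
To illustrate the "at least one task of the given type" rule, the following pure-Python stand-in keeps a list of tasks per type. `TaskStore` is a hypothetical name, and the per-type list layout is an assumption rather than a description of how ControlMessage stores its tasks.

```python
from collections import defaultdict
from typing import Dict, List


class TaskStore:
    """Hypothetical stand-in; not the Morpheus ControlMessage."""

    def __init__(self) -> None:
        # Assumption: tasks are grouped by task_type, several per type allowed.
        self._tasks: Dict[str, List[dict]] = defaultdict(list)

    def add_task(self, task_type: str, task: dict) -> None:
        self._tasks[task_type].append(task)

    def has_task(self, task_type: str) -> bool:
        # True only when at least one task of this type is present.
        return len(self._tasks.get(task_type, [])) > 0


tasks = TaskStore()
tasks.add_task("load", {"files": ["a.csv"]})

has_load = tasks.has_task("load")
has_inference = tasks.has_task("inference")
```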

list_metadata()[source]#
payload(payload=None)[source]#
remove_task(task_type)[source]#
set_metadata(key, value)[source]#
set_timestamp(key, timestamp)[source]#
task_type(new_task_type=None)[source]#
tensor_count()[source]#
tensors(tensors=None)[source]#
class ControlMessageType#

Members:

INFERENCE

NONE

TRAINING

Attributes:
name

name(self: object) -> str

value
INFERENCE = <ControlMessageType.INFERENCE: 1>#
NONE = <ControlMessageType.INFERENCE: 1>#
TRAINING = <ControlMessageType.TRAINING: 2>#
property name#
property value#
class DataLoaderRegistry#

Methods

contains(name)

list()

register_loader(name, loader[, throw_if_exists])

unregister_loader(name[, throw_if_not_exists])

static contains(name: str) -> bool#
static list() -> list[str]#
static register_loader(
name: str,
loader: Callable[[morpheus._lib.messages.ControlMessage, dict], morpheus._lib.messages.ControlMessage],
throw_if_exists: bool = True,
) -> None#
static unregister_loader(
name: str,
throw_if_not_exists: bool = True,
) -> None#
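
The registry signatures above suggest a name-to-callable mapping whose loaders take a message and a dict and return a message. The sketch below models that shape in plain Python. Everything in it is hypothetical: `SketchLoaderRegistry` and `tag_loader` are illustrative names, a plain dict stands in for ControlMessage, and both the exception type and the behaviour when the throw flags are False are assumptions.

```python
from typing import Callable, Dict

# Assumption: a plain dict stands in for ControlMessage in this sketch.
Loader = Callable[[dict, dict], dict]


class SketchLoaderRegistry:
    """Hypothetical stand-in; not morpheus.messages.DataLoaderRegistry."""

    _loaders: Dict[str, Loader] = {}

    @staticmethod
    def contains(name: str) -> bool:
        return name in SketchLoaderRegistry._loaders

    @staticmethod
    def register_loader(name: str, loader: Loader, throw_if_exists: bool = True) -> None:
        if name in SketchLoaderRegistry._loaders:
            if throw_if_exists:
                # Assumption: the exception type is chosen for illustration.
                raise RuntimeError(f"loader '{name}' is already registered")
            return  # Assumption: keep the existing loader untouched.
        SketchLoaderRegistry._loaders[name] = loader

    @staticmethod
    def unregister_loader(name: str, throw_if_not_exists: bool = True) -> None:
        if name not in SketchLoaderRegistry._loaders:
            if throw_if_not_exists:
                raise RuntimeError(f"loader '{name}' is not registered")
            return
        del SketchLoaderRegistry._loaders[name]


def tag_loader(message: dict, task: dict) -> dict:
    """Example loader: takes a message and a task dict, returns the message."""
    message["loaded_from"] = task.get("source", "unknown")
    return message


SketchLoaderRegistry.register_loader("tagger", tag_loader)
registered = SketchLoaderRegistry.contains("tagger")

try:
    SketchLoaderRegistry.register_loader("tagger", tag_loader)
    duplicate_raised = False
except RuntimeError:
    duplicate_raised = True

SketchLoaderRegistry.unregister_loader("tagger")
still_registered = SketchLoaderRegistry.contains("tagger")
```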
class InferenceMemory(**kwargs)[source]#

This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in either CuPy or NumPy arrays.

Attributes:
tensor_names

Methods

get_input(name)

Get the tensor stored in the container identified by name.

get_tensor(name)

Get the Tensor stored in the container identified by name.

get_tensors()

Get the tensors contained by this instance.

has_tensor(name)

Returns True if a tensor with the requested name exists in the tensors object

set_input(name, tensor)

Update the input tensor identified by name.

set_tensor(name, tensor)

Update the tensor identified by name.

set_tensors(tensors)

Overwrite the tensors stored by this instance.

count: int#
get_input(name)[source]#

Get the tensor stored in the container identified by name. Alias for InferenceMemory.get_tensor.

Parameters:
name : str

Key used to do lookup in inputs dict of the container.

Returns:
NDArrayType

Inputs corresponding to name.

Raises:
KeyError

If input name does not exist in the container.

set_input(name, tensor)[source]#

Update the input tensor identified by name. Alias for InferenceMemory.set_tensor

Parameters:
name : str

Key used to do lookup in inputs dict of the container.

tensor : NDArrayType

Tensor as either CuPy or NumPy array.

tensors: Dict[str, numpy.ndarray | cupy.ndarray]#
class InferenceMemoryAE(**kwargs)[source]#

This is a container class for data that needs to be submitted to the inference server for autoencoder use cases.

Parameters:
inputs : NDArrayType

Inference input.

seq_ids : NDArrayType

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

Attributes:
input
seq_ids
tensor_names

Methods

get_input(name)

Get the tensor stored in the container identified by name.

get_tensor(name)

Get the Tensor stored in the container identified by name.

get_tensors()

Get the tensors contained by this instance.

has_tensor(name)

Returns True if a tensor with the requested name exists in the tensors object

set_input(name, tensor)

Update the input tensor identified by name.

set_tensor(name, tensor)

Update the tensor identified by name.

set_tensors(tensors)

Overwrite the tensors stored by this instance.

input: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
seq_ids: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
class InferenceMemoryFIL(**kwargs)[source]#

This is a container class for data that needs to be submitted to the inference server for FIL category use cases.

Parameters:
input__0 : NDArrayType

Inference input.

seq_ids : NDArrayType

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

Attributes:
input__0
seq_ids
tensor_names

Methods

get_input(name)

Get the tensor stored in the container identified by name.

get_tensor(name)

Get the Tensor stored in the container identified by name.

get_tensors()

Get the tensors contained by this instance.

has_tensor(name)

Returns True if a tensor with the requested name exists in the tensors object

set_input(name, tensor)

Update the input tensor identified by name.

set_tensor(name, tensor)

Update the tensor identified by name.

set_tensors(tensors)

Overwrite the tensors stored by this instance.

input__0: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
seq_ids: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
class InferenceMemoryNLP(**kwargs)[source]#

This is a container class for data that needs to be submitted to the inference server for NLP category use cases.

Parameters:
input_ids : NDArrayType

The token-ids for each string padded with 0s to max_length.

input_mask : NDArrayType

The mask for token-ids result where corresponding positions identify valid token-id values.

seq_ids : NDArrayType

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

Attributes:
input_ids
input_mask
seq_ids
tensor_names

Methods

get_input(name)

Get the tensor stored in the container identified by name.

get_tensor(name)

Get the Tensor stored in the container identified by name.

get_tensors()

Get the tensors contained by this instance.

has_tensor(name)

Returns True if a tensor with the requested name exists in the tensors object

set_input(name, tensor)

Update the input tensor identified by name.

set_tensor(name, tensor)

Update the tensor identified by name.

set_tensors(tensors)

Overwrite the tensors stored by this instance.

input_ids: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
input_mask: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
seq_ids: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
class MessageBase[source]#

Base class for all messages. Returns a C++ implementation if CppConfig.get_should_use_cpp() is True and the class has an associated C++ implementation (cpp_class); otherwise, returns the Python implementation.
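
The selection rule described above can be sketched as a small function. This is only an illustration of the two conditions. `PyImpl`, `PyImplWithCpp`, `CppBackedStub` and `select_implementation` are hypothetical names, a plain `use_cpp` flag stands in for CppConfig.get_should_use_cpp(), and the real mechanism by which MessageBase performs the switch is not shown here.

```python
from typing import Optional, Type


class CppBackedStub:
    """Hypothetical placeholder for a C++-backed class."""


class PyImpl:
    """Hypothetical Python implementation with no C++ counterpart."""

    cpp_class: Optional[Type] = None


class PyImplWithCpp(PyImpl):
    """Hypothetical Python implementation that has a C++ counterpart."""

    cpp_class = CppBackedStub


def select_implementation(cls: Type[PyImpl], use_cpp: bool) -> Type:
    # Use the C++ class only when C++ is enabled AND the class has one.
    if use_cpp and cls.cpp_class is not None:
        return cls.cpp_class
    # All other cases fall back to the Python implementation.
    return cls


with_cpp_enabled = select_implementation(PyImplWithCpp, use_cpp=True)
with_cpp_disabled = select_implementation(PyImplWithCpp, use_cpp=False)
no_cpp_class = select_implementation(PyImpl, use_cpp=True)
```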

class MessageMeta(**kwargs)[source]#

This is a container class that holds the metadata for a batch of deserialized messages.

Parameters:
df : pandas.DataFrame

Input rows in dataframe.

Attributes:
count

Returns the number of messages in the batch.

df

Methods

copy_ranges(ranges)

Perform a copy of the current message instance for the given ranges of rows.

ensure_sliceable_index()

Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.

get_data()

Return column values from the underlying DataFrame.

get_meta_range(mess_offset, message_count[, ...])

Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.

get_slice(start, stop)

Returns a new MessageMeta with only the rows specified by start/stop.

has_sliceable_index()

Returns True if the underlying DataFrame's index is unique and monotonic.

set_data(columns, value)

Set column values to the underlying DataFrame.

copy_dataframe

get_column_names

mutable_dataframe

copy_dataframe()[source]#
copy_ranges(ranges)[source]#

Perform a copy of the current message instance for the given ranges of rows.

Parameters:
ranges : typing.List[typing.Tuple[int, int]]

Rows to include in the copy, in the form [(start_row, stop_row), ...]. The stop_row is not included. For example, to copy rows 1-2 and 5-7, use ranges=[(1, 3), (5, 8)].

Returns:
MessageMeta

A new MessageMeta with only the rows specified by ranges.
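
The half-open range convention is easy to get wrong by one, so here is a minimal sketch of it on a plain list of rows. The free function below is a hypothetical stand-in that only mirrors the (start_row, stop_row) semantics. It does not operate on a MessageMeta or a DataFrame.

```python
from typing import List, Sequence, Tuple


def copy_ranges(rows: Sequence[str], ranges: List[Tuple[int, int]]) -> List[str]:
    """Copy rows for half-open (start_row, stop_row) ranges."""
    copied: List[str] = []
    for start_row, stop_row in ranges:
        # stop_row is excluded, exactly like a Python slice.
        copied.extend(rows[start_row:stop_row])
    return copied


rows = [f"row{i}" for i in range(10)]

# Rows 1-2 and 5-7, as in the example above.
selected = copy_ranges(rows, [(1, 3), (5, 8)])
```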

property count: int#

Returns the number of messages in the batch.

Returns:
int

Number of messages in MessageMeta.df.

property df: pandas.DataFrame | cudf.DataFrame#
ensure_sliceable_index()[source]#

Replaces the index in the underlying dataframe if the existing one is not unique and monotonic. The old index will be preserved in a column named _index_{old_index.name}. If has_sliceable_index() returns True, this is a no-op.

Returns:
str

The name of the column with the old index or None if no changes were made
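
A rough model of this behaviour, using a dict of lists in place of a DataFrame, might look like the following. The helper names are hypothetical, and two details are assumptions of the sketch rather than documented facts: "monotonic" is taken to mean increasing or decreasing, and the replacement index is taken to be 0..n-1.

```python
from typing import Dict, List, Optional


def is_sliceable(index: List[int]) -> bool:
    """Unique and monotonic (assumed here: increasing or decreasing)."""
    pairs = list(zip(index, index[1:]))
    unique = len(set(index)) == len(index)
    increasing = all(a <= b for a, b in pairs)
    decreasing = all(a >= b for a, b in pairs)
    return unique and (increasing or decreasing)


def ensure_sliceable(table: Dict[str, list], index_name: str) -> Optional[str]:
    """table maps column name -> values; table[index_name] models the index."""
    if is_sliceable(table[index_name]):
        return None  # already sliceable: no-op
    # Preserve the old index under _index_{old index name}.
    old_column = f"_index_{index_name}"
    table[old_column] = table[index_name]
    # Assumption: the replacement index is simply 0..n-1.
    table[index_name] = list(range(len(table[old_column])))
    return old_column


table = {"idx": [3, 1, 1], "value": ["a", "b", "c"]}
renamed = ensure_sliceable(table, "idx")      # index replaced, old one kept
second_call = ensure_sliceable(table, "idx")  # nothing left to fix
```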

get_column_names()[source]#
get_data(columns=None)[source]#

Return column values from the underlying DataFrame.

Parameters:
columns : typing.Union[None, str, typing.List[str]]

Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a DataFrame is returned.

Returns:
Series or DataFrame

Column values from the dataframe.

get_meta_range(mess_offset, message_count, columns=None)[source]#

Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.

Parameters:
mess_offset : int

Offset into the metadata batch.

message_count : int

Messages count.

columns : typing.Union[None, str, typing.List[str]]

Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a DataFrame is returned.

Returns:
Series or DataFrame

Column values from the dataframe.
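
As a sketch of how the offset, count and columns arguments interact, the function below applies them to a dict of lists. It is a hypothetical stand-in, not the Morpheus method: a list models a Series, a dict of lists models a DataFrame, and the range is interpreted as message_count rows starting at mess_offset.

```python
from typing import Dict, List, Union

Table = Dict[str, list]


def get_meta_range(table: Table, mess_offset: int, message_count: int,
                   columns: Union[None, str, List[str]] = None):
    # Interpretation used here: rows [mess_offset, mess_offset + message_count).
    stop = mess_offset + message_count
    if columns is None:
        columns = list(table)  # None selects every column
    if isinstance(columns, str):
        # A single name yields one column (the Series case).
        return table[columns][mess_offset:stop]
    # A list of names yields a table (the DataFrame case).
    return {name: table[name][mess_offset:stop] for name in columns}


table = {"user": ["a", "b", "c", "d"], "score": [1, 2, 3, 4]}

one_column = get_meta_range(table, 1, 2, "score")
named_columns = get_meta_range(table, 1, 2, ["user"])
all_columns = get_meta_range(table, 2, 2)
```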

get_slice(start, stop)[source]#

Returns a new MessageMeta with only the rows specified by start/stop.

Parameters:
start : int

Start offset address.

stop : int

Stop offset address.

Returns:
MessageMeta

A new MessageMeta with sliced offset and count.

has_sliceable_index()[source]#

Returns True if the underlying DataFrame’s index is unique and monotonic. Sliceable indices have better performance since a range of rows can be specified by a start and stop index instead of requiring boolean masks.

Returns:
bool
mutable_dataframe()[source]#
set_data(columns, value)[source]#

Set column values to the underlying DataFrame.

Parameters:
columns : typing.Union[None, str, typing.List[str]]

Input column names. Sets the value for the corresponding column names. If None is specified, all columns will be used. If the column does not exist, a new one will be created.

value : Any

Value to apply to the specified columns. If a single value is passed, it will be broadcast to all rows. If a Series or DataFrame is passed, rows will be matched by index.
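
The two value modes (broadcast versus match by index) can be illustrated without a DataFrame. In this hypothetical sketch, `set_column` is an illustrative helper, a dict of lists models the table, and a `{index label: value}` dict stands in for a Series. Filling missing labels with None is an assumption of the sketch.

```python
from typing import Any, Dict, List


def set_column(index: List[int], table: Dict[str, list], column: str, value: Any) -> None:
    if isinstance(value, dict):
        # Series stand-in: rows are matched by index label, not by position.
        # Labels absent from the dict become None in this sketch.
        table[column] = [value.get(label) for label in index]
    else:
        # A single value is broadcast to every row; a missing column is created.
        table[column] = [value] * len(index)


index = [10, 11, 12]
table = {"score": [0.1, 0.2, 0.3]}

set_column(index, table, "flag", False)                         # new column, broadcast
set_column(index, table, "score", {12: 0.9, 10: 0.7, 11: 0.8})  # matched by label
```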

class RawPacketMessage#
Attributes:
gpu_mem
max_size
num
property gpu_mem#
property max_size#
property num#
class ResponseMemory(*args, **kwargs)[source]#

Output memory block holding the results of inference.

Attributes:
tensor_names

Methods

get_output(name)

Get the Tensor stored in the container identified by name.

get_tensor(name)

Get the Tensor stored in the container identified by name.

get_tensors()

Get the tensors contained by this instance.

has_tensor(name)

Returns True if a tensor with the requested name exists in the tensors object

set_output(name, tensor)

Update the output tensor identified by name.

set_tensor(name, tensor)

Update the tensor identified by name.

set_tensors(tensors)

Overwrite the tensors stored by this instance.

count: int#
get_output(name)[source]#

Get the Tensor stored in the container identified by name. Alias for ResponseMemory.get_tensor.

Parameters:
name : str

Key used to do lookup in tensors dict of message container.

Returns:
NDArrayType

Tensors corresponding to name.

Raises:
KeyError

If output name does not exist in message container.

set_output(name, tensor)[source]#

Update the output tensor identified by name. Alias for ResponseMemory.set_tensor

Parameters:
name : str

Key used to do lookup in tensors dict of the container.

tensor : NDArrayType

Tensor as either a CuPy or NumPy array.

Raises:
ValueError

If the number of rows in tensor does not match count

tensors: Dict[str, numpy.ndarray | cupy.ndarray]#
class ResponseMemoryAE(*args, **kwargs)[source]#

Subclass of ResponseMemory specific to the AutoEncoder pipeline.

Parameters:
probs : NDArrayType

Probabilities tensor

user_id : str

User id the inference was performed against.

explain_df : pd.DataFrame

Explainability DataFrame. For each feature, a column named {feature}_z_loss holds the loss z-score; the DataFrame also contains max_abs_z and mean_abs_z columns.
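
For a concrete picture of the column naming scheme, the helper below builds the expected column names from a list of features. The feature names and the helper itself are hypothetical, and the column order is an assumption.

```python
from typing import List


def explain_columns(features: List[str]) -> List[str]:
    """Column names following the {feature}_z_loss pattern plus the two summaries."""
    per_feature = [f"{feature}_z_loss" for feature in features]
    return per_feature + ["max_abs_z", "mean_abs_z"]


# Hypothetical feature names, for illustration only.
columns = explain_columns(["bytes_sent", "login_count"])
```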

Attributes:
explain_df
probs
tensor_names

Methods

get_output(name)

Get the Tensor stored in the container identified by name.

get_tensor(name)

Get the Tensor stored in the container identified by name.

get_tensors()

Get the tensors contained by this instance.

has_tensor(name)

Returns True if a tensor with the requested name exists in the tensors object

set_output(name, tensor)

Update the output tensor identified by name.

set_tensor(name, tensor)

Update the tensor identified by name.

set_tensors(tensors)

Overwrite the tensors stored by this instance.

explain_df: pandas.DataFrame = None#
probs: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
user_id: str = ''#
class ResponseMemoryProbs(*args, **kwargs)[source]#

Subclass of ResponseMemory containing an output tensor named ‘probs’.

Parameters:
probs : NDArrayType

Probabilities tensor

Attributes:
probs
tensor_names

Methods

get_output(name)

Get the Tensor stored in the container identified by name.

get_tensor(name)

Get the Tensor stored in the container identified by name.

get_tensors()

Get the tensors contained by this instance.

has_tensor(name)

Returns True if a tensor with the requested name exists in the tensors object

set_output(name, tensor)

Update the output tensor identified by name.

set_tensor(name, tensor)

Update the tensor identified by name.

set_tensors(tensors)

Overwrite the tensors stored by this instance.

probs: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
class TensorMemory(**kwargs)[source]#

This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in either CuPy or NumPy arrays.

Parameters:
count : int

Length of each tensor contained in tensors.

tensors : TensorMapType

Collection of tensors uniquely identified by a name.

Attributes:
tensor_names

Methods

get_tensor(name)

Get the Tensor stored in the container identified by name.

get_tensors()

Get the tensors contained by this instance.

has_tensor(name)

Returns True if a tensor with the requested name exists in the tensors object

set_tensor(name, tensor)

Update the tensor identified by name.

set_tensors(tensors)

Overwrite the tensors stored by this instance.

count: int#
get_tensor(name)[source]#

Get the Tensor stored in the container identified by name.

Parameters:
name : str

Tensor key name.

Returns:
NDArrayType

Tensor.

Raises:
KeyError

If tensor name does not exist in the container.

get_tensors()[source]#

Get the tensors contained by this instance. Note that when C++ execution is enabled, the returned tensors are a Python copy of the tensors stored in the C++ object, so any changes made to them must be written back with a call to set_tensors.

Returns:
TensorMapType
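
The copy semantics described above imply a read, modify, write-back pattern. The sketch below imitates it with a hypothetical `CopyingTensorStore` that always hands out copies. Nested lists stand in for CuPy or NumPy arrays, and the class is not the Morpheus implementation.

```python
import copy
from typing import Dict, List

TensorMap = Dict[str, List[List[float]]]


class CopyingTensorStore:
    """Hypothetical stand-in that mimics the copy-on-read behaviour."""

    def __init__(self, tensors: TensorMap) -> None:
        self._tensors = copy.deepcopy(tensors)

    def get_tensors(self) -> TensorMap:
        # Callers receive a copy, so edits do not reach the stored tensors.
        return copy.deepcopy(self._tensors)

    def set_tensors(self, tensors: TensorMap) -> None:
        self._tensors = copy.deepcopy(tensors)


store = CopyingTensorStore({"probs": [[0.1], [0.9]]})

local = store.get_tensors()
local["probs"][0][0] = 0.5                          # edits only the local copy
before_write = store.get_tensors()["probs"][0][0]   # stored value is unchanged

store.set_tensors(local)                            # write the change back
after_write = store.get_tensors()["probs"][0][0]
```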
has_tensor(name)[source]#

Returns True if a tensor with the requested name exists in the tensors object

Parameters:
name : str

Name to lookup

Returns:
bool

True if the tensor was found

set_tensor(name, tensor)[source]#

Update the tensor identified by name.

Parameters:
name : str

Tensor key name.

tensor : NDArrayType

Tensor as either a CuPy or NumPy array.

Raises:
ValueError

If the number of rows in tensor does not match count
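
The row-count check and the KeyError on lookup can be modelled in a few lines. `SketchTensorMemory` is a hypothetical stand-in that uses nested lists in place of arrays. It mirrors only the two documented error conditions and is not the Morpheus class.

```python
from typing import Dict, List


class SketchTensorMemory:
    """Hypothetical stand-in mirroring the documented error conditions."""

    def __init__(self, count: int) -> None:
        self.count = count
        self.tensors: Dict[str, List[list]] = {}

    def set_tensor(self, name: str, tensor: List[list]) -> None:
        # The number of rows must match count.
        if len(tensor) != self.count:
            raise ValueError(f"expected {self.count} rows, got {len(tensor)}")
        self.tensors[name] = tensor

    def get_tensor(self, name: str) -> List[list]:
        # A plain dict lookup raises KeyError for an unknown name.
        return self.tensors[name]


memory = SketchTensorMemory(count=2)
memory.set_tensor("probs", [[0.1, 0.9], [0.8, 0.2]])

try:
    memory.set_tensor("probs", [[0.5, 0.5]])  # one row, but count is 2
    mismatch_raised = False
except ValueError:
    mismatch_raised = True

try:
    memory.get_tensor("missing")
    missing_raised = False
except KeyError:
    missing_raised = True
```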

set_tensors(tensors)[source]#

Overwrite the tensors stored by this instance. If the length of the tensors has changed, then the count property should also be updated.

Parameters:
tensors : TensorMapType

Collection of tensors uniquely identified by a name.

property tensor_names: List[str]#
tensors: Dict[str, numpy.ndarray | cupy.ndarray]#
class UserMessageMeta(**kwargs)[source]#

This class extends MessageMeta to also hold the user ID corresponding to the batched metadata.

Parameters:
df : pandas.DataFrame

Input rows in dataframe.

user_id : str

User id.

Attributes:
count

Returns the number of messages in the batch.

df

Methods

copy_ranges(ranges)

Perform a copy of the current message instance for the given ranges of rows.

ensure_sliceable_index()

Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.

get_data([columns])

Return column values from the underlying DataFrame.

get_meta_range(mess_offset, message_count[, ...])

Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.

get_slice(start, stop)

Returns a new MessageMeta with only the rows specified by start/stop.

has_sliceable_index()

Returns True if the underlying DataFrame's index is unique and monotonic.

set_data(columns, value)

Set column values to the underlying DataFrame.

copy_dataframe

get_column_names

mutable_dataframe

user_id: str#

Modules

control_message

data_class_prop

memory

Memory classes

message_base

message_meta