NVIDIA Morpheus (24.10.01)

morpheus.messages

Message classes, which contain data being transferred between pipeline stages.

class ControlMessage(*args, **kwargs)[source]

Methods

get_metadata([key, default_value]) Return a given piece of metadata; if key is None, return the entire metadata dictionary.
has_task(task_type) Return True if the control message has at least one task of the given type
add_task
config
copy
filter_timestamp
get_tasks
get_timestamp
get_timestamps
has_metadata
list_metadata
payload
remove_task
set_metadata
set_timestamp
task_type
tensors
add_task(task_type, task)[source]
config(config=None)[source]
copy()[source]
filter_timestamp(regex_filter)[source]
get_metadata(key=None, default_value=None)[source]

Return a given piece of metadata; if key is None, return the entire metadata dictionary. If key is not found, default_value is returned.

Parameters
  • key (Optional[str]) – The key of the metadata to retrieve, or None for all metadata

  • default_value (Optional[Any]) – The value to return if the key is not found, ignored if key is None

Returns

The value of the metadata key, or the entire metadata dictionary if key is None

Return type

Any

get_tasks()[source]
get_timestamp(key, fail_if_nonexist=False)[source]
get_timestamps()[source]
has_metadata(key)[source]
has_task(task_type)[source]

Return True if the control message has at least one task of the given type

list_metadata()[source]
payload(payload=None)[source]
remove_task(task_type)[source]
set_metadata(key, value)[source]
set_timestamp(key, timestamp)[source]
task_type(new_task_type=None)[source]
tensors(tensors=None)[source]
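
A minimal usage sketch (the DataFrame columns, task payload, and metadata keys are illustrative, not part of the API):

    import cudf

    from morpheus.messages import ControlMessage, MessageMeta

    # Wrap a DataFrame in a MessageMeta and attach it as the message payload.
    df = cudf.DataFrame({"user": ["alice", "bob"], "score": [0.1, 0.9]})
    msg = ControlMessage()
    msg.payload(MessageMeta(df))

    # Queue a task for a downstream stage and record some metadata.
    msg.add_task("inference", {"model_name": "example-model"})
    msg.set_metadata("source", "example")

    assert msg.has_task("inference")
    print(msg.get_metadata())                       # entire metadata dictionary
    print(msg.get_metadata("missing", "fallback"))  # default_value is used when the key is absent
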
class ControlMessageType

Members:

INFERENCE

NONE

TRAINING

Attributes
name

name(self: handle) -> str

value
INFERENCE = <ControlMessageType.INFERENCE: 1>
NONE = <ControlMessageType.NONE: 0>
TRAINING = <ControlMessageType.TRAINING: 2>
property name
property value
class DataLoaderRegistry

Methods

contains(name)

list()

register_loader(name, loader[, throw_if_exists])

unregister_loader(name[, throw_if_not_exists])

static contains(name: str) → bool
static list() → List[str]
static register_loader(name: str, loader: Callable[[morpheus._lib.messages.ControlMessage, dict], morpheus._lib.messages.ControlMessage], throw_if_exists: bool = True) → None
static unregister_loader(name: str, throw_if_not_exists: bool = True) → None
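
A sketch of registering a loader, assuming DataLoaderRegistry is imported from morpheus.messages as documented here; the loader name and pass-through behavior are illustrative:

    from morpheus.messages import ControlMessage, DataLoaderRegistry

    def passthrough_loader(message: ControlMessage, task: dict) -> ControlMessage:
        # A loader receives the control message and the task's config dict and
        # must return a ControlMessage; this one simply passes it through.
        return message

    if not DataLoaderRegistry.contains("passthrough"):
        DataLoaderRegistry.register_loader("passthrough", passthrough_loader)

    print(DataLoaderRegistry.list())  # names of all registered loaders
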
class InferenceMemory(*, count=None, tensors=None)[source]

This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in either CuPy or NumPy arrays.

Attributes
tensor_names

Methods

get_input(name) Get the tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_input(name, tensor) Update the input tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
count: int
get_input(name)[source]

Get the tensor stored in the container identified by name. Alias for InferenceMemory.get_tensor.

Parameters
name : str

Key used to do lookup in inputs dict of the container.

Returns
NDArrayType

Inputs corresponding to name.

Raises
KeyError

If input name does not exist in the container.

set_input(name, tensor)[source]

Update the input tensor identified by name. Alias for InferenceMemory.set_tensor

Parameters
name : str

Key used to do lookup in inputs dict of the container.

tensor : NDArrayType

Tensor as either CuPy or NumPy array.

tensors: Dict[str, Union[numpy.ndarray, cupy.ndarray]]
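
A brief sketch of the input aliases; the tensor name input__0 and the shapes are illustrative:

    import cupy as cp

    from morpheus.messages import InferenceMemory

    # `count` is the number of rows in each stored tensor.
    memory = InferenceMemory(count=2, tensors={"input__0": cp.zeros((2, 4))})

    x = memory.get_input("input__0")               # alias for get_tensor
    memory.set_input("input__0", cp.ones((2, 4)))  # alias for set_tensor
    print(memory.tensor_names)
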
class InferenceMemoryAE(*, count, inputs, seq_ids)[source]

This is a container class for data that needs to be submitted to the inference server for autoencoder use cases.

Parameters
inputs : NDArrayType

Inference input.

seq_ids : NDArrayType

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

Attributes
input
seq_ids
tensor_names

Methods

get_input(name) Get the tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_input(name, tensor) Update the input tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
input: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
seq_ids: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
class InferenceMemoryFIL(*, count, input__0, seq_ids)[source]

This is a container class for data that needs to be submitted to the inference server for FIL use cases.

Parameters
input__0 : NDArrayType

Inference input.

seq_ids : NDArrayType

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

Attributes
input__0
seq_ids
tensor_names

Methods

get_input(name) Get the tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_input(name, tensor) Update the input tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
input__0: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
seq_ids: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
class InferenceMemoryNLP(*, count, input_ids, input_mask, seq_ids)[source]

This is a container class for data that needs to be submitted to the inference server for NLP use cases.

Parameters
input_ids : NDArrayType

The token-ids for each string padded with 0s to max_length.

input_mask : NDArrayType

The mask for token-ids result where corresponding positions identify valid token-id values.

seq_ids : NDArrayType

Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).

Attributes
input_ids
input_mask
seq_ids
tensor_names

Methods

get_input(name) Get the tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_input(name, tensor) Update the input tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
input_ids: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
input_mask: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
seq_ids: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
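
A construction sketch; the dtypes, padded length, and seq_ids shape are illustrative assumptions, not requirements imposed by this class:

    import cupy as cp

    from morpheus.messages import InferenceMemoryNLP

    count = 2    # number of inference rows
    seq_len = 8  # padded token length (illustrative)

    memory = InferenceMemoryNLP(
        count=count,
        input_ids=cp.zeros((count, seq_len), dtype=cp.int32),
        input_mask=cp.ones((count, seq_len), dtype=cp.int32),
        seq_ids=cp.zeros((count, 3), dtype=cp.int32),  # maps inference rows back to message rows (shape illustrative)
    )

    print(memory.tensor_names)  # names of the stored tensors
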
class MessageBase[source]

Base class for all messages. Returns a C++ implementation if CppConfig.get_should_use_cpp() is True and the class has an associated C++ implementation (cpp_class), returns the Python implementation for all others.
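
The flag mentioned above can be read through morpheus.config.CppConfig; a minimal check (whether C++ mode is enabled by default depends on the build and release):

    from morpheus.config import CppConfig

    # When True, message classes with a C++ counterpart construct the C++ implementation.
    print(CppConfig.get_should_use_cpp())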

class MessageMeta(df)[source]

This is a container class that holds the metadata of a batch of deserialized messages.

Parameters
df : pandas.DataFrame

Input rows in dataframe.

Attributes
count

Returns the number of messages in the batch.

df

Methods

copy_ranges(ranges) Perform a copy of the current message instance for the given ranges of rows.
ensure_sliceable_index() Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.
get_data() Return column values from the underlying DataFrame.
get_meta_range(mess_offset, message_count[, ...]) Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
get_slice(start, stop) Returns a new MessageMeta with only the rows specified by start/stop.
has_sliceable_index() Returns True if the underlying DataFrame's index is unique and monotonic.
set_data(columns, value) Set column values to the underlying DataFrame.
copy_dataframe
get_column_names
mutable_dataframe
copy_dataframe()[source]
copy_ranges(ranges)[source]

Perform a copy of the current message instance for the given ranges of rows.

Parameters
ranges : typing.List[typing.Tuple[int, int]]

Rows to include in the copy, in the form [(start_row, stop_row), ...]. The stop_row is not included. For example, to copy rows 1-2 and 5-7, pass ranges=[(1, 3), (5, 8)].

Returns
MessageMeta

A new MessageMeta with only the rows specified by ranges.
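
For example, copying rows 1-2 and 5-7 as described above (the DataFrame content is illustrative):

    import cudf

    from morpheus.messages import MessageMeta

    meta = MessageMeta(cudf.DataFrame({"v": range(10)}))

    # Copy rows 1-2 and 5-7; each stop_row is exclusive.
    subset = meta.copy_ranges([(1, 3), (5, 8)])
    print(subset.count)  # 5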

property count: int

Returns the number of messages in the batch.

Returns
int

number of messages in the MessageMeta.df.

property df: Union[pandas.DataFrame, cudf.DataFrame]
ensure_sliceable_index()[source]

Replaces the index in the underlying dataframe if the existing one is not unique and monotonic. The old index will be preserved in a column named _index_{old_index.name}. If has_sliceable_index() == True, this is a no-op.

Returns
str

The name of the column with the old index or None if no changes were made
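
A sketch of the check-then-replace pattern; the duplicated index values are illustrative:

    import cudf

    from morpheus.messages import MessageMeta

    df = cudf.DataFrame({"v": [10, 20, 30]}, index=[0, 0, 2])  # non-unique index
    meta = MessageMeta(df)

    if not meta.has_sliceable_index():
        old_index_column = meta.ensure_sliceable_index()
        print(old_index_column)  # name of the column preserving the old index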

get_column_names()[source]
get_data() → DataFrameType[source]
get_data(columns: str) → SeriesType
get_data(columns: List[str]) → DataFrameType

Return column values from the underlying DataFrame.

Parameters
columns : typing.Union[None, str, typing.List[str]]

Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a DataFrame is returned.

Returns
Series or DataFrame

Column values from the dataframe.
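
For instance (column names are illustrative):

    import cudf

    from morpheus.messages import MessageMeta

    meta = MessageMeta(cudf.DataFrame({"a": [1, 2], "b": [3, 4]}))

    meta.get_data()            # whole DataFrame
    meta.get_data("a")         # single column as a Series
    meta.get_data(["a", "b"])  # subset of columns as a DataFrame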

get_meta_range(mess_offset, message_count, columns=None)[source]

Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.

Parameters
mess_offset : int

Offset into the metadata batch.

message_count : int

Messages count.

columns : typing.Union[None, str, typing.List[str]]

Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a DataFrame is returned.

Returns
Series or DataFrame

Column values from the dataframe.

get_slice(start, stop)[source]

Returns a new MessageMeta with only the rows specified by start/stop.

Parameters
start : int

Start offset address.

stop : int

Stop offset address.

Returns
MessageMeta

A new MessageMeta with sliced offset and count.

has_sliceable_index()[source]

Returns True if the underlying DataFrame’s index is unique and monotonic. Sliceable indices have better performance since a range of rows can be specified by a start and stop index instead of requiring boolean masks.

Returns
bool
mutable_dataframe()[source]
set_data(columns, value)[source]

Set column values to the underlying DataFrame.

Parameters
columns : typing.Union[None, str, typing.List[str]]

Input column names. Sets the value for the corresponding column names. If None is specified, all columns will be used. If the column does not exist, a new one will be created.

value : Any

Value to apply to the specified columns. If a single value is passed, it will be broadcast to all rows. If a Series or Dataframe is passed, rows will be matched by index.
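
A brief sketch of set_data, together with the mutable_dataframe context-manager pattern for arbitrary in-place edits (column names and values are illustrative):

    import cudf

    from morpheus.messages import MessageMeta

    meta = MessageMeta(cudf.DataFrame({"a": [1, 2, 3]}))

    # Broadcast a scalar to every row; "flag" is created if it does not exist.
    meta.set_data("flag", True)

    # For arbitrary in-place edits, mutable_dataframe() yields the underlying DataFrame.
    with meta.mutable_dataframe() as df:
        df["a"] = df["a"] * 2

    print(meta.get_data(["a", "flag"]))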

class RawPacketMessage
Attributes
gpu_mem
max_size
num
property gpu_mem
property max_size
property num
class ResponseMemory(*args, **kwargs)[source]

Output memory block holding the results of inference.

Attributes
tensor_names

Methods

get_output(name) Get the Tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_output(name, tensor) Update the output tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
count: int
get_output(name)[source]

Get the Tensor stored in the container identified by name. Alias for ResponseMemory.get_tensor.

Parameters
name : str

Key used to do lookup in tensors dict of message container.

Returns
NDArrayType

Tensors corresponding to name.

Raises
KeyError

If output name does not exist in message container.

set_output(name, tensor)[source]

Update the output tensor identified by name. Alias for ResponseMemory.set_tensor

Parameters
name : str

Key used to do lookup in tensors dict of the container.

tensor : NDArrayType

Tensor as either a CuPy or NumPy array.

Raises
ValueError

If the number of rows in tensor does not match count

tensors: Dict[str, Union[numpy.ndarray, cupy.ndarray]]
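
A short sketch of the output aliases; the tensor name probs and the shapes are illustrative:

    import cupy as cp

    from morpheus.messages import ResponseMemory

    memory = ResponseMemory(count=4, tensors={"probs": cp.zeros((4, 2))})

    probs = memory.get_output("probs")           # alias for get_tensor
    memory.set_output("probs", cp.ones((4, 2)))  # alias for set_tensor
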
class ResponseMemoryAE(*args, **kwargs)[source]

Subclass of ResponseMemory specific to the AutoEncoder pipeline.

Parameters
probs : NDArrayType

Probabilities tensor

user_id : str

User id the inference was performed against.

explain_df : pd.DataFrame

Explainability Dataframe, for each feature a column will exist with a name in the form of: {feature}_z_loss containing the loss z-score along with max_abs_z and mean_abs_z columns

Attributes
explain_df
probs
tensor_names

Methods

get_output(name) Get the Tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_output(name, tensor) Update the output tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
explain_df: pandas.DataFrame = None
probs: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
user_id: str = ''
class ResponseMemoryProbs(*args, **kwargs)[source]

Subclass of ResponseMemory containing an output tensor named 'probs'.

Parameters
probs : NDArrayType

Probabilities tensor

Attributes
probs
tensor_names

Methods

get_output(name) Get the Tensor stored in the container identified by name.
get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_output(name, tensor) Update the output tensor identified by name.
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
probs: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
class TensorMemory(*, count=None, tensors=None)[source]

This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in either CuPy or NumPy arrays.

Parameters
count : int

Length of each tensor contained in tensors.

tensors : TensorMapType

Collection of tensors uniquely identified by a name.

Attributes
tensor_names

Methods

get_tensor(name) Get the Tensor stored in the container identified by name.
get_tensors() Get the tensors contained by this instance.
has_tensor(name) Returns True if a tensor with the requested name exists in the tensors object
set_tensor(name, tensor) Update the tensor identified by name.
set_tensors(tensors) Overwrite the tensors stored by this instance.
count: int
get_tensor(name)[source]

Get the Tensor stored in the container identified by name.

Parameters
name : str

Tensor key name.

Returns
NDArrayType

Tensor.

Raises
KeyError

If tensor name does not exist in the container.

get_tensors()[source]

Get the tensors contained by this instance. It is important to note that when C++ execution is enabled the returned tensors will be a Python copy of the tensors stored in the C++ object. As such any changes made to the tensors will need to be updated with a call to set_tensors.

Returns
TensorMapType
has_tensor(name)[source]

Returns True if a tensor with the requested name exists in the tensors object

Parameters
name : str

Name to lookup

Returns
bool

True if the tensor was found

set_tensor(name, tensor)[source]

Update the tensor identified by name.

Parameters
name : str

Tensor key name.

tensor : NDArrayType

Tensor as either a CuPy or NumPy array.

Raises
ValueError

If the number of rows in tensor does not match count

set_tensors(tensors)[source]

Overwrite the tensors stored by this instance. If the length of the tensors has changed, then the count property should also be updated.

Parameters
tensors : TensorMapType

Collection of tensors uniquely identified by a name.

property tensor_names: List[str]
tensors: Dict[str, Union[numpy.ndarray, cupy.ndarray]]
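
A small end-to-end sketch (tensor names and shapes are illustrative):

    import cupy as cp

    from morpheus.messages import TensorMemory

    memory = TensorMemory(count=3, tensors={"embeddings": cp.zeros((3, 16))})

    if memory.has_tensor("embeddings"):
        emb = memory.get_tensor("embeddings")

    # Each tensor keeps `count` rows; a mismatched row count raises ValueError.
    memory.set_tensor("embeddings", cp.ones((3, 16)))

    # Overwrite every tensor at once (row counts stay consistent with `count`).
    memory.set_tensors({"embeddings": cp.zeros((3, 16)), "scores": cp.zeros((3, 1))})
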
class UserMessageMeta(df, user_id)[source]

This class extends MessageMeta to also hold a user ID corresponding to the batched metadata.

Parameters
df : pandas.DataFrame

Input rows in dataframe.

user_id : str

User id.

Attributes
count

Returns the number of messages in the batch.

df

Methods

copy_ranges(ranges) Perform a copy of the current message instance for the given ranges of rows.
ensure_sliceable_index() Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.
get_data([columns]) Return column values from the underlying DataFrame.
get_meta_range(mess_offset, message_count[, ...]) Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
get_slice(start, stop) Returns a new MessageMeta with only the rows specified by start/stop.
has_sliceable_index() Returns True if the underlying DataFrame's index is unique and monotonic.
set_data(columns, value) Set column values to the underlying DataFrame.
copy_dataframe
get_column_names
mutable_dataframe
user_id: str
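
A construction sketch (the DataFrame content and user id are illustrative):

    import pandas as pd

    from morpheus.messages import UserMessageMeta

    df = pd.DataFrame({"event": ["login", "logout"]})
    meta = UserMessageMeta(df, user_id="user-123")

    print(meta.user_id, meta.count)  # user-123 2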

Modules

morpheus.messages.control_message

morpheus.messages.data_class_prop

morpheus.messages.memory Memory classes
morpheus.messages.message_base

morpheus.messages.message_meta
