morpheus.messages#
Message classes, which contain data being transferred between pipeline stages.
- class ControlMessage(*args, **kwargs)[source]#
Methods
get_metadata([key, default_value]): Return a given piece of metadata; if key is None, return the entire metadata dictionary.
has_task(task_type): Return True if the control message has at least one task of the given type.
add_task
config
copy
filter_timestamp
get_tasks
get_timestamp
get_timestamps
has_metadata
list_metadata
payload
remove_task
set_metadata
set_timestamp
task_type
tensor_count
tensors
- get_metadata(key=None, default_value=None)[source]#
Return a given piece of metadata. If key is None, return the entire metadata dictionary. If key is not found, default_value is returned.
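The lookup rule above can be sketched with a plain dict standing in for the message's metadata store; the helper function and the example keys below are hypothetical, not part of the Morpheus API:

```python
# Sketch of the documented get_metadata lookup rule, using a plain dict
# in place of the ControlMessage metadata store (hypothetical stand-in).
def get_metadata(metadata, key=None, default_value=None):
    if key is None:
        return metadata  # no key given: return the entire dictionary
    return metadata.get(key, default_value)  # missing key: default_value

meta = {"model_name": "example-model", "batch_size": 64}
assert get_metadata(meta) == meta
assert get_metadata(meta, "batch_size") == 64
assert get_metadata(meta, "missing", "fallback") == "fallback"
```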
- class ControlMessageType#
Members:
INFERENCE
NONE
TRAINING
- Attributes:
- name(self: object) -> str
- value
- INFERENCE = <ControlMessageType.INFERENCE: 1>#
- NONE = <ControlMessageType.NONE: 0>#
- TRAINING = <ControlMessageType.TRAINING: 2>#
- property name#
- property value#
- class DataLoaderRegistry#
Methods
contains(name)
list()
register_loader(name, loader[, throw_if_exists])
unregister_loader(name[, throw_if_not_exists])
- class InferenceMemory(**kwargs)[source]#
This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in either CuPy or NumPy arrays.
- Attributes:
- tensor_names
Methods
get_input(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Return True if a tensor with the requested name exists in the tensors object.
set_input(name, tensor): Update the input tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- get_input(name)[source]#
Get the tensor stored in the container identified by name. Alias for InferenceMemory.get_tensor.
- Parameters:
- name : str
Key used to do lookup in inputs dict of the container.
- Returns:
- NDArrayType
Inputs corresponding to name.
- Raises:
- KeyError
If input name does not exist in the container.
- class InferenceMemoryAE(**kwargs)[source]#
This is a container class for data that needs to be submitted to the inference server for autoencoder use cases.
- Parameters:
- inputs : NDArrayType
Inference input.
- seq_ids : NDArrayType
Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).
- Attributes:
- input
- seq_ids
- tensor_names
Methods
get_input(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Return True if a tensor with the requested name exists in the tensors object.
set_input(name, tensor): Update the input tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- input: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
- seq_ids: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
- class InferenceMemoryFIL(**kwargs)[source]#
This is a container class for data that needs to be submitted to the inference server for FIL category use cases.
- Parameters:
- input__0 : NDArrayType
Inference input.
- seq_ids : NDArrayType
Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).
- Attributes:
- input__0
- seq_ids
- tensor_names
Methods
get_input(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Return True if a tensor with the requested name exists in the tensors object.
set_input(name, tensor): Update the input tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- input__0: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
- seq_ids: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
- class InferenceMemoryNLP(**kwargs)[source]#
This is a container class for data that needs to be submitted to the inference server for NLP category use cases.
- Parameters:
- input_ids : NDArrayType
The token ids for each string, padded with 0s to max_length.
- input_mask : NDArrayType
The mask for the token-id result, where corresponding positions identify valid token-id values.
- seq_ids : NDArrayType
Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).
- Attributes:
- input_ids
- input_mask
- seq_ids
- tensor_names
Methods
get_input(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Return True if a tensor with the requested name exists in the tensors object.
set_input(name, tensor): Update the input tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- input_ids: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
- input_mask: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
- seq_ids: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
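As a hedged illustration of how input_ids and input_mask relate, the sketch below builds both with NumPy. The token ids are made up for the example, and the exact layout Morpheus expects for seq_ids is not shown here:

```python
import numpy as np

# Hypothetical token ids for two strings; 0 is assumed to be the padding id.
token_ids = [[101, 2054, 2003, 102], [101, 7592, 102]]
max_length = 6

input_ids = np.zeros((len(token_ids), max_length), dtype=np.int64)
input_mask = np.zeros_like(input_ids)
for row, ids in enumerate(token_ids):
    input_ids[row, : len(ids)] = ids  # token ids padded with 0s to max_length
    input_mask[row, : len(ids)] = 1   # 1 marks valid token-id positions

assert input_ids.shape == (2, 6)
assert input_mask.sum() == 7  # 4 + 3 valid tokens
```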
- class MessageBase[source]#
Base class for all messages. Returns a C++ implementation if CppConfig.get_should_use_cpp() is True and the class has an associated C++ implementation (cpp_class); returns the Python implementation for all others.
- class MessageMeta(**kwargs)[source]#
This is a container class to hold metadata for batches of deserialized messages.
- Parameters:
- df : pandas.DataFrame
Input rows in dataframe.
- Attributes:
count: Returns the number of messages in the batch.
- df
Methods
copy_ranges(ranges): Perform a copy of the current message instance for the given ranges of rows.
ensure_sliceable_index(): Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.
get_data([columns]): Return column values from the underlying DataFrame.
get_meta_range(mess_offset, message_count[, ...]): Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
get_slice(start, stop): Returns a new MessageMeta with only the rows specified by start/stop.
has_sliceable_index(): Returns True if the underlying DataFrame's index is unique and monotonic.
set_data(columns, value): Set column values to the underlying DataFrame.
copy_dataframe
get_column_names
mutable_dataframe
- copy_ranges(ranges)[source]#
Perform a copy of the current message instance for the given ranges of rows.
- Parameters:
- ranges : typing.List[typing.Tuple[int, int]]
Rows to include in the copy, in the form [(start_row, stop_row), ...]. The stop_row isn't included. For example, to copy rows 1-2 & 5-7: ranges=[(1, 3), (5, 8)].
- Returns:
- MessageMeta
A new MessageMeta with only the rows specified by ranges.
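The range semantics (stop_row exclusive) can be reproduced directly with pandas, which is roughly what a copy over ranges amounts to; this is a sketch of the documented behavior, not the Morpheus implementation:

```python
import pandas as pd

df = pd.DataFrame({"v": range(10)})
ranges = [(1, 3), (5, 8)]  # stop_row is excluded, as in the docstring

# Copy rows 1-2 and 5-7 into a new frame.
copied = pd.concat(df.iloc[start:stop] for start, stop in ranges)
assert list(copied["v"]) == [1, 2, 5, 6, 7]
```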
- property count: int#
Returns the number of messages in the batch.
- Returns:
- int
number of messages in the MessageMeta.df.
- property df: pandas.DataFrame | cudf.DataFrame#
- ensure_sliceable_index()[source]#
Replaces the index in the underlying dataframe if the existing one is not unique and monotonic. The old index will be preserved in a column named _index_{old_index.name}. If has_sliceable_index() == True, this is a no-op.
- Returns:
- str
The name of the column with the old index, or None if no changes were made.
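The documented behavior can be sketched in plain pandas: check that the index is unique and monotonic, and if not, preserve it in an _index_* column before resetting. This is a simplified stand-in, not the actual implementation:

```python
import pandas as pd

# An index that is neither unique nor monotonic.
df = pd.DataFrame({"v": [10, 20, 30]}, index=[2, 0, 0])

sliceable = df.index.is_unique and (
    df.index.is_monotonic_increasing or df.index.is_monotonic_decreasing
)
if not sliceable:
    old_name = f"_index_{df.index.name or ''}"
    df[old_name] = df.index         # preserve the old index in a column
    df = df.reset_index(drop=True)  # replace it with a sliceable RangeIndex

assert list(df["_index_"]) == [2, 0, 0]
assert df.index.is_unique and df.index.is_monotonic_increasing
```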
- get_data(columns=None)[source]#
Return column values from the underlying DataFrame.
- Parameters:
- columns : typing.Union[None, str, typing.List[str]]
Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a DataFrame is returned.
- Returns:
- Series or DataFrame
Column values from the dataframe.
- get_meta_range(mess_offset, message_count, columns=None)[source]#
Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
- Parameters:
- mess_offset : int
Offset into the metadata batch.
- message_count : int
Number of messages.
- columns : typing.Union[None, str, typing.List[str]]
Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a DataFrame is returned.
- Returns:
- Series or DataFrame
Column values from the dataframe.
- get_slice(start, stop)[source]#
Returns a new MessageMeta with only the rows specified by start/stop.
- Parameters:
- start : int
Start offset address.
- stop : int
Stop offset address.
- Returns:
- MessageMeta
A new MessageMeta with sliced offset and count.
- has_sliceable_index()[source]#
Returns True if the underlying DataFrame’s index is unique and monotonic. Sliceable indices have better performance since a range of rows can be specified by a start and stop index instead of requiring boolean masks.
- Returns:
- bool
- set_data(columns, value)[source]#
Set column values to the underlying DataFrame.
- Parameters:
- columns : typing.Union[None, str, typing.List[str]]
Input column names. Sets the value for the corresponding column names. If None is specified, all columns will be used. If the column does not exist, a new one will be created.
- value : Any
Value to apply to the specified columns. If a single value is passed, it will be broadcast to all rows. If a Series or DataFrame is passed, rows will be matched by index.
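Both value-handling rules mirror standard pandas assignment semantics, shown here directly on a DataFrame; this is a sketch of the documented behavior, not a call into Morpheus:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

df["flag"] = True                            # scalar: broadcast to all rows
df["b"] = pd.Series([10, 30], index=[0, 2])  # Series: matched by index

assert df["flag"].all()
assert df.loc[0, "b"] == 10 and df.loc[2, "b"] == 30
assert pd.isna(df.loc[1, "b"])               # row 1 had no matching index
```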
- class RawPacketMessage#
- Attributes:
- gpu_mem
- max_size
- num
- property gpu_mem#
- property max_size#
- property num#
- class ResponseMemory(*args, **kwargs)[source]#
Output memory block holding the results of inference.
- Attributes:
- tensor_names
Methods
get_output(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Return True if a tensor with the requested name exists in the tensors object.
set_output(name, tensor): Update the output tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- get_output(name)[source]#
Get the tensor stored in the container identified by name. Alias for ResponseMemory.get_tensor.
- Parameters:
- name : str
Key used to do lookup in tensors dict of the message container.
- Returns:
- NDArrayType
Tensors corresponding to name.
- Raises:
- KeyError
If output name does not exist in message container.
- set_output(name, tensor)[source]#
Update the output tensor identified by name. Alias for ResponseMemory.set_tensor.
- Parameters:
- name : str
Key used to do lookup in tensors dict of the container.
- tensor : NDArrayType
Tensor as either a CuPy or NumPy array.
- Raises:
- ValueError
If the number of rows in tensor does not match count.
- class ResponseMemoryAE(*args, **kwargs)[source]#
Subclass of ResponseMemory specific to the AutoEncoder pipeline.
- Parameters:
- probs : NDArrayType
Probabilities tensor.
- user_id : str
User id the inference was performed against.
- explain_df : pd.DataFrame
Explainability DataFrame; for each feature, a column named {feature}_z_loss will contain the loss z-score, along with max_abs_z and mean_abs_z columns.
- Attributes:
- explain_df
- probs
- tensor_names
Methods
get_output(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Return True if a tensor with the requested name exists in the tensors object.
set_output(name, tensor): Update the output tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- explain_df: pandas.DataFrame = None#
- probs: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
- class ResponseMemoryProbs(*args, **kwargs)[source]#
Subclass of ResponseMemory containing an output tensor named 'probs'.
- Parameters:
- probs : NDArrayType
Probabilities tensor.
- Attributes:
- probs
- tensor_names
Methods
get_output(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Return True if a tensor with the requested name exists in the tensors object.
set_output(name, tensor): Update the output tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- probs: dataclasses.InitVar[Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None#
- class TensorMemory(**kwargs)[source]#
This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in either CuPy or NumPy arrays.
- Parameters:
- count : int
Length of each tensor contained in tensors.
- tensors : TensorMapType
Collection of tensors uniquely identified by a name.
- Attributes:
- tensor_names
Methods
get_tensor(name): Get the tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Return True if a tensor with the requested name exists in the tensors object.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- get_tensor(name)[source]#
Get the tensor stored in the container identified by name.
- Parameters:
- name : str
Tensor key name.
- Returns:
- NDArrayType
Tensor.
- Raises:
- KeyError
If tensor name does not exist in the container.
- get_tensors()[source]#
Get the tensors contained by this instance. It is important to note that when C++ execution is enabled, the returned tensors will be a Python copy of the tensors stored in the C++ object. As such, any changes made to the tensors will need to be written back with a call to set_tensors.
- Returns:
- TensorMapType
- has_tensor(name)[source]#
Returns True if a tensor with the requested name exists in the tensors object.
- Parameters:
- name : str
Name to look up.
- Returns:
- bool
True if the tensor was found
- set_tensor(name, tensor)[source]#
Update the tensor identified by name.
- Parameters:
- name : str
Tensor key name.
- tensor : NDArrayType
Tensor as either a CuPy or NumPy array.
- Raises:
- ValueError
If the number of rows in tensor does not match count.
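The row-count rule above can be sketched as follows; TensorStore is a hypothetical stand-in for TensorMemory that shows only the documented validation, not the real class:

```python
import numpy as np

class TensorStore:
    """Hypothetical stand-in for TensorMemory's set_tensor validation."""

    def __init__(self, count):
        self.count = count  # expected number of rows in every tensor
        self.tensors = {}

    def set_tensor(self, name, tensor):
        # Documented rule: the tensor's row count must match `count`.
        if tensor.shape[0] != self.count:
            raise ValueError(
                f"Tensor '{name}' has {tensor.shape[0]} rows, expected {self.count}"
            )
        self.tensors[name] = tensor

mem = TensorStore(count=4)
mem.set_tensor("probs", np.zeros((4, 2)))    # OK: 4 rows
try:
    mem.set_tensor("bad", np.zeros((3, 2)))  # mismatched row count
except ValueError:
    pass
else:
    raise AssertionError("expected ValueError")
```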
- class UserMessageMeta(**kwargs)[source]#
This class extends MessageMeta to also hold the user id corresponding to the batched metadata.
- Parameters:
- df : pandas.DataFrame
Input rows in dataframe.
- user_id : str
User id.
- Attributes:
count: Returns the number of messages in the batch.
- df
Methods
copy_ranges(ranges): Perform a copy of the current message instance for the given ranges of rows.
ensure_sliceable_index(): Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.
get_data([columns]): Return column values from the underlying DataFrame.
get_meta_range(mess_offset, message_count[, ...]): Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
get_slice(start, stop): Returns a new MessageMeta with only the rows specified by start/stop.
has_sliceable_index(): Returns True if the underlying DataFrame's index is unique and monotonic.
set_data(columns, value): Set column values to the underlying DataFrame.
copy_dataframe
get_column_names
mutable_dataframe
Modules
Memory classes