morpheus.messages
Message classes, which contain data being transferred between pipeline stages.
- class ControlMessage(*args, **kwargs)[source]
Methods
get_metadata([key, default_value]): Return a given piece of metadata. If key is None, return the entire metadata dictionary.
has_task(task_type): Return True if the control message has at least one task of the given type.
add_task
config
copy
filter_timestamp
get_tasks
get_timestamp
get_timestamps
has_metadata
list_metadata
payload
remove_task
set_metadata
set_timestamp
task_type
tensors
- add_task(task_type, task)[source]
- config(config=None)[source]
- copy()[source]
- filter_timestamp(regex_filter)[source]
- get_metadata(key=None, default_value=None)[source]
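Return a given piece of metadata. If key is None, return the entire metadata dictionary. If key is not found, default_value is returned.
- Parameters
- key
Metadata key to look up. If None, the entire metadata dictionary is returned.
- default_value
Value returned when key is not found.
- Returns
The value of the metadata key, or the entire metadata dictionary if key is None.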
- get_tasks()[source]
- get_timestamp(key, fail_if_nonexist=False)[source]
- get_timestamps()[source]
- has_metadata(key)[source]
- has_task(task_type)[source]
Return True if the control message has at least one task of the given type
- list_metadata()[source]
- payload(payload=None)[source]
- remove_task(task_type)[source]
- set_metadata(key, value)[source]
- set_timestamp(key, timestamp)[source]
- task_type(new_task_type=None)[source]
- tensors(tensors=None)[source]
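The lookup rules documented for get_metadata and has_task can be illustrated with a small pure-Python stand-in. The class ControlMessageSketch below is hypothetical and is not the Morpheus implementation; it only mirrors the behavior stated above. Grouping tasks in a dictionary keyed by task type is an assumption made for the sketch.

```python
from collections import defaultdict


class ControlMessageSketch:
    """Hypothetical stand-in that mirrors the documented lookup rules only."""

    def __init__(self):
        self._metadata = {}
        # Assumption: tasks are grouped by type; the real storage may differ.
        self._tasks = defaultdict(list)

    def set_metadata(self, key, value):
        self._metadata[key] = value

    def get_metadata(self, key=None, default_value=None):
        # key is None -> the entire metadata dictionary (a copy in this sketch).
        if key is None:
            return dict(self._metadata)
        # Missing key -> default_value.
        return self._metadata.get(key, default_value)

    def add_task(self, task_type, task):
        self._tasks[task_type].append(task)

    def has_task(self, task_type):
        # True only when at least one task of this type has been added.
        return len(self._tasks.get(task_type, [])) > 0


msg = ControlMessageSketch()
msg.set_metadata("source", "kafka")
msg.add_task("load", {"files": ["a.csv"]})
```

Here msg.get_metadata("missing", "n/a") falls back to the default, while msg.get_metadata() with no key returns the whole dictionary.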
- class ControlMessageType
Members:
INFERENCE
NONE
TRAINING
- Attributes
name
name(self: handle) -> str
- value
- INFERENCE = <ControlMessageType.INFERENCE: 1>
- NONE = <ControlMessageType.INFERENCE: 1>
- TRAINING = <ControlMessageType.TRAINING: 2>
- property name
- property value
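In the listing above, NONE is shown with the same representation as INFERENCE. That is what one would observe if both names were bound to the same value. The sketch below reproduces the effect with Python's standard enum module; this is an assumption for illustration only, since the real class is a compiled binding and not an enum.Enum, and the name ControlMessageTypeSketch is hypothetical.

```python
from enum import Enum


class ControlMessageTypeSketch(Enum):
    # Values chosen to match the listing above.
    INFERENCE = 1
    NONE = 1  # same value as INFERENCE, so this name becomes an alias
    TRAINING = 2


none_repr = repr(ControlMessageTypeSketch.NONE)
same_member = ControlMessageTypeSketch.NONE is ControlMessageTypeSketch.INFERENCE
```

With standard enums, an alias resolves to the first member defined with that value, so comparisons against NONE and INFERENCE cannot be told apart.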
- class DataLoaderRegistry
Methods
contains(name)
list()
register_loader(name, loader[, throw_if_exists])
unregister_loader(name[, throw_if_not_exists])
- static list() → List[str]
- class InferenceMemory(*, count=None, tensors=None)[source]
This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in either CuPy or NumPy arrays.
- Attributes
- tensor_names
Methods
get_input(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the Tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Returns True if a tensor with the requested name exists in the tensors object.
set_input(name, tensor): Update the input tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- count: int
- get_input(name)[source]
Get the tensor stored in the container identified by name. Alias for InferenceMemory.get_tensor.
- Parameters
- name : str
Key used to do lookup in inputs dict of the container.
- Returns
- NDArrayType
Inputs corresponding to name.
- Raises
- KeyError
If input name does not exist in the container.
- set_input(name, tensor)[source]
Update the input tensor identified by name. Alias for InferenceMemory.set_tensor.
- Parameters
- name : str
Key used to do lookup in inputs dict of the container.
- tensor : NDArrayType
Tensor as either CuPy or NumPy array.
- tensors: Dict[str, Union[numpy.ndarray, cupy.ndarray]]
- class InferenceMemoryAE(*, count, inputs, seq_ids)[source]
This is a container class for data that needs to be submitted to the inference server for autoencoder use cases.
- Parameters
- inputs : NDArrayType
Inference input.
- seq_ids : NDArrayType
Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).
- Attributes
- input
- seq_ids
- tensor_names
Methods
get_input(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the Tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Returns True if a tensor with the requested name exists in the tensors object.
set_input(name, tensor): Update the input tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- input: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
- seq_ids: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
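The role of seq_ids described above (mapping inference rows back to the messages they came from) can be sketched in plain Python. The function reduce_by_message is hypothetical, and the sketch assumes seq_ids is a flat list holding one originating message index per inference row; the real layout of seq_ids may carry more information. Taking the maximum score per message is also just one possible reduction.

```python
def reduce_by_message(scores, seq_ids, message_count):
    """Collapse per-inference-row scores to one score per message (max)."""
    reduced = [None] * message_count
    for score, msg_idx in zip(scores, seq_ids):
        # Keep the largest score seen so far for this message.
        if reduced[msg_idx] is None or score > reduced[msg_idx]:
            reduced[msg_idx] = score
    return reduced


# Three messages; message 1 was broken into two inference rows.
scores = [0.10, 0.40, 0.90, 0.30]
seq_ids = [0, 1, 1, 2]
per_message = reduce_by_message(scores, seq_ids, message_count=3)
```

Four inference rows collapse to three per-message values, which is why the index is needed whenever a message is split across several requests.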
- class InferenceMemoryFIL(*, count, input__0, seq_ids)[source]
This is a container class for data that needs to be submitted to the inference server for FIL category use cases.
- Parameters
- input__0 : NDArrayType
Inference input.
- seq_ids : NDArrayType
Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).
- Attributes
- input__0
- seq_ids
- tensor_names
Methods
get_input(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the Tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Returns True if a tensor with the requested name exists in the tensors object.
set_input(name, tensor): Update the input tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- input__0: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
- seq_ids: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
- class InferenceMemoryNLP(*, count, input_ids, input_mask, seq_ids)[source]
This is a container class for data that needs to be submitted to the inference server for NLP category use cases.
- Parameters
- input_ids : NDArrayType
The token-ids for each string padded with 0s to max_length.
- input_mask : NDArrayType
The mask for the token-ids result, where corresponding positions identify valid token-id values.
- seq_ids : NDArrayType
Ids used to index from an inference input to a message. Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into multiple inference requests).
- Attributes
- input_ids
- input_mask
- seq_ids
- tensor_names
Methods
get_input(name): Get the tensor stored in the container identified by name.
get_tensor(name): Get the Tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Returns True if a tensor with the requested name exists in the tensors object.
set_input(name, tensor): Update the input tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- input_ids: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
- input_mask: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
- seq_ids: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
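The relationship between input_ids (token ids padded with 0s to max_length) and input_mask (marking which positions hold valid token ids) can be sketched as follows. The helper pad_and_mask is hypothetical, plain lists stand in for CuPy or NumPy arrays, and two details are assumptions: valid positions are marked with 1, and sequences longer than max_length are truncated.

```python
def pad_and_mask(token_ids, max_length, pad_id=0):
    """Pad token ids with pad_id up to max_length and build the matching mask."""
    truncated = token_ids[:max_length]  # assumption: overlong input is cut
    padding = [pad_id] * (max_length - len(truncated))
    input_ids = truncated + padding
    # 1 marks a real token, 0 marks padding (assumed convention).
    input_mask = [1] * len(truncated) + [0] * len(padding)
    return input_ids, input_mask


ids, mask = pad_and_mask([101, 7592, 102], max_length=6)
```

Both outputs always have length max_length, so rows built this way can be stacked into rectangular tensors.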
- class MessageBase[source]
Base class for all messages. Returns a C++ implementation if CppConfig.get_should_use_cpp() is True and the class has an associated C++ implementation (cpp_class); returns the Python implementation for all others.
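The dispatch rule described for MessageBase can be sketched with a __new__ override. Everything below is a hypothetical stand-in: CppConfigSketch, FastImpl, MessageBaseSketch, WithCpp and PythonOnly are illustrative names, and the real library may implement the switch differently (for example through a metaclass).

```python
class CppConfigSketch:
    """Hypothetical stand-in for the runtime switch named in the text."""
    should_use_cpp = False

    @classmethod
    def get_should_use_cpp(cls):
        return cls.should_use_cpp


class FastImpl:
    """Pretend compiled implementation."""

    def __init__(self, value):
        self.value = value


class MessageBaseSketch:
    cpp_class = None  # subclasses may point this at a compiled class

    def __new__(cls, *args, **kwargs):
        # Use the compiled class only when the flag is on AND one exists.
        if CppConfigSketch.get_should_use_cpp() and cls.cpp_class is not None:
            return cls.cpp_class(*args, **kwargs)
        return super().__new__(cls)


class WithCpp(MessageBaseSketch):
    cpp_class = FastImpl

    def __init__(self, value):
        self.value = value


class PythonOnly(MessageBaseSketch):
    def __init__(self, value):
        self.value = value


py_obj = WithCpp(1)                    # flag off: Python class
CppConfigSketch.should_use_cpp = True
cpp_obj = WithCpp(2)                   # flag on, cpp_class set: FastImpl
fallback = PythonOnly(3)               # flag on, no cpp_class: Python class
```

Because __new__ returns an object that is not an instance of the requested class in the compiled case, Python skips the subclass __init__ and the caller receives the FastImpl object directly.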
- class MessageMeta(df)[source]
This is a container class to hold the metadata for a batch of deserialized messages.
- Parameters
- df : pandas.DataFrame
Input rows in dataframe.
- Attributes
count
Returns the number of messages in the batch.
- df
Methods
copy_ranges(ranges): Perform a copy of the current message instance for the given ranges of rows.
ensure_sliceable_index(): Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.
get_data(): Return column values from the underlying DataFrame.
get_meta_range(mess_offset, message_count[, ...]): Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
get_slice(start, stop): Returns a new MessageMeta with only the rows specified by start/stop.
has_sliceable_index(): Returns True if the underlying DataFrame's index is unique and monotonic.
set_data(columns, value): Set column values to the underlying DataFrame.
copy_dataframe
get_column_names
mutable_dataframe
- copy_dataframe()[source]
- copy_ranges(ranges)[source]
Perform a copy of the current message instance for the given ranges of rows.
- Parameters
- ranges : typing.List[typing.Tuple[int, int]]
Rows to include in the copy in the form of [(start_row, stop_row), …]. The stop_row isn’t included. For example, to copy rows 1-2 and 5-7, use ranges=[(1, 3), (5, 8)].
- Returns
- MessageMeta
A new MessageMeta with only the rows specified by ranges.
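The half-open range convention used by copy_ranges (stop_row excluded) matches ordinary Python slicing. The sketch below applies it to a plain list; the free function copy_ranges here is a hypothetical illustration, not the MessageMeta method, which operates on a DataFrame.

```python
def copy_ranges(rows, ranges):
    """Copy rows for each half-open (start_row, stop_row) pair."""
    copied = []
    for start_row, stop_row in ranges:
        copied.extend(rows[start_row:stop_row])  # stop_row is excluded
    return copied


rows = list(range(10))
selected = copy_ranges(rows, [(1, 3), (5, 8)])
```

With ranges=[(1, 3), (5, 8)], the result holds rows 1, 2, 5, 6 and 7, matching the example in the parameter description.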
- property count: int
Returns the number of messages in the batch.
- Returns
- int
number of messages in the MessageMeta.df.
- property df: Union[pandas.DataFrame, cudf.DataFrame]
- ensure_sliceable_index()[source]
Replaces the index in the underlying dataframe if the existing one is not unique and monotonic. The old index will be preserved in a column named _index_{old_index.name}. If has_sliceable_index() returns True, this is a no-op.
- Returns
- str
The name of the column with the old index, or None if no changes were made.
- get_column_names()[source]
- get_data() → DataFrameType[source]
- get_data(columns: str) → SeriesType
- get_data(columns: List[str]) → DataFrameType
Return column values from the underlying DataFrame.
- Parameters
- columns : typing.Union[None, str, typing.List[str]]
Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a DataFrame is returned.
- Returns
- Series or Dataframe
Column values from the dataframe.
- get_meta_range(mess_offset, message_count, columns=None)[source]
Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
- Parameters
- mess_offset : int
Offset into the metadata batch.
- message_count : int
Messages count.
- columns : typing.Union[None, str, typing.List[str]]
Input column names. Returns all columns if None is specified. When a string is passed, a Series is returned. Otherwise, a DataFrame is returned.
- Returns
- Series or Dataframe
Column values from the dataframe.
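The offset/count window and the three forms of the columns argument can be sketched on a list of row dictionaries. This is a hypothetical stand-in: it assumes the window covers message_count rows starting at mess_offset, and it uses a plain list where the real method returns a Series, and a list of dictionaries where it returns a DataFrame.

```python
def get_meta_range(rows, mess_offset, message_count, columns=None):
    """Return a window of rows, optionally restricted to some columns."""
    # Assumption: the window is message_count rows starting at mess_offset.
    window = rows[mess_offset:mess_offset + message_count]
    if columns is None:
        return [dict(r) for r in window]          # all columns
    if isinstance(columns, str):
        return [r[columns] for r in window]       # single column ("Series")
    return [{c: r[c] for c in columns} for r in window]  # subset ("DataFrame")


rows = [{"id": i, "user": f"u{i}", "score": i * 10} for i in range(5)]
one_column = get_meta_range(rows, 1, 2, columns="score")
two_columns = get_meta_range(rows, 3, 2, columns=["id", "user"])
```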
- get_slice(start, stop)[source]
Returns a new MessageMeta with only the rows specified by start/stop.
- Parameters
- start : int
Start offset address.
- stop : int
Stop offset address.
- Returns
MessageMeta
A new
MessageMeta
with sliced offset and count.
- has_sliceable_index()[source]
Returns True if the underlying DataFrame’s index is unique and monotonic. Sliceable indices have better performance since a range of rows can be specified by a start and stop index instead of requiring boolean masks.
- Returns
- bool
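The "unique and monotonic" condition can be written out directly. The function below is a hypothetical pure-Python sketch over a list of index labels; treating both increasing and decreasing order as monotonic is an assumption, since the text does not state the direction.

```python
def has_sliceable_index(index):
    """True if the index labels are unique and monotonic."""
    unique = len(set(index)) == len(index)
    pairs = list(zip(index, index[1:]))
    increasing = all(a <= b for a, b in pairs)
    decreasing = all(a >= b for a, b in pairs)  # assumed to count as monotonic
    return unique and (increasing or decreasing)


ordered = has_sliceable_index([0, 1, 2, 3])
shuffled = has_sliceable_index([0, 2, 1])
duplicated = has_sliceable_index([0, 1, 1, 2])
```

When the check passes, a contiguous block of rows can be addressed by a start and stop label alone, which is the performance benefit the description refers to.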
- mutable_dataframe()[source]
- set_data(columns, value)[source]
Set column values to the underlying DataFrame.
- Parameters
- columns : typing.Union[None, str, typing.List[str]]
Input column names. Sets the value for the corresponding column names. If None is specified, all columns will be used. If the column does not exist, a new one will be created.
- value : Any
Value to apply to the specified columns. If a single value is passed, it will be broadcast to all rows. If a Series or DataFrame is passed, rows will be matched by index.
- class RawPacketMessage
- Attributes
- gpu_mem
- max_size
- num
- property gpu_mem
- property max_size
- property num
- class ResponseMemory(*args, **kwargs)[source]
Output memory block holding the results of inference.
- Attributes
- tensor_names
Methods
get_output(name): Get the Tensor stored in the container identified by name.
get_tensor(name): Get the Tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Returns True if a tensor with the requested name exists in the tensors object.
set_output(name, tensor): Update the output tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- count: int
- get_output(name)[source]
Get the Tensor stored in the container identified by name. Alias for ResponseMemory.get_tensor.
- Parameters
- name : str
Key used to do lookup in tensors dict of message container.
- Returns
- NDArrayType
Tensors corresponding to name.
- Raises
- KeyError
If output name does not exist in message container.
- set_output(name, tensor)[source]
Update the output tensor identified by name. Alias for ResponseMemory.set_tensor.
- Parameters
- name : str
Key used to do lookup in tensors dict of the container.
- tensor : NDArrayType
Tensor as either a CuPy or NumPy array.
- Raises
- ValueError
If the number of rows in tensor does not match count.
- tensors: Dict[str, Union[numpy.ndarray, cupy.ndarray]]
- class ResponseMemoryAE(*args, **kwargs)[source]
Subclass of ResponseMemory specific to the AutoEncoder pipeline.
- Parameters
- probs : NDArrayType
Probabilities tensor.
- user_id : str
User id the inference was performed against.
- explain_df : pd.DataFrame
Explainability DataFrame. For each feature, a column will exist with a name in the form {feature}_z_loss containing the loss z-score, along with max_abs_z and mean_abs_z columns.
- Attributes
- explain_df
- probs
- tensor_names
Methods
get_output(name): Get the Tensor stored in the container identified by name.
get_tensor(name): Get the Tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Returns True if a tensor with the requested name exists in the tensors object.
set_output(name, tensor): Update the output tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- explain_df: pandas.DataFrame = None
- probs: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
- user_id: str = ''
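The column naming described for explain_df can be sketched for a single row. The helper explain_row and the feature names are hypothetical, and the sketch assumes that max_abs_z and mean_abs_z are the maximum and mean of the absolute per-feature z-scores; the text gives only the column names, not how they are computed.

```python
def explain_row(z_scores):
    """Build one explainability row from per-feature loss z-scores."""
    # One column per feature, named {feature}_z_loss.
    row = {f"{feature}_z_loss": z for feature, z in z_scores.items()}
    abs_values = [abs(z) for z in z_scores.values()]
    # Assumed definitions of the two summary columns.
    row["max_abs_z"] = max(abs_values)
    row["mean_abs_z"] = sum(abs_values) / len(abs_values)
    return row


row = explain_row({"logcount": -2.0, "locincrement": 1.0})
```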
- class ResponseMemoryProbs(*args, **kwargs)[source]
Subclass of ResponseMemory containing an output tensor named ‘probs’.
- Parameters
- probs : NDArrayType
Probabilities tensor
- Attributes
- probs
- tensor_names
Methods
get_output(name): Get the Tensor stored in the container identified by name.
get_tensor(name): Get the Tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Returns True if a tensor with the requested name exists in the tensors object.
set_output(name, tensor): Update the output tensor identified by name.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- probs: dataclasses.InitVar[typing.Union[ForwardRef('numpy.ndarray'), ForwardRef('cupy.ndarray')]] = None
- class TensorMemory(*, count=None, tensors=None)[source]
This is a base container class for data that will be used for inference stages. This class is designed to hold generic tensor data in either CuPy or NumPy arrays.
- Parameters
- count : int
Length of each tensor contained in tensors.
- tensors : TensorMapType
Collection of tensors uniquely identified by a name.
- Attributes
- tensor_names
Methods
get_tensor(name): Get the Tensor stored in the container identified by name.
get_tensors(): Get the tensors contained by this instance.
has_tensor(name): Returns True if a tensor with the requested name exists in the tensors object.
set_tensor(name, tensor): Update the tensor identified by name.
set_tensors(tensors): Overwrite the tensors stored by this instance.
- count: int
- get_tensor(name)[source]
Get the Tensor stored in the container identified by name.
- Parameters
- name : str
Tensor key name.
- Returns
- NDArrayType
Tensor.
- Raises
- KeyError
If tensor name does not exist in the container.
- get_tensors()[source]
Get the tensors contained by this instance. Note that when C++ execution is enabled, the returned tensors are a Python copy of the tensors stored in the C++ object. Any changes made to them must therefore be written back with a call to set_tensors.
- Returns
- TensorMapType
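The copy caveat for get_tensors implies a read, modify, write-back pattern. The sketch below shows it with a hypothetical stand-in, CopyingTensorStore, whose get_tensors always returns a copy, as the text describes for C++ execution. Plain lists stand in for CuPy or NumPy arrays, and this is not the Morpheus class.

```python
import copy


class CopyingTensorStore:
    """Hypothetical stand-in: get_tensors() hands back a copy."""

    def __init__(self, tensors):
        self._tensors = copy.deepcopy(tensors)

    def get_tensors(self):
        return copy.deepcopy(self._tensors)

    def set_tensors(self, tensors):
        self._tensors = copy.deepcopy(tensors)


store = CopyingTensorStore({"probs": [0.1, 0.2]})

local = store.get_tensors()
local["probs"][0] = 0.9                      # edits the copy only
unchanged = store.get_tensors()["probs"][0]  # stored value is untouched

store.set_tensors(local)                     # write the edited copy back
updated = store.get_tensors()["probs"][0]
```

Without the final set_tensors call, the edit to local would be silently lost.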
- has_tensor(name)[source]
Returns True if a tensor with the requested name exists in the tensors object
- Parameters
- name : str
Name to lookup
- Returns
- bool
True if the tensor was found
- set_tensor(name, tensor)[source]
Update the tensor identified by name.
- Parameters
- name : str
Tensor key name.
- tensor : NDArrayType
Tensor as either a CuPy or NumPy array.
- Raises
- ValueError
If the number of rows in tensor does not match count.
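The row-count check described for set_tensor, together with the KeyError raised by get_tensor, can be sketched with a hypothetical container. CountedTensorStore is illustrative only: nested lists stand in for CuPy or NumPy arrays, and len() stands in for the number of rows.

```python
class CountedTensorStore:
    """Hypothetical stand-in enforcing one row count across all tensors."""

    def __init__(self, count):
        self.count = count
        self._tensors = {}

    def set_tensor(self, name, tensor):
        # Reject tensors whose row count differs from count.
        if len(tensor) != self.count:
            raise ValueError(
                f"tensor '{name}' has {len(tensor)} rows, expected {self.count}"
            )
        self._tensors[name] = tensor

    def get_tensor(self, name):
        return self._tensors[name]  # raises KeyError for unknown names

    def has_tensor(self, name):
        return name in self._tensors


store = CountedTensorStore(count=2)
store.set_tensor("probs", [[0.1], [0.9]])

try:
    store.set_tensor("bad", [[0.1]])  # one row, but count is 2
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True

try:
    store.get_tensor("missing")
    missing_raises = False
except KeyError:
    missing_raises = True
```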
- set_tensors(tensors)[source]
Overwrite the tensors stored by this instance. If the length of the tensors has changed, then the count property should also be updated.
- Parameters
- tensors : TensorMapType
Collection of tensors uniquely identified by a name.
- property tensor_names: List[str]
- tensors: Dict[str, Union[numpy.ndarray, cupy.ndarray]]
- class UserMessageMeta(df, user_id)[source]
This class extends MessageMeta to also hold the user ID corresponding to the batched metadata.
- Parameters
- df : pandas.DataFrame
Input rows in dataframe.
- user_id : str
User id.
- Attributes
count
Returns the number of messages in the batch.
- df
Methods
copy_ranges(ranges): Perform a copy of the current message instance for the given ranges of rows.
ensure_sliceable_index(): Replaces the index in the underlying dataframe if the existing one is not unique and monotonic.
get_data([columns]): Return column values from the underlying DataFrame.
get_meta_range(mess_offset, message_count[, ...]): Return column values from morpheus.pipeline.messages.MessageMeta.df from the specified start offset until the message count.
get_slice(start, stop): Returns a new MessageMeta with only the rows specified by start/stop.
has_sliceable_index(): Returns True if the underlying DataFrame's index is unique and monotonic.
set_data(columns, value): Set column values to the underlying DataFrame.
copy_dataframe
get_column_names
mutable_dataframe
- user_id: str
Modules
morpheus.messages.control_message
morpheus.messages.data_class_prop
morpheus.messages.memory: Memory classes
morpheus.messages.message_base
morpheus.messages.message_meta