nv_ingest.modules.sinks package

Submodules

nv_ingest.modules.sinks.message_broker_task_sink module

nv_ingest.modules.sinks.message_broker_task_sink.create_json_payload(
    message: IngestControlMessage,
    df_json: Dict[str, Any],
) -> List[Dict[str, Any]]

Creates JSON payloads based on message status and data. If the size of df_json exceeds 256 MB, splits it into multiple fragments, each less than 256 MB. Adds optional trace and annotation data to the first fragment.

nv_ingest.modules.sinks.message_broker_task_sink.extract_data_frame(
    message: IngestControlMessage,
) -> Tuple[Any, Dict[str, Any]]

Extracts a DataFrame from a message payload and returns it along with a filtered dictionary of required columns.

Parameters:

message (IngestControlMessage) – The message object containing the payload.

Returns:

A tuple containing the DataFrame and a dictionary of selected columns.

Return type:

Tuple[Any, Dict[str, Any]]

nv_ingest.modules.sinks.message_broker_task_sink.handle_failure(
    broker_client: MessageBrokerClientBase,
    response_channel: str,
    json_result_fragments: List[Dict[str, Any]],
    e: Exception,
    mdf_size: int,
) -> None

Handles failure scenarios by logging the error and pushing a failure message to a broker channel.

Parameters:
  • broker_client (MessageBrokerClientBase) – The broker client used to send the failure message.

  • response_channel (str) – The broker channel to which the failure message will be sent.

  • json_result_fragments (List[Dict[str, Any]]) – A list of JSON result fragments, where each fragment is a dictionary containing the results of the operation. Trace data for the failure message is taken from the first fragment.

  • e (Exception) – The exception object that triggered the failure.

  • mdf_size (int) – The number of rows in the message data frame (mdf) being processed.

Returns:

This function does not return any value. It handles the failure by logging the error and sending a message to the message broker.

Return type:

None

Notes

The failure message includes the error description, the size of the first JSON result fragment in MB, and the number of rows in the data being processed. If trace information is available in the first fragment of json_result_fragments, it is included in the failure message.

Examples

>>> broker_client = RedisClient()
>>> response_channel = "response_channel_name"
>>> json_result_fragments = [{"trace": {"event_1": 123456789}}]
>>> e = Exception("Network failure")
>>> mdf_size = 1000
>>> handle_failure(broker_client, response_channel, json_result_fragments, e, mdf_size)
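
A minimal sketch of how the failure message described in the Notes could be assembled, assuming plain-dict fragments. The function name `build_failure_message`, the `"status"` and `"description"` keys, the description wording, and the MB conversion (1 MB = 1024 * 1024 bytes) are hypothetical; the actual broker push is left out because the client method used for it is not documented here.

```python
import json
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


def build_failure_message(
    json_result_fragments: List[Dict[str, Any]],
    e: Exception,
    mdf_size: int,
) -> Dict[str, Any]:
    # Use the first fragment (or an empty dict if there are none) for size and trace data.
    first = json_result_fragments[0] if json_result_fragments else {}
    # Serialized size of the first fragment, converted from bytes to MB.
    size_mb = len(json.dumps(first).encode("utf-8")) / (1024 * 1024)
    description = (
        f"Failed to process the message: {e}; "
        f"first fragment size: {size_mb:.2f} MB; rows: {mdf_size}"
    )
    logger.error(description)
    # Hypothetical message layout; the real keys may differ.
    message: Dict[str, Any] = {"status": "failed", "description": description}
    # Include trace information only when the first fragment carries it.
    if "trace" in first:
        message["trace"] = first["trace"]
    return message
```

In handle_failure itself, a message like this would then be sent to `response_channel` through `broker_client`; that step is omitted from the sketch.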

nv_ingest.modules.sinks.message_broker_task_sink.process_and_forward(
    message: IngestControlMessage,
    broker_client: MessageBrokerClientBase,
) -> IngestControlMessage

Processes a message by extracting data, creating a JSON payload, and attempting to push it to the message broker.

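Parameters:
  • message (IngestControlMessage) – The message to process.

  • broker_client (MessageBrokerClientBase) – The broker client used to push the JSON payload.

Returns: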

The processed message.

Return type:

IngestControlMessage

Raises:

Exception – If a critical error occurs during processing.
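The control flow described above (extract, build the payload, push) can be outlined with each step passed in as a callable. This is a hypothetical sketch, not the module's implementation: `sketch_process_and_forward` and its parameters are illustrative, and it assumes that push errors are routed to a handle_failure-style callback rather than re-raised.

```python
from typing import Any, Callable, Dict, List, Tuple

Fragments = List[Dict[str, Any]]


def sketch_process_and_forward(
    message: Any,
    extract: Callable[[Any], Tuple[Any, Dict[str, Any]]],
    create_payload: Callable[[Any, Dict[str, Any]], Fragments],
    push: Callable[[Fragments], None],
    on_failure: Callable[[Fragments, Exception, int], None],
) -> Any:
    # 1. Pull the DataFrame and the filtered column dictionary out of the message.
    mdf, df_json = extract(message)
    # 2. Build the (possibly fragmented) JSON payload.
    fragments = create_payload(message, df_json)
    try:
        # 3. Try to deliver the fragments to the broker.
        push(fragments)
    except Exception as e:
        # 4. Assumed behavior: delivery errors are reported with the row count, not re-raised.
        on_failure(fragments, e, len(mdf))
    # Return the message so the pipeline can continue with it.
    return message
```

Injecting the steps keeps the orchestration testable without a running broker.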

nv_ingest.modules.sinks.message_broker_task_sink.push_to_broker(
    broker_client: MessageBrokerClientBase,
    response_channel: str,
    json_payloads: List[str],
    retry_count: int = 2,
) -> None

Attempts to push JSON payloads to a message broker channel, retrying on failure up to a specified number of attempts.

Parameters:
  • broker_client (MessageBrokerClientBase) – The broker client used to push the data.

  • response_channel (str) – The broker channel to which the data is pushed.

  • json_payloads (List[str]) – The JSON string payloads to be pushed.

  • retry_count (int, optional) – The number of attempts to retry on failure (default is 2).

Return type:

None

Raises:

ValueError – If pushing to the message broker fails after the specified number of retries.
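A sketch of the retry behavior, assuming `retry_count` is the total number of attempts per payload and assuming a client with a hypothetical `push(channel, payload)` method; the real MessageBrokerClientBase interface may use a different method name and signature.

```python
import logging
from typing import List, Protocol

logger = logging.getLogger(__name__)


class SupportsPush(Protocol):
    # Hypothetical minimal client interface; MessageBrokerClientBase may name this differently.
    def push(self, channel: str, payload: str) -> None: ...


def sketch_push_to_broker(
    broker_client: SupportsPush,
    response_channel: str,
    json_payloads: List[str],
    retry_count: int = 2,
) -> None:
    attempts = max(1, retry_count)  # always try at least once
    for payload in json_payloads:
        for attempt in range(1, attempts + 1):
            try:
                broker_client.push(response_channel, payload)
                break  # delivered; move on to the next payload
            except Exception as exc:
                logger.warning("Push attempt %d/%d failed: %s", attempt, attempts, exc)
                if attempt == attempts:
                    raise ValueError(
                        f"Failed to push to {response_channel!r} after {attempts} attempts"
                    ) from exc
```

With `retry_count=2`, a client that fails once and then succeeds delivers every payload, while a client that keeps failing raises `ValueError` after the second attempt.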

nv_ingest.modules.sinks.message_broker_task_sink.split_large_dict(
    json_data: List[Dict[str, Any]],
    size_limit: int,
) -> List[List[Dict[str, Any]]]

Splits a large list of dictionaries into smaller fragments, each less than the specified size limit (in bytes).

Parameters:
  • json_data (List[Dict[str, Any]]) – The list of dictionaries to split.

  • size_limit (int) – The maximum size in bytes for each fragment.

Returns:

A list of fragments, each a list of dictionaries whose combined size is within the size limit.

Return type:

List[List[Dict[str, Any]]]
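One way to implement this splitting is a greedy pass that measures each dictionary's serialized size. This sketch assumes size means the UTF-8 byte length of `json.dumps` output with its default separators; `sketch_split_large_dict` is a hypothetical name, not the module's code.

```python
import json
from typing import Any, Dict, List


def sketch_split_large_dict(
    json_data: List[Dict[str, Any]], size_limit: int
) -> List[List[Dict[str, Any]]]:
    fragments: List[List[Dict[str, Any]]] = []
    current: List[Dict[str, Any]] = []
    current_size = 2  # bytes for the enclosing "[]" of a serialized list
    for item in json_data:
        item_size = len(json.dumps(item).encode("utf-8"))
        # json.dumps separates list items with ", " (2 bytes) by default.
        added = item_size + (2 if current else 0)
        if current and current_size + added > size_limit:
            # Close the current fragment and start a new one with this item.
            fragments.append(current)
            current, current_size = [], 2
            added = item_size
        current.append(item)
        current_size += added
    if current:
        fragments.append(current)
    return fragments
```

Because the pass is greedy and never splits a single dictionary, an item that is larger than `size_limit` on its own ends up in a fragment that exceeds the limit; a production version would need to reject or further split such items.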

nv_ingest.modules.sinks.vdb_task_sink module

class nv_ingest.modules.sinks.vdb_task_sink.AccumulationStats(
    msg_count: int,
    last_insert_time: float,
    data: list[DataFrame],
)

Bases: object

A data class to store accumulation statistics to support dynamic batching of database inserts.

msg_count

Total number of accumulated records.

Type:

int

last_insert_time

A value representing the time of the most recent database insert.

Type:

float

data

A list containing accumulated batches since the last database insert.

Type:

list[cudf.DataFrame]

data: list[DataFrame]
last_insert_time: float
msg_count: int
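
Dynamic batching with these statistics might look like the following sketch: accumulate batches, then flush when either a record-count threshold or a maximum wait time is reached. The `AccumulationStatsSketch` class, the `accumulate` helper, the thresholds `max_batch_size` and `max_wait_s`, and the use of plain lists in place of cudf DataFrames are all assumptions for illustration, not the module's API.

```python
import time
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class AccumulationStatsSketch:
    # Same fields as AccumulationStats; plain lists stand in for cudf DataFrames.
    msg_count: int = 0
    last_insert_time: float = field(default_factory=time.time)
    data: List[Any] = field(default_factory=list)


def accumulate(
    stats: AccumulationStatsSketch,
    batch: List[Any],
    max_batch_size: int,
    max_wait_s: float,
    now: Optional[float] = None,
) -> Optional[List[Any]]:
    """Add a batch; return all accumulated batches when an insert is due, else None."""
    now = time.time() if now is None else now
    stats.data.append(batch)
    stats.msg_count += len(batch)
    # Flush on either enough records or too long since the last insert.
    if stats.msg_count >= max_batch_size or now - stats.last_insert_time >= max_wait_s:
        ready = stats.data
        stats.data, stats.msg_count, stats.last_insert_time = [], 0, now
        return ready
    return None
```

In this sketch the time-based flush is only checked when a new batch arrives; flushing an idle accumulator would need a separate timer.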

nv_ingest.modules.sinks.vdb_task_sink.preprocess_vdb_resources(
    service,
    recreate: bool,
    resource_schemas: dict,
)

Module contents