Source code for nv_ingest_api.util.control_message.validators
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from nv_ingest_api.internal.primitives.ingest_control_message import IngestControlMessage
[docs]
def cm_ensure_payload_not_null(control_message: IngestControlMessage):
"""
Ensures that the payload of a IngestControlMessage is not None.
Parameters
----------
control_message : IngestControlMessage
The IngestControlMessage to check.
Raises
------
ValueError
If the payload is None.
"""
if control_message.payload() is None:
raise ValueError("Payload cannot be None")
[docs]
def cm_set_failure(control_message: IngestControlMessage, reason: str) -> IngestControlMessage:
"""
Sets the failure metadata on a IngestControlMessage.
Parameters
----------
control_message : IngestControlMessage
The IngestControlMessage to set the failure metadata on.
reason : str
The reason for the failure.
Returns
-------
control_message : IngestControlMessage
The modified IngestControlMessage with the failure metadata set.
"""
control_message.set_metadata("cm_failed", True)
control_message.set_metadata("cm_failed_reason", reason)
return control_message