Source code for nv_ingest.stages.storages.image_storage_stage
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import typing
import mrc
from morpheus.config import Config, ExecutionMode
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.utils.module_utils import ModuleLoader
from nv_ingest.modules.storages.image_storage import ImageStorageLoaderFactory
from nv_ingest_api.primitives.ingest_control_message import IngestControlMessage
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
[docs]
class ImageStorageStage(PassThruTypeMixin, SinglePortStage):
    """
    Pipeline stage that stores images by delegating to the ``image_storage`` module.

    The stage holds no storage logic of its own: at construction time it obtains
    a module loader from ``ImageStorageLoaderFactory``; when the pipeline segment
    is built, it loads that module and wires it between the upstream node and
    the stage's output.

    Parameters
    ----------
    config : Config
        Pipeline configuration instance.
    module_config : dict, optional
        Configuration handed to the ``image_storage`` module. When omitted, a
        configuration containing only ``raise_on_failure`` is used.
    raise_on_failure : bool, default False
        Forwarded to the module under the ``raise_on_failure`` key. If
        ``module_config`` already contains that key, the value in
        ``module_config`` takes precedence.
    """

    def __init__(
        self,
        config: Config,
        module_config: typing.Optional[typing.Dict] = None,
        raise_on_failure: bool = False,
    ) -> None:
        super().__init__(config)

        if module_config is None:
            module_config = {"raise_on_failure": raise_on_failure}
        elif raise_on_failure and "raise_on_failure" not in module_config:
            # The caller asked for failures to be raised but the supplied config
            # is silent on the matter: honor the keyword instead of dropping it.
            # A new dict is built so the caller's mapping is not mutated, and a
            # False flag is never injected so configs without the key keep
            # whatever default the module itself applies.
            module_config = {**module_config, "raise_on_failure": raise_on_failure}

        module_name = "image_storage"
        self._module_loader: ModuleLoader = ImageStorageLoaderFactory.get_instance(module_name, module_config)

    @property
    def name(self) -> str:
        """Return the name of this stage, ``image-storage``."""
        return "image-storage"

    def accepted_types(self) -> typing.Tuple:
        """
        Returns accepted input types for this stage.

        Returns
        -------
        typing.Tuple
            A one-element tuple containing ``IngestControlMessage``, the only
            input type this stage accepts.
        """
        return (IngestControlMessage,)

    def supports_cpp_node(self):
        """Indicates whether this stage supports a C++ node (it does not)."""
        return False

    def supported_execution_modes(self) -> tuple[ExecutionMode]:
        """
        Returns the execution modes supported by this stage.

        Returns
        -------
        tuple[ExecutionMode]
            A one-element tuple containing ``ExecutionMode.CPU``.
        """
        return (ExecutionMode.CPU,)

    def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
        """
        Load the ``image_storage`` module and connect it to the upstream node.

        Parameters
        ----------
        builder : mrc.Builder
            Segment builder used to load the module and create the edge.
        input_node : mrc.SegmentObject
            Upstream node whose output feeds the module's input port.

        Returns
        -------
        mrc.SegmentObject
            The module's output port, which downstream stages connect to.
        """
        module = self._module_loader.load(builder)

        # The port names used here must match the input and output port names
        # registered by the image_storage module.
        mod_in_node = module.input_port("input")
        mod_out_node = module.output_port("output")

        builder.make_edge(input_node, mod_in_node)

        return mod_out_node