Source code for nv_ingest.util.pipeline.pipeline_builders

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import typing

from morpheus.config import Config
from morpheus.pipeline.pipeline import Pipeline

from nv_ingest.util.pipeline.stage_builders import *

logger = logging.getLogger(__name__)


[docs] def setup_ingestion_pipeline( pipe: Pipeline, morpheus_pipeline_config: Config, ingest_config: typing.Dict[str, typing.Any] ): default_cpu_count = get_default_cpu_count() add_meter_stage = os.environ.get("MESSAGE_CLIENT_TYPE") != "simple" ######################################################################################################## ## Insertion and Pre-processing stages ######################################################################################################## source_stage = add_source_stage(pipe, morpheus_pipeline_config, ingest_config) submitted_job_counter_stage = add_submitted_job_counter_stage(pipe, morpheus_pipeline_config, ingest_config) metadata_injector_stage = add_metadata_injector_stage(pipe, morpheus_pipeline_config) ######################################################################################################## ######################################################################################################## ## Primitive extraction ######################################################################################################## pdf_extractor_stage = add_pdf_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) image_extractor_stage = add_image_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) docx_extractor_stage = add_docx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) pptx_extractor_stage = add_pptx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) audio_extractor_stage = add_audio_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) ######################################################################################################## ######################################################################################################## ## Post-processing ######################################################################################################## image_dedup_stage = add_image_dedup_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) image_filter_stage = add_image_filter_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) table_extraction_stage = add_table_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) chart_extraction_stage = add_chart_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) infographic_extraction_stage = add_infographic_extractor_stage( pipe, morpheus_pipeline_config, ingest_config, default_cpu_count ) image_caption_stage = add_image_caption_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) ######################################################################################################## ######################################################################################################## ## Transforms and data synthesis ######################################################################################################## text_splitter_stage = add_text_splitter_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count) embed_extractions_stage = add_embed_extractions_stage( pipe, morpheus_pipeline_config, ingest_config, default_cpu_count ) ######################################################################################################## ## Storage and output ######################################################################################################## embedding_storage_stage = add_embedding_storage_stage( pipe, morpheus_pipeline_config, default_cpu_count, default_cpu_count ) image_storage_stage = add_image_storage_stage(pipe, morpheus_pipeline_config) # vdb_task_sink_stage = add_vdb_task_sink_stage(pipe, morpheus_pipeline_config, ingest_config) sink_stage = add_sink_stage(pipe, morpheus_pipeline_config, ingest_config) ######################################################################################################## ####################################################################################################### ## Telemetry (Note: everything after the sync stage is out of the hot path, please keep it that way) ## ####################################################################################################### otel_tracer_stage = add_otel_tracer_stage(pipe, morpheus_pipeline_config, ingest_config) if add_meter_stage: otel_meter_stage = add_otel_meter_stage(pipe, morpheus_pipeline_config, ingest_config) else: otel_meter_stage = None completed_job_counter_stage = add_completed_job_counter_stage(pipe, morpheus_pipeline_config, ingest_config) ######################################################################################################## # Add edges pipe.add_edge(source_stage, submitted_job_counter_stage) pipe.add_edge(submitted_job_counter_stage, metadata_injector_stage) pipe.add_edge(metadata_injector_stage, pdf_extractor_stage) pipe.add_edge(pdf_extractor_stage, image_extractor_stage) pipe.add_edge(image_extractor_stage, docx_extractor_stage) pipe.add_edge(docx_extractor_stage, pptx_extractor_stage) pipe.add_edge(pptx_extractor_stage, audio_extractor_stage) pipe.add_edge(audio_extractor_stage, image_dedup_stage) pipe.add_edge(image_dedup_stage, image_filter_stage) pipe.add_edge(image_filter_stage, table_extraction_stage) pipe.add_edge(table_extraction_stage, chart_extraction_stage) pipe.add_edge(chart_extraction_stage, infographic_extraction_stage) pipe.add_edge(infographic_extraction_stage, text_splitter_stage) pipe.add_edge(text_splitter_stage, image_caption_stage) pipe.add_edge(image_caption_stage, embed_extractions_stage) pipe.add_edge(embed_extractions_stage, image_storage_stage) pipe.add_edge(image_storage_stage, embedding_storage_stage) pipe.add_edge(embedding_storage_stage, sink_stage) # pipe.add_edge(vdb_task_sink_stage, sink_stage) if add_meter_stage: pipe.add_edge(sink_stage, otel_meter_stage) pipe.add_edge(otel_meter_stage, otel_tracer_stage) else: pipe.add_edge(sink_stage, otel_tracer_stage) pipe.add_edge(otel_tracer_stage, completed_job_counter_stage)