Source code for nv_ingest.util.pipeline.stage_builders
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import math
import os
import click
from nv_ingest_api.primitives.ingest_control_message import IngestControlMessage
from nv_ingest.modules.injectors.metadata_injector import MetadataInjectorLoaderFactory
from nv_ingest.modules.sinks.message_broker_task_sink import MessageBrokerTaskSinkLoaderFactory
from nv_ingest.modules.sinks.vdb_task_sink import VDBTaskSinkLoaderFactory
from nv_ingest.modules.sources.message_broker_task_source import MessageBrokerTaskSourceLoaderFactory
from nv_ingest.modules.telemetry.job_counter import JobCounterLoaderFactory
from nv_ingest.modules.telemetry.otel_meter import OpenTelemetryMeterLoaderFactory
from nv_ingest.modules.telemetry.otel_tracer import OpenTelemetryTracerLoaderFactory
from nv_ingest.modules.transforms.text_splitter import TextSplitterLoaderFactory
from nv_ingest.stages.docx_extractor_stage import generate_docx_extractor_stage
from nv_ingest.stages.embeddings.text_embeddings import generate_text_embed_extractor_stage
from nv_ingest.stages.extractors.image_extractor_stage import generate_image_extractor_stage
from nv_ingest.stages.filters import generate_dedup_stage
from nv_ingest.stages.filters import generate_image_filter_stage
from nv_ingest.stages.nim.audio_extraction import generate_audio_extractor_stage
from nv_ingest.stages.nim.chart_extraction import generate_chart_extractor_stage
from nv_ingest.stages.nim.infographic_extraction import generate_infographic_extractor_stage
from nv_ingest.stages.nim.table_extraction import generate_table_extractor_stage
from nv_ingest.stages.pdf_extractor_stage import generate_pdf_extractor_stage
from nv_ingest.stages.pptx_extractor_stage import generate_pptx_extractor_stage
from nv_ingest.stages.storages.embedding_storage_stage import generate_embedding_storage_stage
from nv_ingest.stages.storages.image_storage_stage import ImageStorageStage
from nv_ingest.stages.transforms.image_caption_extraction import generate_caption_extraction_stage
from nv_ingest.util.morpheus.linear_module_source_stage_cpu import LinearModuleSourceStageCPU, LinearModuleStageCPU
logger = logging.getLogger(__name__)
[docs]
def validate_positive(ctx, param, value):
if value <= 0:
raise click.BadParameter("must be a positive integer")
return value
[docs]
def get_message_provider_config():
message_provider_host = os.environ.get("MESSAGE_CLIENT_HOST", "localhost")
message_provider_port = os.environ.get("MESSAGE_CLIENT_PORT", "6379")
logger.info(f"MESSAGE_CLIENT_HOST: {message_provider_host}")
logger.info(f"MESSAGE_CLIENT_PORT: {message_provider_port}")
return message_provider_host, message_provider_port
[docs]
def get_caption_classifier_service():
triton_service_caption_classifier = os.environ.get(
"CAPTION_CLASSIFIER_GRPC_TRITON",
"",
)
triton_service_caption_classifier_name = os.environ.get(
"CAPTION_CLASSIFIER_MODEL_NAME",
"",
)
logger.info(f"CAPTION_CLASSIFIER_GRPC_TRITON: {triton_service_caption_classifier}")
return triton_service_caption_classifier, triton_service_caption_classifier_name
[docs]
def get_nim_service(env_var_prefix):
prefix = env_var_prefix.upper()
grpc_endpoint = os.environ.get(
f"{prefix}_GRPC_ENDPOINT",
"",
)
http_endpoint = os.environ.get(
f"{prefix}_HTTP_ENDPOINT",
"",
)
auth_token = os.environ.get(
"NVIDIA_BUILD_API_KEY",
"",
) or os.environ.get(
"NGC_API_KEY",
"",
)
infer_protocol = os.environ.get(
f"{prefix}_INFER_PROTOCOL",
"http" if http_endpoint else "grpc" if grpc_endpoint else "",
)
logger.info(f"{prefix}_GRPC_ENDPOINT: {grpc_endpoint}")
logger.info(f"{prefix}_HTTP_ENDPOINT: {http_endpoint}")
logger.info(f"{prefix}_INFER_PROTOCOL: {infer_protocol}")
return grpc_endpoint, http_endpoint, auth_token, infer_protocol
[docs]
def get_default_cpu_count():
default_cpu_count = os.environ.get("NV_INGEST_MAX_UTIL", int(max(1, math.floor(len(os.sched_getaffinity(0))))))
return default_cpu_count
[docs]
def add_source_stage(pipe, morpheus_pipeline_config, ingest_config):
task_broker_host = os.environ.get("MESSAGE_CLIENT_HOST", "localhost")
task_broker_port = os.environ.get("MESSAGE_CLIENT_PORT", "6379")
client_type = os.environ.get("MESSAGE_CLIENT_TYPE", "redis")
task_queue_name = os.environ.get("MESSAGE_CLIENT_QUEUE", "morpheus_task_queue")
source_module_loader = MessageBrokerTaskSourceLoaderFactory.get_instance(
module_name="broker_listener",
module_config=ingest_config.get(
"broker_task_source",
{
"broker_client": {
"host": task_broker_host,
"port": task_broker_port,
"client_type": client_type,
},
"task_queue": task_queue_name,
},
),
)
source_stage = pipe.add_stage(
LinearModuleSourceStageCPU(
morpheus_pipeline_config,
source_module_loader,
output_type=IngestControlMessage,
output_port_name="output",
)
)
return source_stage
[docs]
def add_submitted_job_counter_stage(pipe, morpheus_pipeline_config, ingest_config):
submitted_job_counter_loader = JobCounterLoaderFactory.get_instance(
module_name="submitted_job_counter",
module_config=ingest_config.get(
"submitted_job_counter_module",
{
"name": "submitted_jobs",
},
),
)
submitted_job_counter_stage = pipe.add_stage(
LinearModuleStageCPU(
morpheus_pipeline_config,
submitted_job_counter_loader,
input_type=IngestControlMessage,
output_type=IngestControlMessage,
input_port_name="input",
output_port_name="output",
)
)
return submitted_job_counter_stage
[docs]
def add_metadata_injector_stage(pipe, morpheus_pipeline_config):
metadata_injector_loader = MetadataInjectorLoaderFactory.get_instance(
module_name="metadata_injection", module_config={}
)
metadata_injector_stage = pipe.add_stage(
LinearModuleStageCPU(
morpheus_pipeline_config,
metadata_injector_loader,
input_type=IngestControlMessage,
output_type=IngestControlMessage,
input_port_name="input",
output_port_name="output",
)
)
return metadata_injector_stage
[docs]
def add_pdf_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox")
nemoretriever_parse_grpc, nemoretriever_parse_http, nemoretriever_parse_auth, nemoretriever_parse_protocol = (
get_nim_service("nemoretriever_parse")
)
model_name = os.environ.get("NEMORETRIEVER_PARSE_MODEL_NAME", "nvidia/nemoretriever-parse")
pdf_content_extractor_config = ingest_config.get(
"pdf_content_extraction_module",
{
"pdfium_config": {
"yolox_endpoints": (yolox_grpc, yolox_http),
"yolox_infer_protocol": yolox_protocol,
"auth_token": yolox_auth, # All auth tokens are the same for the moment
},
"nemoretriever_parse_config": {
"nemoretriever_parse_endpoints": (nemoretriever_parse_grpc, nemoretriever_parse_http),
"nemoretriever_parse_infer_protocol": nemoretriever_parse_protocol,
"auth_token": nemoretriever_parse_auth, # All auth tokens are the same for the moment
"model_name": model_name,
},
},
)
pdf_extractor_stage = pipe.add_stage(
generate_pdf_extractor_stage(
morpheus_pipeline_config,
pdf_content_extractor_config,
pe_count=max(1, int(default_cpu_count / 2)),
task="extract",
task_desc="pdf_content_extractor",
)
)
return pdf_extractor_stage
[docs]
def add_table_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox_table_structure")
paddle_grpc, paddle_http, paddle_auth, paddle_protocol = get_nim_service("paddle")
table_content_extractor_config = ingest_config.get(
"table_content_extraction_module",
{
"stage_config": {
"yolox_endpoints": (yolox_grpc, yolox_http),
"yolox_infer_protocol": yolox_protocol,
"paddle_endpoints": (paddle_grpc, paddle_http),
"paddle_infer_protocol": paddle_protocol,
"auth_token": yolox_auth,
}
},
)
table_extractor_stage = pipe.add_stage(
generate_table_extractor_stage(
morpheus_pipeline_config, table_content_extractor_config, pe_count=max(1, int(default_cpu_count / 4))
)
)
return table_extractor_stage
[docs]
def add_chart_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox_graphic_elements")
paddle_grpc, paddle_http, paddle_auth, paddle_protocol = get_nim_service("paddle")
table_content_extractor_config = ingest_config.get(
"table_content_extraction_module",
{
"stage_config": {
"yolox_endpoints": (yolox_grpc, yolox_http),
"yolox_infer_protocol": yolox_protocol,
"paddle_endpoints": (paddle_grpc, paddle_http),
"paddle_infer_protocol": paddle_protocol,
"auth_token": yolox_auth,
}
},
)
table_extractor_stage = pipe.add_stage(
generate_chart_extractor_stage(
morpheus_pipeline_config, table_content_extractor_config, pe_count=max(1, int(default_cpu_count / 4))
)
)
return table_extractor_stage
[docs]
def add_infographic_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
paddle_grpc, paddle_http, paddle_auth, paddle_protocol = get_nim_service("paddle")
infographic_content_extractor_config = ingest_config.get(
"infographic_content_extraction_module",
{
"stage_config": {
"paddle_endpoints": (paddle_grpc, paddle_http),
"paddle_infer_protocol": paddle_protocol,
"auth_token": paddle_auth,
}
},
)
infographic_extractor_stage = pipe.add_stage(
generate_infographic_extractor_stage(
morpheus_pipeline_config, infographic_content_extractor_config, pe_count=max(1, int(default_cpu_count / 4))
)
)
return infographic_extractor_stage
[docs]
def add_image_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox")
image_extractor_config = ingest_config.get(
"image_extraction_module",
{
"image_extraction_config": {
"yolox_endpoints": (yolox_grpc, yolox_http),
"yolox_infer_protocol": yolox_protocol,
"auth_token": yolox_auth, # All auth tokens are the same for the moment
}
},
)
image_extractor_stage = pipe.add_stage(
generate_image_extractor_stage(
morpheus_pipeline_config,
extractor_config=image_extractor_config,
pe_count=max(1, int(default_cpu_count / 4)),
task="extract",
task_desc="image_content_extractor",
)
)
return image_extractor_stage
[docs]
def add_docx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox")
docx_extractor_config = ingest_config.get(
"docx_extraction_module",
{
"docx_extraction_config": {
"yolox_endpoints": (yolox_grpc, yolox_http),
"yolox_infer_protocol": yolox_protocol,
"auth_token": yolox_auth,
}
},
)
docx_extractor_stage = pipe.add_stage(
generate_docx_extractor_stage(
morpheus_pipeline_config,
extractor_config=docx_extractor_config,
pe_count=max(1, int(default_cpu_count / 4)),
task="extract",
task_desc="docx_content_extractor",
)
)
return docx_extractor_stage
[docs]
def add_pptx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox")
pptx_extractor_config = ingest_config.get(
"pptx_extraction_module",
{
"pptx_extraction_config": {
"yolox_endpoints": (yolox_grpc, yolox_http),
"yolox_infer_protocol": yolox_protocol,
"auth_token": yolox_auth,
}
},
)
pptx_extractor_stage = pipe.add_stage(
generate_pptx_extractor_stage(
morpheus_pipeline_config,
extractor_config=pptx_extractor_config,
pe_count=max(1, int(default_cpu_count / 4)),
task="extract",
task_desc="pptx_content_extractor",
)
)
return pptx_extractor_stage
[docs]
def get_audio_retrieval_service(env_var_prefix):
prefix = env_var_prefix.upper()
grpc_endpoint = os.environ.get(
"AUDIO_GRPC_ENDPOINT",
"",
)
http_endpoint = os.environ.get(
"AUDIO_HTTP_ENDPOINT",
"",
)
auth_token = os.environ.get(
"NVIDIA_BUILD_API_KEY",
"",
) or os.environ.get(
"NGC_API_KEY",
"",
)
infer_protocol = os.environ.get(
"AUDIO_INFER_PROTOCOL",
"http" if http_endpoint else "grpc" if grpc_endpoint else "",
)
logger.info(f"{prefix}_GRPC_TRITON: {grpc_endpoint}")
logger.info(f"{prefix}_HTTP_TRITON: {http_endpoint}")
logger.info(f"{prefix}_INFER_PROTOCOL: {infer_protocol}")
return grpc_endpoint, http_endpoint, auth_token, infer_protocol
[docs]
def add_audio_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
audio_grpc, audio_http, audio_auth, audio_infer_protocol = get_audio_retrieval_service("audio")
audio_function_id = os.getenv("AUDIO_FUNCTION_ID", "")
audio_extractor_config = ingest_config.get(
"audio_extraction_module",
{
"audio_extraction_config": {
"audio_endpoints": (audio_grpc, audio_http),
"audio_infer_protocol": audio_infer_protocol,
"auth_token": audio_auth,
"function_id": audio_function_id,
# All auth tokens are the same for the moment
}
},
)
audio_extractor_stage = pipe.add_stage(
generate_audio_extractor_stage(
morpheus_pipeline_config,
stage_config=audio_extractor_config,
pe_count=max(1, int(default_cpu_count / 4)),
)
)
return audio_extractor_stage
[docs]
def add_image_dedup_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
image_dedup_config = ingest_config.get("dedup_module", {})
image_dedup_stage = pipe.add_stage(
generate_dedup_stage(
morpheus_pipeline_config,
image_dedup_config,
pe_count=max(1, int(default_cpu_count / 4)),
task="dedup",
task_desc="dedup_images",
)
)
return image_dedup_stage
[docs]
def add_image_filter_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
image_filter_config = ingest_config.get("image_filter", {})
image_filter_stage = pipe.add_stage(
generate_image_filter_stage(
morpheus_pipeline_config,
image_filter_config,
pe_count=max(1, int(default_cpu_count / 4)),
task="filter",
task_desc="filter_images",
)
)
return image_filter_stage
[docs]
def add_text_splitter_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
_ = default_cpu_count
text_splitter_loader = TextSplitterLoaderFactory.get_instance(
module_name="text_splitter",
module_config=ingest_config.get("text_splitting_module", {}),
)
text_splitter_stage = pipe.add_stage(
LinearModuleStageCPU(
morpheus_pipeline_config,
text_splitter_loader,
input_type=IngestControlMessage,
output_type=IngestControlMessage,
input_port_name="input",
output_port_name="output",
)
)
return text_splitter_stage
[docs]
def add_image_caption_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
auth_token = os.environ.get(
"NVIDIA_BUILD_API_KEY",
"",
) or os.environ.get(
"NGC_API_KEY",
"",
)
endpoint_url = os.environ.get("VLM_CAPTION_ENDPOINT", "localhost:5000")
model_name = os.environ.get("VLM_CAPTION_MODEL_NAME", "meta/llama-3.2-11b-vision-instruct")
image_caption_config = ingest_config.get(
"image_caption_extraction_module",
{
"api_key": auth_token,
"endpoint_url": endpoint_url,
"model_name": model_name,
"prompt": "Caption the content of this image:",
},
)
image_caption_stage = pipe.add_stage(
generate_caption_extraction_stage(
morpheus_pipeline_config,
image_caption_config,
pe_count=max(1, int(default_cpu_count / 4)),
task="caption",
task_desc="caption_ext",
)
)
return image_caption_stage
[docs]
def add_embed_extractions_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
_ = ingest_config
api_key = os.environ.get(
"NVIDIA_BUILD_API_KEY",
"",
) or os.environ.get(
"NGC_API_KEY",
"",
)
embedding_nim_endpoint = os.getenv("EMBEDDING_NIM_ENDPOINT", "http://embedding:8000/v1")
embedding_model = os.getenv("EMBEDDING_NIM_MODEL_NAME", "nvidia/nv-embedqa-e5-v5")
text_embed_extraction_config = {
"api_key": api_key,
"embedding_nim_endpoint": embedding_nim_endpoint,
"embedding_model": embedding_model,
}
embed_extractions_stage = pipe.add_stage(
generate_text_embed_extractor_stage(
morpheus_pipeline_config,
text_embed_extraction_config,
pe_count=max(1, int(default_cpu_count / 4)),
task="embed",
task_desc="embed_text",
)
)
return embed_extractions_stage
[docs]
def add_embedding_storage_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
storage_stage = pipe.add_stage(
generate_embedding_storage_stage(
morpheus_pipeline_config,
pe_count=max(1, int(default_cpu_count / 4)),
task="store_embedding",
task_desc="store_embedding_minio",
)
)
return storage_stage
[docs]
def add_image_storage_stage(pipe, morpheus_pipeline_config):
image_storage_stage = pipe.add_stage(ImageStorageStage(morpheus_pipeline_config))
return image_storage_stage
[docs]
def add_sink_stage(pipe, morpheus_pipeline_config, ingest_config):
task_broker_host = os.environ.get("MESSAGE_CLIENT_HOST", "localhost")
task_broker_port = os.environ.get("MESSAGE_CLIENT_PORT", "6379")
client_type = os.environ.get("MESSAGE_CLIENT_TYPE", "redis")
sink_module_loader = MessageBrokerTaskSinkLoaderFactory.get_instance(
module_name="broker_task_sink",
module_config=ingest_config.get(
"broker_task_sink",
{
"broker_client": {
"host": task_broker_host,
"port": task_broker_port,
"client_type": client_type,
},
},
),
)
sink_stage = pipe.add_stage(
LinearModuleStageCPU(
morpheus_pipeline_config,
sink_module_loader,
input_type=IngestControlMessage,
output_type=IngestControlMessage,
input_port_name="input",
output_port_name="output",
)
)
return sink_stage
[docs]
def add_otel_tracer_stage(pipe, morpheus_pipeline_config, ingest_config):
endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
otel_tracer_loader = OpenTelemetryTracerLoaderFactory.get_instance(
module_name="otel_tracer",
module_config=ingest_config.get(
"otel_tracer_module",
{
"otel_endpoint": endpoint,
},
),
)
otel_tracer_stage = pipe.add_stage(
LinearModuleStageCPU(
morpheus_pipeline_config,
otel_tracer_loader,
input_type=IngestControlMessage,
output_type=IngestControlMessage,
input_port_name="input",
output_port_name="output",
)
)
return otel_tracer_stage
[docs]
def add_otel_meter_stage(pipe, morpheus_pipeline_config, ingest_config):
task_broker_host = os.environ.get("MESSAGE_CLIENT_HOST", "localhost")
task_broker_port = os.environ.get("MESSAGE_CLIENT_PORT", "6379")
endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
otel_meter_loader = OpenTelemetryMeterLoaderFactory.get_instance(
module_name="otel_meter",
module_config=ingest_config.get(
"otel_meter_module",
{
"broker_client": {
"host": task_broker_host,
"port": task_broker_port,
"client_type": "redis",
},
"otel_endpoint": endpoint,
},
),
)
otel_meter_stage = pipe.add_stage(
LinearModuleStageCPU(
morpheus_pipeline_config,
otel_meter_loader,
input_type=IngestControlMessage,
output_type=IngestControlMessage,
input_port_name="input",
output_port_name="output",
)
)
return otel_meter_stage
[docs]
def add_completed_job_counter_stage(pipe, morpheus_pipeline_config, ingest_config):
completed_job_counter_loader = JobCounterLoaderFactory.get_instance(
module_name="completed_job_counter",
module_config=ingest_config.get(
"completed_job_counter_module",
{
"name": "completed_jobs",
},
),
)
completed_job_counter_stage = pipe.add_stage(
LinearModuleStageCPU(
morpheus_pipeline_config,
completed_job_counter_loader,
input_type=IngestControlMessage,
output_type=IngestControlMessage,
input_port_name="input",
output_port_name="output",
)
)
return completed_job_counter_stage
[docs]
def add_vdb_task_sink_stage(pipe, morpheus_pipeline_config, ingest_config):
milvus_endpoint = os.getenv("MILVUS_ENDPOINT", "http://milvus:19530")
vdb_task_sink_loader = VDBTaskSinkLoaderFactory.get_instance(
module_name="vdb_task_sink",
module_config=ingest_config.get(
"vdb_task_sink_module",
{
"service_kwargs": {
"uri": milvus_endpoint,
}
},
),
)
vdb_task_sink_stage = pipe.add_stage(
LinearModuleStageCPU(
morpheus_pipeline_config,
vdb_task_sink_loader,
input_type=IngestControlMessage,
output_type=IngestControlMessage,
input_port_name="input",
output_port_name="output",
)
)
return vdb_task_sink_stage