nv_ingest.util.pipeline package#

Submodules#

nv_ingest.util.pipeline.logging module#

nv_ingest.util.pipeline.logging.get_log_level(str_level)[source]#

Converts a log level given as a string to the corresponding logging-module level.

nv_ingest.util.pipeline.logging.setup_logging(log_level)[source]#

Configures logging based on the provided log level or the INGEST_LOG_LEVEL environment variable.
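A minimal usage sketch for these two helpers; the values passed below are illustrative, and the exact precedence between the argument and the INGEST_LOG_LEVEL environment variable follows the description above.

```python
import logging

from nv_ingest.util.pipeline.logging import get_log_level, setup_logging

# get_log_level maps a string such as "DEBUG" onto the corresponding
# logging-module level (expected to equal logging.DEBUG here).
level = get_log_level("DEBUG")

# setup_logging configures logging from the given level; per the docs above,
# the INGEST_LOG_LEVEL environment variable can also drive this value.
setup_logging("INFO")
```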

nv_ingest.util.pipeline.pipeline_builders module#

nv_ingest.util.pipeline.pipeline_builders.setup_ingestion_pipeline(
pipe: Pipeline,
morpheus_pipeline_config: Config,
ingest_config: Dict[str, Any],
)[source]#
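A hedged sketch of how the builder is typically driven. The Morpheus `Pipeline` and `Config` import paths and the `pipe.run()` call are assumptions based on standard Morpheus usage, not confirmed by this page, and the ingest_config keys are illustrative.

```python
from morpheus.config import Config               # assumed import path
from morpheus.pipeline.pipeline import Pipeline  # assumed import path

from nv_ingest.util.pipeline.pipeline_builders import setup_ingestion_pipeline

morpheus_config = Config()
pipe = Pipeline(morpheus_config)

# ingest_config mirrors the Dict[str, Any] parameter; keys depend on
# your deployment and are omitted here for brevity.
ingest_config = {}

setup_ingestion_pipeline(pipe, morpheus_config, ingest_config)
pipe.run()  # assumed: standard Morpheus Pipeline entry point
```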

nv_ingest.util.pipeline.pipeline_runners module#

class nv_ingest.util.pipeline.pipeline_runners.PipelineCreationSchema(
*,
audio_grpc_endpoint: str = 'grpc.nvcf.nvidia.com:443',
audio_function_id: str = '1598d209-5e27-4d3c-8079-4751568b1081',
audio_infer_protocol: str = 'grpc',
embedding_nim_endpoint: str = 'https://integrate.api.nvidia.com/v1',
embedding_nim_model_name: str = 'nvidia/llama-3.2-nv-embedqa-1b-v2',
ingest_log_level: str = 'INFO',
max_ingest_process_workers: str = '16',
message_client_host: str = 'localhost',
message_client_port: str = '7671',
message_client_type: str = 'simple',
mrc_ignore_numa_check: str = '1',
nemoretriever_parse_http_endpoint: str = 'https://integrate.api.nvidia.com/v1/chat/completions',
nemoretriever_parse_infer_protocol: str = 'http',
nemoretriever_parse_model_name: str = 'nvidia/nemoretriever-parse',
ngc_api_key: str = '',
nvidia_build_api_key: str = '',
otel_exporter_otlp_endpoint: str = 'localhost:4317',
paddle_http_endpoint: str = 'https://ai.api.nvidia.com/v1/cv/baidu/paddleocr',
paddle_infer_protocol: str = 'http',
redis_morpheus_task_queue: str = 'morpheus_task_queue',
vlm_caption_endpoint: str = 'https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-11b-vision-instruct/chat/completions',
vlm_caption_model_name: str = 'meta/llama-3.2-11b-vision-instruct',
yolox_graphic_elements_http_endpoint: str = 'https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-graphic-elements-v1',
yolox_graphic_elements_infer_protocol: str = 'http',
yolox_http_endpoint: str = 'https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-page-elements-v2',
yolox_infer_protocol: str = 'http',
yolox_table_structure_http_endpoint: str = 'https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-table-structure-v1',
yolox_table_structure_infer_protocol: str = 'http',
)[source]#

Bases: BaseModel

Schema for pipeline creation configuration.

Contains all parameters required to set up and execute a Morpheus pipeline, including endpoints, API keys, and processing options.

audio_function_id: str#
audio_grpc_endpoint: str#
audio_infer_protocol: str#
embedding_nim_endpoint: str#
embedding_nim_model_name: str#
ingest_log_level: str#
max_ingest_process_workers: str#
message_client_host: str#
message_client_port: str#
message_client_type: str#
model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#

Configuration for the model; a dictionary conforming to pydantic's ConfigDict.

mrc_ignore_numa_check: str#
nemoretriever_parse_http_endpoint: str#
nemoretriever_parse_infer_protocol: str#
nemoretriever_parse_model_name: str#
ngc_api_key: str#
nvidia_build_api_key: str#
otel_exporter_otlp_endpoint: str#
paddle_http_endpoint: str#
paddle_infer_protocol: str#
redis_morpheus_task_queue: str#
vlm_caption_endpoint: str#
vlm_caption_model_name: str#
yolox_graphic_elements_http_endpoint: str#
yolox_graphic_elements_infer_protocol: str#
yolox_http_endpoint: str#
yolox_infer_protocol: str#
yolox_table_structure_http_endpoint: str#
yolox_table_structure_infer_protocol: str#
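A short construction sketch: every field has a default, so only overrides need to be supplied, and because model_config sets extra='forbid', unknown keys are rejected. The override values below are illustrative only.

```python
from nv_ingest.util.pipeline.pipeline_runners import PipelineCreationSchema

# All fields have defaults, so an empty construction is valid.
config = PipelineCreationSchema()

# Override selected fields; the values here are illustrative only.
config = PipelineCreationSchema(
    ingest_log_level="DEBUG",
    message_client_host="redis",
    message_client_port="6379",
)

# extra='forbid' means unknown keys raise a pydantic ValidationError
# instead of being silently accepted.
try:
    PipelineCreationSchema(not_a_real_field="x")
except Exception as err:  # pydantic.ValidationError
    print(type(err).__name__)
```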
nv_ingest.util.pipeline.pipeline_runners.is_port_in_use(port, host='127.0.0.1')[source]#

Checks whether the given port is in use on the specified host, using socket reuse settings.

Parameters:
  • port (int) – The port number to check.

  • host (str) – The host to check on. Default is ‘127.0.0.1’.

Returns:

True if the port is in use, False otherwise.

Return type:

bool
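A usage sketch, for example to wait for a locally launched service to start listening; the port and timeout below are illustrative.

```python
import time

from nv_ingest.util.pipeline.pipeline_runners import is_port_in_use

# Poll until something is listening on the (illustrative) port 7671.
for _ in range(30):
    if is_port_in_use(7671, host="127.0.0.1"):
        break
    time.sleep(1.0)
```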

nv_ingest.util.pipeline.pipeline_runners.run_ingest_pipeline(
ingest_config_path: str | None = None,
caption_batch_size: int = 8,
use_cpp: bool = False,
pipeline_batch_size: int = 256,
enable_monitor: bool = False,
feature_length: int = 512,
num_threads: int | None = None,
model_max_batch_size: int = 256,
mode: str = 'NLP',
log_level: str = 'INFO',
) None[source]#

Configures and runs the pipeline with the specified options.

This function serves as the main entry point for configuring and executing a pipeline with user-defined settings.

Parameters:
  • ingest_config_path (str, optional) – Path to the JSON configuration file.

  • caption_batch_size (int, optional) – Number of captions to process in a batch (default: 8).

  • use_cpp (bool, optional) – Use C++ backend (default: False).

  • pipeline_batch_size (int, optional) – Batch size for the pipeline (default: 256).

  • enable_monitor (bool, optional) – Enable monitoring (default: False).

  • feature_length (int, optional) – Feature length for embeddings (default: 512).

  • num_threads (int, optional) – Number of threads (default: determined by get_default_cpu_count).

  • model_max_batch_size (int, optional) – Model max batch size (default: 256).

  • mode (str, optional) – Pipeline mode (default: PipelineModes.NLP.value).

  • log_level (str, optional) – Log level (default: ‘INFO’).

Raises:

ValidationError – If the configuration validation fails.
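A minimal invocation sketch; unspecified arguments fall back to the defaults listed above, and the overrides shown are illustrative.

```python
from nv_ingest.util.pipeline.pipeline_runners import run_ingest_pipeline

# Run with defaults, optionally pointing at a JSON ingest config file.
# Passing None relies entirely on the built-in defaults.
run_ingest_pipeline(
    ingest_config_path=None,
    caption_batch_size=8,
    log_level="DEBUG",
)
```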

nv_ingest.util.pipeline.pipeline_runners.run_pipeline(
morpheus_pipeline_config: Any,
ingest_config: Dict[str, Any],
) float[source]#

Runs the pipeline synchronously in the current process.

This is the primary entry point for executing a pipeline directly, as opposed to launching it in a subprocess via start_pipeline_subprocess.

Parameters:
  • morpheus_pipeline_config (Config) – The configuration object for the Morpheus pipeline.

  • ingest_config (Dict[str, Any]) – The ingestion configuration dictionary.

Returns:

The total elapsed time for running the pipeline.

Return type:

float

Raises:

Exception – Any exception raised during pipeline execution will be propagated.
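A hedged sketch of a direct synchronous run; the Morpheus Config import path is an assumption, and the ingest_config contents are illustrative.

```python
from morpheus.config import Config  # assumed import path

from nv_ingest.util.pipeline.pipeline_runners import run_pipeline

morpheus_config = Config()
ingest_config = {}  # illustrative; keys depend on your deployment

# Returns the total elapsed time as a float.
elapsed = run_pipeline(morpheus_config, ingest_config)
print(f"pipeline finished, elapsed={elapsed}")
```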

nv_ingest.util.pipeline.pipeline_runners.start_pipeline_subprocess(
config: PipelineCreationSchema,
stdout: TextIO | None = None,
stderr: TextIO | None = None,
) Popen[source]#

Launches the pipeline in a subprocess and ensures that it terminates if the parent process dies.

This function encapsulates all subprocess-related setup, including signal handling and atexit registration.

Parameters:
  • config (PipelineCreationSchema) – Validated pipeline configuration.

  • stdout (file-like object or None, optional) – File-like object for capturing stdout. If None, output is ignored.

  • stderr (file-like object or None, optional) – File-like object for capturing stderr. If None, output is ignored.

Returns:

The subprocess object for the launched pipeline.

Return type:

subprocess.Popen
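A sketch of launching the pipeline in a subprocess and forwarding its output to the parent process; the returned object is a standard subprocess.Popen, so waiting, polling, and terminating follow the usual semantics.

```python
import sys

from nv_ingest.util.pipeline.pipeline_runners import (
    PipelineCreationSchema,
    start_pipeline_subprocess,
)

config = PipelineCreationSchema()  # defaults; override fields as needed

# Forward the child's stdout/stderr to this process; pass None to discard.
proc = start_pipeline_subprocess(config, stdout=sys.stdout, stderr=sys.stderr)

# proc is a standard subprocess.Popen; wait for it (or poll/terminate it).
proc.wait()
```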

nv_ingest.util.pipeline.pipeline_runners.subprocess_entrypoint() None[source]#

Entry point for the pipeline subprocess.

This function is called when a pipeline subprocess is started. It configures logging and runs the ingest pipeline.

Raises:

Exception – Any exception raised during pipeline execution will cause the subprocess to exit with a non-zero status code.

nv_ingest.util.pipeline.stage_builders module#

nv_ingest.util.pipeline.stage_builders.add_audio_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_chart_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_completed_job_counter_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_docx_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_embed_extractions_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_embedding_storage_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_image_caption_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_image_dedup_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_image_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_image_filter_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_image_storage_stage(pipe, morpheus_pipeline_config)[source]#
nv_ingest.util.pipeline.stage_builders.add_infographic_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_metadata_injector_stage(pipe, morpheus_pipeline_config)[source]#
nv_ingest.util.pipeline.stage_builders.add_otel_meter_stage(pipe, morpheus_pipeline_config, ingest_config)[source]#
nv_ingest.util.pipeline.stage_builders.add_otel_tracer_stage(pipe, morpheus_pipeline_config, ingest_config)[source]#
nv_ingest.util.pipeline.stage_builders.add_pdf_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_pptx_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_sink_stage(pipe, morpheus_pipeline_config, ingest_config)[source]#
nv_ingest.util.pipeline.stage_builders.add_source_stage(pipe, morpheus_pipeline_config, ingest_config)[source]#
nv_ingest.util.pipeline.stage_builders.add_submitted_job_counter_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_table_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_text_splitter_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.stage_builders.add_vdb_task_sink_stage(pipe, morpheus_pipeline_config, ingest_config)[source]#
nv_ingest.util.pipeline.stage_builders.get_audio_retrieval_service(env_var_prefix)[source]#
nv_ingest.util.pipeline.stage_builders.get_caption_classifier_service()[source]#
nv_ingest.util.pipeline.stage_builders.get_default_cpu_count()[source]#
nv_ingest.util.pipeline.stage_builders.get_message_provider_config()[source]#
nv_ingest.util.pipeline.stage_builders.get_nim_service(env_var_prefix)[source]#
nv_ingest.util.pipeline.stage_builders.validate_positive(ctx, param, value)[source]#

Module contents#

nv_ingest.util.pipeline.add_chart_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_docx_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_embed_extractions_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_embedding_storage_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_image_caption_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_image_dedup_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_image_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_image_filter_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_image_storage_stage(pipe, morpheus_pipeline_config)[source]#
nv_ingest.util.pipeline.add_metadata_injector_stage(pipe, morpheus_pipeline_config)[source]#
nv_ingest.util.pipeline.add_pdf_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_pptx_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_sink_stage(pipe, morpheus_pipeline_config, ingest_config)[source]#
nv_ingest.util.pipeline.add_source_stage(pipe, morpheus_pipeline_config, ingest_config)[source]#
nv_ingest.util.pipeline.add_submitted_job_counter_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
)[source]#
nv_ingest.util.pipeline.add_table_extractor_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_text_splitter_stage(
pipe,
morpheus_pipeline_config,
ingest_config,
default_cpu_count,
)[source]#
nv_ingest.util.pipeline.add_vdb_task_sink_stage(pipe, morpheus_pipeline_config, ingest_config)[source]#
nv_ingest.util.pipeline.setup_ingestion_pipeline(
pipe: Pipeline,
morpheus_pipeline_config: Config,
ingest_config: Dict[str, Any],
)[source]#