nv_ingest.util.pipeline package
Submodules
nv_ingest.util.pipeline.logging module
nv_ingest.util.pipeline.pipeline_builders module
nv_ingest.util.pipeline.pipeline_runners module
- class nv_ingest.util.pipeline.pipeline_runners.PipelineCreationSchema(
      *,
      audio_grpc_endpoint: str = 'grpc.nvcf.nvidia.com:443',
      audio_function_id: str = '1598d209-5e27-4d3c-8079-4751568b1081',
      audio_infer_protocol: str = 'grpc',
      embedding_nim_endpoint: str = 'https://integrate.api.nvidia.com/v1',
      embedding_nim_model_name: str = 'nvidia/llama-3.2-nv-embedqa-1b-v2',
      ingest_log_level: str = 'INFO',
      max_ingest_process_workers: str = '16',
      message_client_host: str = 'localhost',
      message_client_port: str = '7671',
      message_client_type: str = 'simple',
      mrc_ignore_numa_check: str = '1',
      nemoretriever_parse_http_endpoint: str = 'https://integrate.api.nvidia.com/v1/chat/completions',
      nemoretriever_parse_infer_protocol: str = 'http',
      nemoretriever_parse_model_name: str = 'nvidia/nemoretriever-parse',
      ngc_api_key: str = '',
      nvidia_build_api_key: str = '',
      otel_exporter_otlp_endpoint: str = 'localhost:4317',
      paddle_http_endpoint: str = 'https://ai.api.nvidia.com/v1/cv/baidu/paddleocr',
      paddle_infer_protocol: str = 'http',
      redis_morpheus_task_queue: str = 'morpheus_task_queue',
      vlm_caption_endpoint: str = 'https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-11b-vision-instruct/chat/completions',
      vlm_caption_model_name: str = 'meta/llama-3.2-11b-vision-instruct',
      yolox_graphic_elements_http_endpoint: str = 'https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-graphic-elements-v1',
      yolox_graphic_elements_infer_protocol: str = 'http',
      yolox_http_endpoint: str = 'https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-page-elements-v2',
      yolox_infer_protocol: str = 'http',
      yolox_table_structure_http_endpoint: str = 'https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-table-structure-v1',
      yolox_table_structure_infer_protocol: str = 'http',
  )
Bases: BaseModel
Schema for pipeline creation configuration.
Contains all parameters required to set up and execute a Morpheus pipeline, including endpoints, API keys, and processing options.
- audio_function_id: str
- audio_grpc_endpoint: str
- audio_infer_protocol: str
- embedding_nim_endpoint: str
- embedding_nim_model_name: str
- ingest_log_level: str
- max_ingest_process_workers: str
- message_client_host: str
- message_client_port: str
- message_client_type: str
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict. Here, 'extra': 'forbid' makes validation fail for any field not declared in the schema.
- mrc_ignore_numa_check: str
- nemoretriever_parse_http_endpoint: str
- nemoretriever_parse_infer_protocol: str
- nemoretriever_parse_model_name: str
- ngc_api_key: str
- nvidia_build_api_key: str
- otel_exporter_otlp_endpoint: str
- paddle_http_endpoint: str
- paddle_infer_protocol: str
- redis_morpheus_task_queue: str
- vlm_caption_endpoint: str
- vlm_caption_model_name: str
- yolox_graphic_elements_http_endpoint: str
- yolox_graphic_elements_infer_protocol: str
- yolox_http_endpoint: str
- yolox_infer_protocol: str
- yolox_table_structure_http_endpoint: str
- yolox_table_structure_infer_protocol: str
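Two properties of this schema are easy to miss: every field is typed as str, including numeric settings such as message_client_port and max_ingest_process_workers, and unknown fields are rejected. The stdlib-only sketch below illustrates both with a hypothetical MiniPipelineConfig dataclass that mirrors three of the fields above. It is a stand-in, not the pydantic model itself: a dataclass rejects unknown keyword arguments with TypeError, whereas pydantic with extra='forbid' raises ValidationError.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class MiniPipelineConfig:
    """Hypothetical stand-in for three PipelineCreationSchema fields.

    As in the real schema, every value is a string, including the port.
    """

    message_client_host: str = "localhost"
    message_client_port: str = "7671"
    message_client_type: str = "simple"

    def port_number(self) -> int:
        # Numeric settings are stored as strings, so convert at the point of use.
        return int(self.message_client_port)


# Override one default; the others keep their documented values.
cfg = MiniPipelineConfig(message_client_host="redis")
print(asdict(cfg))

# A misspelled field is rejected outright (TypeError here; pydantic with
# extra='forbid' would raise ValidationError instead).
try:
    MiniPipelineConfig(message_client_hots="redis")
except TypeError as exc:
    print("rejected:", exc)
```

The strictness is the point of extra='forbid': a typo in a field name fails loudly at construction time instead of silently falling back to a default endpoint.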
- nv_ingest.util.pipeline.pipeline_runners.is_port_in_use(port, host='127.0.0.1')
Checks if a given port is in use on the specified host with socket reuse settings.
- Parameters:
port (int) – The port number to check.
host (str) – The host to check on. Default is ‘127.0.0.1’.
- Returns:
True if the port is in use, False otherwise.
- Return type:
bool
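A minimal sketch of a bind-based port check follows. It assumes that "socket reuse settings" means SO_REUSEADDR, which this page does not confirm, and the function name is hypothetical. With SO_REUSEADDR set, sockets lingering in TIME_WAIT are not reported as busy, while an active listener still makes the bind fail. The sketch assumes POSIX semantics: on Windows, SO_REUSEADDR can allow binding to a port that is already in use, so the result would not be reliable there.

```python
import socket


def is_port_in_use_sketch(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if (host, port) cannot be bound because something holds it.

    Hypothetical bind-based probe; not the nv-ingest implementation.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Ignore ports stuck in TIME_WAIT; an active listener still causes
        # EADDRINUSE on POSIX systems.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((host, port))
        except OSError:
            return True
        return False
```

A bind probe answers "could I listen here?", which is the question a launcher usually cares about; a connect-based probe would instead answer "is something accepting connections here?".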
- nv_ingest.util.pipeline.pipeline_runners.run_ingest_pipeline(
      ingest_config_path: str | None = None,
      caption_batch_size: int = 8,
      use_cpp: bool = False,
      pipeline_batch_size: int = 256,
      enable_monitor: bool = False,
      feature_length: int = 512,
      num_threads: int | None = None,
      model_max_batch_size: int = 256,
      mode: str = 'NLP',
      log_level: str = 'INFO',
  )
Configures and runs the ingest pipeline with the specified options.
This is the main entry point for executing a pipeline with user-defined settings.
- Parameters:
ingest_config_path (str, optional) – Path to the JSON configuration file.
caption_batch_size (int, optional) – Number of captions to process in a batch (default: 8).
use_cpp (bool, optional) – Use C++ backend (default: False).
pipeline_batch_size (int, optional) – Batch size for the pipeline (default: 256).
enable_monitor (bool, optional) – Enable monitoring (default: False).
feature_length (int, optional) – Feature length for embeddings (default: 512).
num_threads (int, optional) – Number of threads (default: determined by get_default_cpu_count).
model_max_batch_size (int, optional) – Model max batch size (default: 256).
mode (str, optional) – Pipeline mode (default: 'NLP', the value of PipelineModes.NLP).
log_level (str, optional) – Log level (default: ‘INFO’).
- Raises:
ValidationError – If the configuration validation fails.
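The defaults above can be collected in one place, as in the sketch below. IngestRunOptions and its resolved() method are hypothetical, and os.cpu_count() is only a stand-in for get_default_cpu_count, whose exact logic is not documented on this page. The sketch shows how num_threads=None is meant to be filled in from a CPU count while every other option keeps its documented default.

```python
import os
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class IngestRunOptions:
    """Hypothetical container mirroring run_ingest_pipeline's documented defaults."""

    ingest_config_path: Optional[str] = None
    caption_batch_size: int = 8
    use_cpp: bool = False
    pipeline_batch_size: int = 256
    enable_monitor: bool = False
    feature_length: int = 512
    num_threads: Optional[int] = None
    model_max_batch_size: int = 256
    mode: str = "NLP"
    log_level: str = "INFO"

    def resolved(self) -> "IngestRunOptions":
        """Return a copy with num_threads filled in when it was left as None."""
        if self.num_threads is not None:
            return self
        # os.cpu_count() is a stand-in for get_default_cpu_count(); it can
        # return None, so fall back to a single thread.
        return replace(self, num_threads=os.cpu_count() or 1)


opts = IngestRunOptions(pipeline_batch_size=128).resolved()
print(opts.num_threads, opts.pipeline_batch_size)
```

Making the container frozen keeps the resolved copy separate from the caller's original options.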
- nv_ingest.util.pipeline.pipeline_runners.run_pipeline(
      morpheus_pipeline_config: Any,
      ingest_config: Dict[str, Any],
  )
Runs the pipeline synchronously in the current process, blocking until it finishes.
Use this entry point to execute a pipeline directly rather than in a subprocess.
- Parameters:
morpheus_pipeline_config (Config) – The configuration object for the Morpheus pipeline.
ingest_config (Dict[str, Any]) – The ingestion configuration dictionary.
- Returns:
The total elapsed time for running the pipeline.
- Return type:
float
- Raises:
Exception – Any exception raised during pipeline execution will be propagated.
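The return-and-raise contract above (return the elapsed time, let exceptions propagate) can be sketched as follows. run_timed is a hypothetical wrapper around any blocking pipeline callable; measuring in seconds with time.perf_counter() is an assumption, since this page does not state the unit of the returned float.

```python
import time
from typing import Any, Callable, Dict


def run_timed(run: Callable[[Dict[str, Any]], None], ingest_config: Dict[str, Any]) -> float:
    """Run a pipeline callable synchronously and return elapsed wall-clock seconds.

    Exceptions raised by `run` are not caught, so they propagate to the caller.
    """
    start = time.perf_counter()
    run(ingest_config)  # blocks until the (hypothetical) pipeline callable returns
    return time.perf_counter() - start
```

Because nothing is caught, a failed run never returns a misleading timing value.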
- nv_ingest.util.pipeline.pipeline_runners.start_pipeline_subprocess(
      config: PipelineCreationSchema,
      stdout: TextIO | None = None,
      stderr: TextIO | None = None,
  )
Launches the pipeline in a subprocess and ensures that it terminates if the parent process dies.
This function encapsulates all subprocess-related setup, including signal handling and atexit registration.
- Parameters:
config (PipelineCreationSchema) – Validated pipeline configuration.
stdout (file-like object or None, optional) – File-like object for capturing stdout. If None, output is ignored.
stderr (file-like object or None, optional) – File-like object for capturing stderr. If None, output is ignored.
- Returns:
The subprocess object for the launched pipeline.
- Return type:
subprocess.Popen
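The launch-and-clean-up behaviour described above can be approximated with the standard library. This is a POSIX-only sketch under stated assumptions, and the helper names are hypothetical. The child is started with start_new_session=True so that its whole process group can be signalled. atexit handlers run only on a normal interpreter exit, so SIGINT/SIGTERM are converted into cleanup plus exit by a separate, opt-in handler. A parent killed with SIGKILL runs neither, so this does not cover every way the parent can die.

```python
import atexit
import os
import signal
import subprocess
import sys
from typing import List, Optional, TextIO


def terminate_process_group(proc: subprocess.Popen, timeout: float = 5.0) -> None:
    """Send SIGTERM to the child's process group, escalating to SIGKILL on timeout."""
    if proc.poll() is not None:
        return  # already exited (poll() also reaps it)
    try:
        os.killpg(proc.pid, signal.SIGTERM)
        proc.wait(timeout=timeout)
    except ProcessLookupError:
        pass  # the group disappeared between poll() and killpg()
    except subprocess.TimeoutExpired:
        try:
            os.killpg(proc.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        proc.wait()


def launch_with_cleanup(
    cmd: List[str],
    stdout: Optional[TextIO] = None,
    stderr: Optional[TextIO] = None,
) -> subprocess.Popen:
    """Start `cmd` in its own session and tear it down when the parent exits."""
    proc = subprocess.Popen(
        cmd,
        # None means "discard output". Non-None values must be real files,
        # because Popen needs a fileno().
        stdout=stdout if stdout is not None else subprocess.DEVNULL,
        stderr=stderr if stderr is not None else subprocess.DEVNULL,
        start_new_session=True,  # child leads a new process group (pgid == pid)
    )
    # On a normal interpreter exit, clean up the child's process group.
    atexit.register(terminate_process_group, proc)
    return proc


def install_signal_cleanup(proc: subprocess.Popen) -> None:
    """Turn SIGINT/SIGTERM in the parent into child cleanup followed by exit."""

    def _handler(signum, _frame):
        terminate_process_group(proc)
        sys.exit(128 + signum)

    signal.signal(signal.SIGINT, _handler)
    signal.signal(signal.SIGTERM, _handler)
```

Signalling the process group rather than the single PID also stops any workers the child has spawned. To capture output, pass a real file such as one from tempfile.TemporaryFile(mode="w+"); an io.StringIO has no file descriptor and would be rejected by Popen.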
- nv_ingest.util.pipeline.pipeline_runners.subprocess_entrypoint() → None
Entry point for the pipeline subprocess.
This function is called when a pipeline subprocess is started. It configures logging and runs the ingest pipeline.
- Raises:
Exception – Any exception raised during pipeline execution will cause the subprocess to exit with a non-zero status code.
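The entry-point pattern above (configure logging, run, exit non-zero on failure) is sketched below. entrypoint_sketch is hypothetical, and the specific exit status 1 is an assumption; this page only promises a non-zero status. When such a function runs as the child's main code, sys.exit(1) becomes the returncode that the parent sees on its subprocess.Popen object.

```python
import logging
import sys
from typing import Callable


def entrypoint_sketch(run: Callable[[], None], log_level: str = "INFO") -> None:
    """Configure logging, run the pipeline callable, and exit non-zero on failure."""
    logging.basicConfig(level=getattr(logging, log_level.upper(), logging.INFO))
    logger = logging.getLogger("pipeline-subprocess")
    try:
        run()
    except Exception:
        # Log the traceback, then give the parent a non-zero exit status.
        logger.exception("Pipeline failed")
        sys.exit(1)
```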
nv_ingest.util.pipeline.stage_builders module
- nv_ingest.util.pipeline.stage_builders.add_audio_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_chart_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_completed_job_counter_stage(pipe, morpheus_pipeline_config, ingest_config)
- nv_ingest.util.pipeline.stage_builders.add_docx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_embed_extractions_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_embedding_storage_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_image_caption_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_image_dedup_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_image_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_image_filter_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_image_storage_stage(pipe, morpheus_pipeline_config)
- nv_ingest.util.pipeline.stage_builders.add_infographic_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_metadata_injector_stage(pipe, morpheus_pipeline_config)
- nv_ingest.util.pipeline.stage_builders.add_otel_meter_stage(pipe, morpheus_pipeline_config, ingest_config)
- nv_ingest.util.pipeline.stage_builders.add_otel_tracer_stage(pipe, morpheus_pipeline_config, ingest_config)
- nv_ingest.util.pipeline.stage_builders.add_pdf_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_pptx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_sink_stage(pipe, morpheus_pipeline_config, ingest_config)
- nv_ingest.util.pipeline.stage_builders.add_source_stage(pipe, morpheus_pipeline_config, ingest_config)
- nv_ingest.util.pipeline.stage_builders.add_submitted_job_counter_stage(pipe, morpheus_pipeline_config, ingest_config)
- nv_ingest.util.pipeline.stage_builders.add_table_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.stage_builders.add_text_splitter_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
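The stage builders share a calling convention: each takes the pipeline object and the Morpheus pipeline configuration, most also take the ingest configuration, and many also take default_cpu_count. The toy sketch below assumes, without this page confirming it, that each builder adds one stage and returns it so the caller can connect stages with a Morpheus-style add_edge. ToyPipeline and the toy_add_* functions are hypothetical stand-ins, and the source, PDF extractor, sink order is illustrative rather than nv-ingest's actual topology.

```python
from typing import Any, Dict, List, Tuple


class ToyPipeline:
    """Minimal stand-in for a pipeline object: it only records stages and edges."""

    def __init__(self) -> None:
        self.stages: List[str] = []
        self.edges: List[Tuple[str, str]] = []

    def add_stage(self, stage: str) -> str:
        self.stages.append(stage)
        return stage

    def add_edge(self, upstream: str, downstream: str) -> None:
        self.edges.append((upstream, downstream))


# Hypothetical builders that follow the documented parameter lists.
def toy_add_source_stage(pipe: ToyPipeline, morpheus_pipeline_config: Any, ingest_config: Dict[str, Any]) -> str:
    return pipe.add_stage("source")


def toy_add_pdf_extractor_stage(
    pipe: ToyPipeline, morpheus_pipeline_config: Any, ingest_config: Dict[str, Any], default_cpu_count: int
) -> str:
    # A real builder would use the configs (and possibly default_cpu_count)
    # to construct the stage; the toy version only registers a name.
    return pipe.add_stage("pdf_extractor")


def toy_add_sink_stage(pipe: ToyPipeline, morpheus_pipeline_config: Any, ingest_config: Dict[str, Any]) -> str:
    return pipe.add_stage("sink")


def build_toy_pipeline(morpheus_pipeline_config: Any, ingest_config: Dict[str, Any], default_cpu_count: int) -> ToyPipeline:
    pipe = ToyPipeline()
    source = toy_add_source_stage(pipe, morpheus_pipeline_config, ingest_config)
    pdf = toy_add_pdf_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
    sink = toy_add_sink_stage(pipe, morpheus_pipeline_config, ingest_config)
    # Each builder returns its stage, so the caller decides how stages connect.
    pipe.add_edge(source, pdf)
    pipe.add_edge(pdf, sink)
    return pipe
```

Keeping stage construction separate from wiring lets one set of builders serve several pipeline layouts.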
Module contents
- nv_ingest.util.pipeline.add_chart_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_docx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_embed_extractions_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_embedding_storage_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_image_caption_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_image_dedup_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_image_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_image_filter_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_pdf_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_pptx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_submitted_job_counter_stage(pipe, morpheus_pipeline_config, ingest_config)
- nv_ingest.util.pipeline.add_table_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)
- nv_ingest.util.pipeline.add_text_splitter_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count)