Source code for nv_ingest.framework.orchestration.ray.examples.task_source_sink_harness

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import ray
import logging
import time

# Import the source and sink stages and their configuration models.
from nv_ingest.framework.orchestration.ray.stages.sources.message_broker_task_source import (
    MessageBrokerTaskSourceStage,
    MessageBrokerTaskSourceConfig,
)
from nv_ingest.framework.orchestration.ray.stages.sinks.message_broker_task_sink import (
    MessageBrokerTaskSinkStage,
    MessageBrokerTaskSinkConfig,
)

# Import the async queue edge.
from nv_ingest.framework.orchestration.ray.edges.async_queue_edge import AsyncQueueEdge


[docs] def main(): # Initialize Ray. ray.init(ignore_reinit_error=True) # Set up basic logging. logging.basicConfig(level=logging.INFO) logger = logging.getLogger("RayPipelineHarness") # Define the Redis configuration for the message broker (used for both source and sink). redis_config = { "client_type": "redis", "host": "localhost", # Adjust as needed. "port": 6379, "max_retries": 3, "max_backoff": 2, "connection_timeout": 5, "broker_params": {"db": 0, "use_ssl": False}, } # Create a configuration instance for the source stage. source_config = MessageBrokerTaskSourceConfig( broker_client=redis_config, task_queue="ingest_task_queue", poll_interval=0.1, batch_size=10, ) # Create a configuration instance for the sink stage. sink_config = MessageBrokerTaskSinkConfig( broker_client=redis_config, poll_interval=0.1, # Using the same poll_interval; adjust as needed. ) # Create an instance of the AsyncQueueEdge actor with a maximum size of 100. queue_edge = AsyncQueueEdge.remote(max_size=100, multi_reader=True, multi_writer=True) # Create an instance of the MessageBrokerTaskSourceStage actor. source_actor = MessageBrokerTaskSourceStage.remote(source_config, 1) # Create an instance of the MessageBrokerTaskSinkStage actor. sink_actor = MessageBrokerTaskSinkStage.remote(sink_config, 1) # Connect the stages: # The source's output edge is the queue_edge. ray.get(source_actor.set_output_edge.remote(queue_edge)) # The sink's input edge is the same queue_edge. ray.get(sink_actor.set_input_edge.remote(queue_edge)) # Start both actors. ray.get(source_actor.start.remote()) ray.get(sink_actor.start.remote()) logger.info("Source and Sink actors started, connected via AsyncQueueEdge.") try: # Run indefinitely until a KeyboardInterrupt (Ctrl+C) is received. while True: time.sleep(1) except KeyboardInterrupt: logger.info("KeyboardInterrupt received. Stopping actors...") ray.get(source_actor.stop.remote()) ray.get(sink_actor.stop.remote()) source_stats = ray.get(source_actor.get_stats.remote()) sink_stats = ray.get(sink_actor.get_stats.remote()) logger.info(f"Source stats: {source_stats}") logger.info(f"Sink stats: {sink_stats}") finally: ray.shutdown() logger.info("Ray shutdown complete.")
if __name__ == "__main__": main()