# Source code for nv_ingest.framework.orchestration.ray.examples.task_source_harness
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import ray
import logging
import time
# Import the new source stage and its configuration
from nv_ingest.framework.orchestration.ray.stages.sources.message_broker_task_source import (
MessageBrokerTaskSourceStage,
MessageBrokerTaskSourceConfig,
)
# [docs]
def main():
    """Run a MessageBrokerTaskSourceStage actor until interrupted with Ctrl+C.

    Initializes Ray, builds a ``MessageBrokerTaskSourceConfig`` pointing at a
    Redis broker on ``localhost:6379`` with the ``ingest_task_queue`` task
    queue, creates the source-stage actor and starts it. The function then
    idles until a ``KeyboardInterrupt`` is received, at which point the actor
    is stopped and its stats are logged.

    ``ray.shutdown()`` is always called once ``ray.init()`` has succeeded,
    including when building the configuration, creating the actor, or
    starting it fails.
    """
    # Initialize Ray
    ray.init(ignore_reinit_error=True)
    # Set up basic logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("RayTestHarness")
    # Everything after ray.init() lives under this try so that a failure
    # during setup does not leave the Ray runtime running.
    try:
        # Define the Redis configuration for the MessageBrokerTaskSource
        redis_config = {
            "client_type": "redis",
            "host": "localhost",  # Adjust host if needed
            "port": 6379,  # Default Redis port
            "max_retries": 3,
            "max_backoff": 2,
            "connection_timeout": 5,
            "broker_params": {"db": 0, "use_ssl": False},
        }
        # Create an instance of the configuration for the source stage.
        config = MessageBrokerTaskSourceConfig(
            broker_client=redis_config,
            task_queue="ingest_task_queue",
            poll_interval=0.1,
        )
        message_broker_actor = MessageBrokerTaskSourceStage.remote(config)
        # Start the actor to begin fetching messages.
        ray.get(message_broker_actor.start.remote())
        logger.info("MessageBrokerTaskSource actor started. Listening for messages...")
        # The KeyboardInterrupt handler is scoped to the wait loop only, so it
        # never runs before the actor handle exists.
        try:
            # Run indefinitely until a KeyboardInterrupt (Ctrl+C) is received.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Ctrl+C detected. Stopping actor...")
            ray.get(message_broker_actor.stop.remote())
            stats = ray.get(message_broker_actor.get_stats.remote())
            logger.info(f"Actor processing stats: {stats}")
    finally:
        ray.shutdown()
        logger.info("Ray shutdown complete.")
# Script entry point: run the harness only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()