nat.front_ends.fastapi.async_job#

The functions in this module are intentionally written to be submitted as Dask tasks; as such, they are self-contained.

Functions#

_configure_logging(→ logging.Logger)

run_generation(configure_logging, log_level, ...)

Background async task to run the workflow.

periodic_cleanup(*, scheduler_address, db_url[, ...])

Dask task to periodically clean up expired jobs from the job store. This task is intended to be submitted only once to the Dask cluster and run indefinitely.

_setup_worker()

Setup function to be run in each worker process. This moves each worker into its own process group.

Module Contents#

_configure_logging(
configure_logging: bool,
log_level: int,
) → logging.Logger#

async run_generation(
configure_logging: bool,
log_level: int,
scheduler_address: str,
db_url: str,
config_file_path: str,
job_id: str,
payload: Any,
)#

Background async task to run the workflow.

Parameters#

configure_logging : bool

Whether to configure logging.

log_level : int

The log level to use when configure_logging is True, ignored otherwise.

scheduler_address : str

The Dask scheduler address.

db_url : str

The database URL for the job store.

config_file_path : str

The path to the workflow configuration file.

job_id : str

The job ID.

payload : typing.Any

The input payload for the workflow.

async periodic_cleanup(
*,
scheduler_address: str,
db_url: str,
sleep_time_sec: int = 300,
configure_logging: bool = True,
log_level: int = logging.INFO,
)#

Dask task to periodically clean up expired jobs from the job store. This task is intended to be submitted only once to the Dask cluster and run indefinitely.

Parameters#

scheduler_address : str

The Dask scheduler address.

db_url : str

The database URL for the job store.

sleep_time_sec : int

The sleep time between cleanup operations in seconds.

configure_logging : bool

Whether to configure logging.

log_level : int

The log level to use when configure_logging is True, ignored otherwise.

The log level to use when configure_logging is True, ignored otherwise.
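The run-sleep-repeat pattern described for this task can be sketched as follows. This is not the module's implementation: `periodic_cleanup_sketch`, `cleanup_once`, and `max_iterations` are hypothetical names introduced for illustration, and the choice to log and continue after a failed pass is an assumption. The real task takes a scheduler address and database URL and runs indefinitely.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger(__name__)


async def periodic_cleanup_sketch(
    cleanup_once: Callable[[], Awaitable[None]],
    *,
    sleep_time_sec: float = 300,
    max_iterations: Optional[int] = None,
) -> int:
    """Call cleanup_once, sleep, and repeat; return the number of passes made.

    max_iterations is a hypothetical knob so the loop can be exercised in a
    test; with the default of None the loop never exits on its own.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        try:
            await cleanup_once()
        except Exception:
            # Assumption: one failed pass should not end a long-running task.
            logger.exception("cleanup pass failed")
        iterations += 1
        # asyncio.sleep yields to the event loop instead of blocking it.
        await asyncio.sleep(sleep_time_sec)
    return iterations
```

Bounding the loop and passing a zero sleep time makes the pattern easy to check without waiting, for example `asyncio.run(periodic_cleanup_sketch(fake_cleanup, sleep_time_sec=0, max_iterations=3))` with any coroutine function `fake_cleanup`.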

_setup_worker()#

Setup function to be run in each worker process. This moves each worker into its own process group. This fixes an issue where a Ctrl-C in the terminal sends a SIGINT to all workers, which then causes the workers to exit before the main process can shut down the cluster gracefully.
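The mechanism behind this can be sketched with the standard library alone. A terminal delivers the Ctrl-C SIGINT to every process in its foreground process group, so a process that has moved into a group of its own no longer receives it. The function name `move_to_own_process_group` is hypothetical, the use of `os.setpgrp` is an assumption about how such a setup function might work, and how the real `_setup_worker` is registered with the Dask workers is not shown here.

```python
import os


def move_to_own_process_group() -> bool:
    """Put the calling process into its own process group where possible.

    Returns True if the process leads its own group afterwards, and False
    on platforms without process groups (for example Windows).
    """
    if not hasattr(os, "setpgrp"):
        return False
    try:
        # Makes the calling process the leader of a new process group.
        os.setpgrp()
    except OSError:
        # A session leader already leads its own group and may not change it.
        pass
    return os.getpgid(0) == os.getpid()
```

With the workers isolated this way, only the main process sees the interrupt, which leaves it free to shut the workers down in an orderly fashion.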