nat.front_ends.fastapi.job_store#

Classes#

JobStatus

Enumeration of possible job statuses in the job store.

JobInfo

SQLAlchemy model representing job metadata and status information.

JobStore

Tracks and manages jobs submitted to the Dask scheduler and persists job metadata (JobInfo objects) in a database.

Functions#

get_db_engine(→ Engine | AsyncEngine)

Create a SQLAlchemy database engine; this should only be run once per process.

Module Contents#

class JobStatus#

Bases: str, enum.Enum

Enumeration of possible job statuses in the job store.

Attributes#

SUBMITTED : str

Job has been submitted to the scheduler but not yet started.

RUNNING : str

Job is currently being executed.

SUCCESS : str

Job completed successfully.

FAILURE : str

Job failed during execution.

INTERRUPTED : str

Job was interrupted or cancelled before completion.

NOT_FOUND : str

Job ID does not exist in the job store.

SUBMITTED = 'submitted'#
RUNNING = 'running'#
SUCCESS = 'success'#
FAILURE = 'failure'#
INTERRUPTED = 'interrupted'#
NOT_FOUND = 'not_found'#
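
Because JobStatus inherits from str, members compare equal to their raw string values, which is convenient when statuses arrive from request payloads or database rows. A minimal sketch:

```python
from nat.front_ends.fastapi.job_store import JobStatus

# JobStatus subclasses str, so members compare equal to their raw values.
assert JobStatus.RUNNING == "running"

# Raw strings (e.g. from an API payload) convert back into enum members.
status = JobStatus("submitted")
assert status is JobStatus.SUBMITTED
```
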
class JobInfo#

Bases: Base

SQLAlchemy model representing job metadata and status information.

This model stores comprehensive information about jobs submitted to the Dask scheduler, including their current status, configuration, outputs, and lifecycle metadata.

Attributes#

job_id : str

Unique identifier for the job (primary key).

status : JobStatus

Current status of the job.

config_file : str, optional

Path to the configuration file used for the job.

error : str, optional

Error message if the job failed.

output_path : str, optional

Path where job outputs are stored.

created_at : datetime

Timestamp when the job was created.

updated_at : datetime

Timestamp when the job was last updated.

expiry_seconds : int

Number of seconds after which the job is eligible for cleanup.

output : str, optional

Serialized job output data (JSON format).

is_expired : bool

Flag indicating if the job has been marked as expired.

job_id: sqlalchemy.orm.Mapped[str]#
status: sqlalchemy.orm.Mapped[JobStatus]#
config_file: sqlalchemy.orm.Mapped[str]#
error: sqlalchemy.orm.Mapped[str]#
output_path: sqlalchemy.orm.Mapped[str]#
created_at: sqlalchemy.orm.Mapped[datetime.datetime]#
updated_at: sqlalchemy.orm.Mapped[datetime.datetime]#
expiry_seconds: sqlalchemy.orm.Mapped[int]#
output: sqlalchemy.orm.Mapped[str]#
is_expired: sqlalchemy.orm.Mapped[bool]#
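
Since JobInfo is a declarative SQLAlchemy model, it can be queried directly when an async session is available. A sketch under that assumption; failed_job_errors is a hypothetical helper, not part of this module:

```python
from sqlalchemy import select

from nat.front_ends.fastapi.job_store import JobInfo, JobStatus

async def failed_job_errors(session) -> list:
    # Fetch (job_id, error) rows for every failed job.
    result = await session.execute(
        select(JobInfo.job_id, JobInfo.error).where(JobInfo.status == JobStatus.FAILURE))
    return result.all()
```
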
class JobStore(
scheduler_address: str,
db_engine: AsyncEngine | None = None,
db_url: str | None = None,
)#

Bases: nat.front_ends.fastapi.dask_client_mixin.DaskClientMixin

Tracks and manages jobs submitted to the Dask scheduler and persists job metadata (JobInfo objects) in a database.

Parameters#

scheduler_address: str

The address of the Dask scheduler.

db_engine: AsyncEngine | None, optional, default=None

The database engine for the job store.

db_url: str | None, optional, default=None

The database URL to connect to, used when db_engine is not provided. Refer to: https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls
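
A minimal construction sketch; the scheduler address and async SQLite URL below are illustrative placeholders, not defaults:

```python
from nat.front_ends.fastapi.job_store import JobStore

# Both values are illustrative: adjust the scheduler address and the
# async database URL for your deployment.
store = JobStore(
    scheduler_address="tcp://127.0.0.1:8786",
    db_url="sqlite+aiosqlite:///jobs.db",
)
```
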

MIN_EXPIRY = 600#
MAX_EXPIRY = 86400#
DEFAULT_EXPIRY = 3600#
ACTIVE_STATUS#
_scheduler_address#
_session#
async client() → collections.abc.AsyncGenerator[dask.distributed.Client]#

Async context manager for obtaining a Dask client connection.

Yields#

dask.distributed.Client

An active Dask client connected to the scheduler. The client is automatically closed when exiting the context manager.
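
A usage sketch, assuming a JobStore built as above:

```python
from nat.front_ends.fastapi.job_store import JobStore

async def ping_scheduler(store: JobStore) -> None:
    # The context manager yields a connected dask.distributed.Client
    # and closes it automatically on exit.
    async with store.client() as client:
        print(client)
```
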

async session() → collections.abc.AsyncGenerator[sqlalchemy.ext.asyncio.AsyncSession]#

Async context manager for a SQLAlchemy session with automatic transaction management.

Creates a new database session scoped to the current async task and begins a transaction. The transaction is committed on successful exit and rolled back on exception. The session is automatically removed from the registry after use.

Yields#

AsyncSession

An active SQLAlchemy async session with an open transaction.
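
A sketch of the intended pattern; count_jobs is a hypothetical helper:

```python
from sqlalchemy import select

from nat.front_ends.fastapi.job_store import JobInfo, JobStore

async def count_jobs(store: JobStore) -> int:
    # The transaction commits on normal exit and rolls back on exception.
    async with store.session() as session:
        result = await session.execute(select(JobInfo))
        return len(result.scalars().all())
```
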

ensure_job_id(job_id: str | None) → str#

Ensure a job ID is provided, generating a new one if necessary.

If a job ID is provided, it is returned as-is; otherwise a new unique job ID is generated.

Parameters#

job_id: str | None

The job ID to ensure, or None to generate a new one.

async _create_job(
config_file: str | None = None,
job_id: str | None = None,
expiry_seconds: int = DEFAULT_EXPIRY,
) → str#

Create a job and add it to the job store. This should not be called directly; it is invoked by submit_job.

async submit_job(
*,
job_id: str | None = None,
config_file: str | None = None,
expiry_seconds: int = DEFAULT_EXPIRY,
sync_timeout: int = 0,
job_fn: collections.abc.Callable[..., Any],
job_args: list[Any],
**job_kwargs,
) → tuple[str, JobInfo | None]#

Submit a job to the Dask scheduler, and store job metadata in the database.

Parameters#

job_id: str | None, optional, default=None

The job ID to use, or None to generate a new one.

config_file: str | None, optional, default=None

The config file used to run the job, if any.

expiry_seconds: int, optional, default=3600

The number of seconds after which the job should be considered expired. Expired jobs are eligible for cleanup, but are not deleted immediately.

sync_timeout: int, optional, default=0

If greater than 0, wait up to this many seconds for the job to complete. If the job completes within the timeout, return the job ID and its JobInfo; otherwise return the job ID and None. If 0, return immediately with the job ID and None.

job_fn: Callable[…, typing.Any]

The function to run as the job. This function must be serializable by Dask.

job_args: list[typing.Any]

The arguments to pass to the job function. These must be serializable by Dask.

job_kwargs: dict[str, typing.Any]

The keyword arguments to pass to the job function. These must be serializable by Dask.
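
A usage sketch following the keyword-only signature above; run_evaluation is a hypothetical job function:

```python
from nat.front_ends.fastapi.job_store import JobStore

def run_evaluation(dataset: str, repeats: int = 1) -> str:
    # Hypothetical job function. It must be serializable by Dask, so a
    # module-level function (not a lambda or closure) is the safe choice.
    return f"evaluated {dataset} x{repeats}"

async def submit(store: JobStore) -> None:
    job_id, info = await store.submit_job(
        job_fn=run_evaluation,
        job_args=["my-dataset"],
        repeats=3,  # forwarded to run_evaluation via **job_kwargs
        sync_timeout=30,  # wait up to 30 seconds for completion
    )
    if info is None:
        print(f"job {job_id} still running; poll get_status() later")
    else:
        print(f"job {job_id} finished with status {info.status}")
```
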

async update_status(
job_id: str,
status: str | JobStatus,
error: str | None = None,
output_path: str | None = None,
output: pydantic.BaseModel | None = None,
)#

Update the status and metadata of an existing job.

Parameters#

job_id : str

The unique identifier of the job to update.

status : str | JobStatus

The new status to set for the job (should be a valid JobStatus value).

error : str, optional, default=None

Error message to store if the job failed.

output_path : str, optional, default=None

Path where job outputs are stored.

output : BaseModel, optional, default=None

Job output data. Can be a Pydantic BaseModel, dict, list, or string. BaseModel and dict/list objects are serialized to JSON for storage.

Raises#

ValueError

If the specified job_id does not exist in the job store.
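
A brief sketch; mark_failed is a hypothetical wrapper:

```python
from nat.front_ends.fastapi.job_store import JobStatus, JobStore

async def mark_failed(store: JobStore, job_id: str, exc: Exception) -> None:
    # Raises ValueError if job_id does not exist in the store.
    await store.update_status(job_id, JobStatus.FAILURE, error=str(exc))
```
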

async get_all_jobs() → list[JobInfo]#

Retrieve all jobs from the job store.

Returns#

list[JobInfo]

A list of all JobInfo objects in the database. This operation can be expensive if there are many jobs stored.

Warning#

This method loads all jobs into memory and should be used with caution in production environments with large job stores.

async get_job(job_id: str) → JobInfo | None#

Retrieve a specific job by its unique identifier.

Parameters#

job_id : str

The unique identifier of the job to retrieve.

Returns#

JobInfo or None

The JobInfo object if found, None if the job_id does not exist.

async get_status(job_id: str) → JobStatus#

Get the current status of a specific job.

Parameters#

job_id : str

The unique identifier of the job.

Returns#

JobStatus

The current status of the job, or JobStatus.NOT_FOUND if the job does not exist in the store.
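
Because a missing job yields JobStatus.NOT_FOUND rather than an exception, a polling loop stays simple. A hypothetical helper:

```python
import asyncio

from nat.front_ends.fastapi.job_store import JobStatus, JobStore

async def wait_for_terminal(store: JobStore, job_id: str, poll_seconds: float = 2.0) -> JobStatus:
    # Loop until the job leaves the submitted/running states, then return
    # the terminal status (including NOT_FOUND for unknown IDs).
    while True:
        status = await store.get_status(job_id)
        if status not in (JobStatus.SUBMITTED, JobStatus.RUNNING):
            return status
        await asyncio.sleep(poll_seconds)
```
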

async get_last_job() → JobInfo | None#

Retrieve the most recently created job.

Returns#

JobInfo or None

The JobInfo object for the most recently created job based on the created_at timestamp, or None if no jobs exist in the store.

async get_jobs_by_status(status: str | JobStatus) → list[JobInfo]#

Retrieve all jobs that have a specific status.

Parameters#

status : str | JobStatus

The status to filter jobs by.

Returns#

list[JobInfo]

A list of JobInfo objects that have the specified status. Returns an empty list if no jobs match the status.

get_expires_at(job: JobInfo) → datetime.datetime | None#

Calculate the expiration time for a given job.

Active jobs (with status in self.ACTIVE_STATUS) do not expire and return None. For non-active jobs, the expiration time is calculated as updated_at + expiry_seconds.

Parameters#

job : JobInfo

The job object to calculate expiration time for.

Returns#

datetime or None

The UTC datetime when the job will expire, or None if the job is active and therefore exempt from expiration.
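
A sketch that converts the expiration time into a countdown; it assumes the returned datetime is timezone-aware UTC, per the description above:

```python
import datetime

from nat.front_ends.fastapi.job_store import JobInfo, JobStore

def seconds_until_expiry(store: JobStore, job: JobInfo) -> float | None:
    # None means the job is active and exempt from expiration.
    expires_at = store.get_expires_at(job)
    if expires_at is None:
        return None
    # Assumes expires_at is timezone-aware UTC, as documented.
    return (expires_at - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
```
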

async cleanup_expired_jobs()#

Clean up expired jobs, keeping the most recent one.

The updated_at timestamp is used instead of created_at to determine the most recent job, because jobs may not be processed in the order they are created.

get_db_engine(
db_url: str | None = None,
echo: bool = False,
use_async: bool = True,
) → Engine | AsyncEngine#

Create a SQLAlchemy database engine; this should only be run once per process.

Parameters#

db_url: str | None, optional, default=None

The database URL to connect to. Refer to https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls

echo: bool, optional, default=False

If True, SQLAlchemy will log all SQL statements. Useful for debugging.

use_async: bool, optional, default=True

If True, use the async database engine. The JobStore class requires an async database engine; setting use_async to False is only useful for testing.
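
A sketch wiring the engine into a JobStore; the database URL and scheduler address are illustrative:

```python
from nat.front_ends.fastapi.job_store import JobStore, get_db_engine

# Create the engine once per process and share it with the JobStore.
# The SQLite URL is illustrative; any SQLAlchemy async URL works.
engine = get_db_engine(db_url="sqlite+aiosqlite:///jobs.db", use_async=True)
store = JobStore(scheduler_address="tcp://127.0.0.1:8786", db_engine=engine)
```
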