nat.front_ends.fastapi.job_store#
Classes#
Functions#
|
Create a SQLAlchemy database engine, this should only be run once per process |
Module Contents#
- class JobStatus#
-
Enumeration of possible job statuses in the job store.
Attributes#
- SUBMITTEDstr
Job has been submitted to the scheduler but not yet started.
- RUNNINGstr
Job is currently being executed.
- SUCCESSstr
Job completed successfully.
- FAILUREstr
Job failed during execution.
- INTERRUPTEDstr
Job was interrupted or cancelled before completion.
- NOT_FOUNDstr
Job ID does not exist in the job store.
Initialize self. See help(type(self)) for accurate signature.
- SUBMITTED = 'submitted'#
- RUNNING = 'running'#
- SUCCESS = 'success'#
- FAILURE = 'failure'#
- INTERRUPTED = 'interrupted'#
- NOT_FOUND = 'not_found'#
- class JobInfo#
Bases:
BaseSQLAlchemy model representing job metadata and status information.
This model stores comprehensive information about jobs submitted to the Dask scheduler, including their current status, configuration, outputs, and lifecycle metadata.
Attributes#
- job_idstr
Unique identifier for the job (primary key).
- statusJobStatus
Current status of the job.
- config_filestr, optional
Path to the configuration file used for the job.
- errorstr, optional
Error message if the job failed.
- output_pathstr, optional
Path where job outputs are stored.
- created_atdatetime
Timestamp when the job was created.
- updated_atdatetime
Timestamp when the job was last updated.
- expiry_secondsint
Number of seconds after which the job is eligible for cleanup.
- outputstr, optional
Serialized job output data (JSON format).
- is_expiredbool
Flag indicating if the job has been marked as expired.
- created_at: sqlalchemy.orm.Mapped[datetime.datetime]#
- updated_at: sqlalchemy.orm.Mapped[datetime.datetime]#
- class JobStore( )#
Bases:
nat.front_ends.fastapi.dask_client_mixin.DaskClientMixinTracks and manages jobs submitted to the Dask scheduler, along with persisting job metadata (JobInfo objects) in a database.
Parameters#
- scheduler_address: str
The address of the Dask scheduler.
- db_engine: AsyncEngine | None, optional, default=None
The database engine for the job store.
- db_url: str | None, optional, default=None
The database URL to connect to, used when db_engine is not provided. Refer to: https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls
- MIN_EXPIRY = 600#
- MAX_EXPIRY = 86400#
- DEFAULT_EXPIRY = 3600#
- ACTIVE_STATUS#
- _scheduler_address#
- _session#
- async client() collections.abc.AsyncGenerator[dask.distributed.Client]#
Async context manager for obtaining a Dask client connection.
Yields#
- DaskClient
An active Dask client connected to the scheduler. The client is automatically closed when exiting the context manager.
- async session() collections.abc.AsyncGenerator[sqlalchemy.ext.asyncio.AsyncSession]#
Async context manager for a SQLAlchemy session with automatic transaction management.
Creates a new database session scoped to the current async task and begins a transaction. The transaction is committed on successful exit and rolled back on exception. The session is automatically removed from the registry after use.
Yields#
- AsyncSession
An active SQLAlchemy async session with an open transaction.
- ensure_job_id(job_id: str | None) str#
Ensure a job ID is provided, generating a new one if necessary.
If a job ID is provided, it is returned as-is.
Parameters#
- job_id: str | None
The job ID to ensure, or None to generate a new one.
- async _create_job( ) str#
Create a job and add it to the job store. This should not be called directly, but instead be called by
submit_job
- async submit_job(
- *,
- job_id: str | None = None,
- config_file: str | None = None,
- expiry_seconds: int = DEFAULT_EXPIRY,
- sync_timeout: int = 0,
- job_fn: collections.abc.Callable[Ellipsis, Any],
- job_args: list[Any],
- **job_kwargs,
Submit a job to the Dask scheduler, and store job metadata in the database.
Parameters#
- job_id: str | None, optional, default=None
The job ID to use, or None to generate a new one.
- config_file: str | None, optional, default=None
The config file used to run the job, if any.
- expiry_seconds: int, optional, default=3600
The number of seconds after which the job should be considered expired. Expired jobs are eligible for cleanup, but are not deleted immediately.
- sync_timeout: int, optional, default=0
If greater than 0, wait for the job to complete for up to this many seconds. If the job does not complete in this time, return immediately with the job ID and no job info. If the job completes in this time, return the job ID and the job info. If 0, return immediately with the job ID and no job info.
- job_fn: Callable[…, typing.Any]
The function to run as the job. This function must be serializable by Dask.
- job_args: list[typing.Any]
The arguments to pass to the job function. These must be serializable by Dask.
- job_kwargs: dict[str, typing.Any]
The keyword arguments to pass to the job function. These must be serializable by Dask
- async update_status(
- job_id: str,
- status: str | JobStatus,
- error: str | None = None,
- output_path: str | None = None,
- output: pydantic.BaseModel | None = None,
Update the status and metadata of an existing job.
Parameters#
- job_idstr
The unique identifier of the job to update.
- statusstr | JobStatus
The new status to set for the job (should be a valid JobStatus value).
- errorstr, optional, default=None
Error message to store if the job failed.
- output_pathstr, optional, default=None
Path where job outputs are stored.
- outputBaseModel, optional, default=None
Job output data. Can be a Pydantic BaseModel, dict, list, or string. BaseModel and dict/list objects are serialized to JSON for storage.
Raises#
- ValueError
If the specified job_id does not exist in the job store.
- async get_all_jobs() list[JobInfo]#
Retrieve all jobs from the job store.
Returns#
- list[JobInfo]
A list of all JobInfo objects in the database. This operation can be expensive if there are many jobs stored.
Warning#
This method loads all jobs into memory and should be used with caution in production environments with large job stores.
- async get_job(job_id: str) JobInfo | None#
Retrieve a specific job by its unique identifier.
Parameters#
- job_idstr
The unique identifier of the job to retrieve.
Returns#
- JobInfo or None
The JobInfo object if found, None if the job_id does not exist.
- async get_status(job_id: str) JobStatus#
Get the current status of a specific job.
Parameters#
- job_idstr
The unique identifier of the job.
Returns#
- JobStatus
The current status of the job, or JobStatus.NOT_FOUND if the job does not exist in the store.
- async get_last_job() JobInfo | None#
Retrieve the most recently created job.
Returns#
- JobInfo or None
The JobInfo object for the most recently created job based on the created_at timestamp, or None if no jobs exist in the store.
- async get_jobs_by_status(status: str | JobStatus) list[JobInfo]#
Retrieve all jobs that have a specific status.
Parameters#
- statusstr | JobStatus
The status to filter jobs by.
Returns#
- list[JobInfo]
A list of JobInfo objects that have the specified status. Returns an empty list if no jobs match the status.
- get_expires_at(job: JobInfo) datetime.datetime | None#
Calculate the expiration time for a given job.
Active jobs (with status in
self.ACTIVE_STATUS) do not expire and returnNone. For non-active jobs, the expiration time is calculated as updated_at + expiry_seconds.Parameters#
- jobJobInfo
The job object to calculate expiration time for.
Returns#
- datetime or None
The UTC datetime when the job will expire, or None if the job is active and therefore exempt from expiration.
- async cleanup_expired_jobs()#
Cleanup expired jobs, keeping the most recent one.
Updated_at is used instead of created_at to determine the most recent job. This is because jobs may not be processed in the order they are created.
- get_db_engine( ) Engine | AsyncEngine#
Create a SQLAlchemy database engine, this should only be run once per process
Parameters#
- db_url: str | None, optional, default=None
The database URL to connect to. Refer to https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls
- echo: bool, optional, default=False
If True, SQLAlchemy will log all SQL statements. Useful for debugging.
- use_async: bool, optional, default=True
If True, use the async database engine. The JobStore class requires an async database engine, setting
use_asyncto False is only useful for testing.