aiq.front_ends.fastapi.job_store#

Attributes#

Classes#

JobStatus

str(object='') -> str

JobInfo

JobStore

Module Contents#

logger#
class JobStatus#

Bases: str, enum.Enum

str(object='') -> str
str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Initialize self. See help(type(self)) for accurate signature.

SUBMITTED = 'submitted'#
RUNNING = 'running'#
SUCCESS = 'success'#
FAILURE = 'failure'#
INTERRUPTED = 'interrupted'#
NOT_FOUND = 'not_found'#
class JobInfo(/, **data: Any)#

Bases: pydantic.BaseModel

job_id: str#
status: JobStatus#
config_file: str#
error: str | None#
output_path: str | None#
created_at: datetime.datetime#
updated_at: datetime.datetime#
expiry_seconds: int#
class JobStore#
MIN_EXPIRY = 600#
MAX_EXPIRY = 86400#
DEFAULT_EXPIRY = 3600#
ACTIVE_STATUS#
_jobs#
create_job(
config_file: str,
job_id: str | None = None,
expiry_seconds: int = DEFAULT_EXPIRY,
) -> str#
update_status(
job_id: str,
status: str,
error: str | None = None,
output_path: str | None = None,
)#
get_status(job_id: str) -> JobInfo | None#
list_jobs()#
get_job(job_id: str) -> JobInfo | None#

Get a job by its ID.

get_last_job() -> JobInfo | None#

Get the last created job.

get_jobs_by_status(status: str) -> list[JobInfo]#

Get all jobs with the specified status.

get_all_jobs() -> list[JobInfo]#

Get all jobs in the store.

get_expires_at(job: JobInfo) -> datetime.datetime | None#

Get the time for a job to expire.

cleanup_expired_jobs()#

Clean up expired jobs, keeping the most recent one. updated_at is used instead of created_at to determine the most recent job, because jobs may not be processed in the order they are created.