Source code for nv_ingest_client.primitives.jobs.job_state

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


import logging
from concurrent.futures import Future
from enum import Enum
from enum import auto
from typing import Dict
from typing import Optional
from typing import Union
from uuid import UUID

from .job_spec import JobSpec

logger = logging.getLogger(__name__)


[docs] class JobStateEnum(Enum): """ Enumeration of possible states for a job in the NvIngestClient system. """ PENDING = auto() # Job has been created but not yet submitted or processed. SUBMITTED_ASYNC = auto() # Job has been submitted to the queue asynchronously. SUBMITTED = auto() # Job has been submitted to the queue. PROCESSING = auto() # Job is currently being processed. COMPLETED = auto() # Job has completed processing successfully. FAILED = auto() # Job has failed during processing. CANCELLED = auto() # Job has been cancelled before completion.
_TERMINAL_STATES = {JobStateEnum.COMPLETED, JobStateEnum.FAILED, JobStateEnum.CANCELLED} _PREFLIGHT_STATES = {JobStateEnum.PENDING, JobStateEnum.SUBMITTED_ASYNC}
[docs] class JobState: """ Encapsulates the state information for a job managed by the NvIngestClient. Attributes ---------- job_spec: JobSpec The unique identifier for the job. state : str The current state of the job. future : Future, optional The future object associated with the job's asynchronous operation. response : Dict, optional The response data received for the job. response_channel : str, optional The channel through which responses for the job are received. Methods ------- __init__(self, job_id: str, state: str, future: Optional[Future] = None, response: Optional[Dict] = None, response_channel: Optional[str] = None) Initializes a new instance of JobState. """ def __init__( self, job_spec: JobSpec, state: JobStateEnum = JobStateEnum.PENDING, future: Optional[Future] = None, response: Optional[Dict] = None, response_channel: Optional[str] = None, trace_id: Optional[str] = None, ) -> None: self._job_spec = job_spec self._state = state self._future = future self._response = response # TODO(Devin): Not currently used self._response_channel = response_channel self._trace_id = trace_id self._telemetry = {} @property def job_spec(self) -> JobSpec: """Gets the job specification associated with the state.""" return self._job_spec @job_spec.setter def job_spec(self, value: JobSpec) -> None: """Sets the job specification associated with the state.""" if self._state not in _PREFLIGHT_STATES: err_msg = f"Attempt to change job_spec after job submission: {self._state.name}" logger.error(err_msg) raise ValueError(err_msg) self._job_spec = value @property def job_id(self) -> Union[UUID, str]: """Gets the job's unique identifier.""" return self._job_spec.job_id @job_id.setter def job_id(self, value: str) -> None: """Sets the job's unique identifier, with constraints.""" self._job_spec.job_id = value @property def state(self) -> JobStateEnum: """Gets the current state of the job.""" return self._state @state.setter def state(self, value: JobStateEnum) -> None: """Sets the current state of the job with transition constraints.""" if self._state in _TERMINAL_STATES: logger.error(f"Attempt to change state from {self._state.name} to {value.name} denied.") raise ValueError(f"Cannot change state from {self._state.name} to {value.name}.") if value.value < self._state.value: logger.error(f"Invalid state transition attempt from {self._state.name} to {value.name}.") raise ValueError(f"State can only transition forward, from {self._state.name} to {value.name} not allowed.") self._state = value @property def future(self) -> Optional[Future]: """Gets the future object associated with the job's asynchronous operation.""" return self._future @future.setter def future(self, value: Future) -> None: """Sets the future object associated with the job's asynchronous operation, with constraints.""" self._future = value # TODO(Devin): Not convinced we need 'response' probably remove. @property def response(self) -> Optional[Dict]: """Gets the response data received for the job.""" return self._response @response.setter def response(self, value: Dict) -> None: """Sets the response data received for the job, with constraints.""" self._response = value @property def trace_id(self) -> Optional[str]: """Gets the trace_id from the job submission""" return self._trace_id @trace_id.setter def trace_id(self, value: str) -> None: """Sets the trace_id that was received from the submission to the REST endpoint""" self._trace_id = value