nat.builder.intermediate_step_manager#

Attributes#

Classes#

OpenStep

IntermediateStepManager

Manages updates to the NAT Event Stream for intermediate steps.

Module Contents#

logger#
class OpenStep#
step_id: str#
step_name: str#
step_type: str#
step_parent_id: str#
prev_stack: list[str]#
active_stack: list[str]#
class IntermediateStepManager(
context_state: nat.builder.context.ContextState,
)#

Manages updates to the NAT Event Stream for intermediate steps.

_instance_count: ClassVar[int] = 0#
_active_instances: ClassVar[set[weakref.ref]]#
_context_state#
_outstanding_start_steps: dict[str, OpenStep]#
push_intermediate_step(
payload: nat.data_models.intermediate_step.IntermediateStepPayload,
) -> None#

Pushes an intermediate step to the NAT Event Stream.

subscribe(
on_next: nat.utils.reactive.observable.OnNext[nat.data_models.intermediate_step.IntermediateStep],
on_error: nat.utils.reactive.observable.OnError = None,
on_complete: nat.utils.reactive.observable.OnComplete = None,
) -> nat.utils.reactive.subscription.Subscription#

Subscribes to the NAT Event Stream for intermediate steps.

classmethod _cleanup_instance_tracking(ref: weakref.ref) -> None#

Cleanup callback for weakref when instance is garbage collected.

classmethod get_active_instance_count() -> int#

Get the number of active IntermediateStepManager instances.

Returns:

int: Number of active instances (cleaned up automatically via weakref)

get_outstanding_step_count() -> int#

Get the number of outstanding (started but not ended) steps.

Returns:

int: Number of steps that have been started but not yet ended