nat.builder.intermediate_step_manager#
Attributes#
Classes#
Manages updates to the NAT Event Stream for intermediate steps |
Module Contents#
- logger#
- class OpenStep#
- class IntermediateStepManager(
- context_state: nat.builder.context.ContextState,
Manages updates to the NAT Event Stream for intermediate steps
- _active_instances: ClassVar[set[weakref.ref]]#
- _context_state#
- push_intermediate_steps( ) None#
Inject a sequence of intermediate steps into the event stream without updating the step manager’s internal stack. Used to replay steps from a remote workflow (for example, from a /generate/full response) into the current workflow’s observability stream so the full tree is visible.
When replaying steps from a remote workflow, ensure the remote was invoked with the appropriate workflow-parent-id and workflow-parent-name (HTTP headers or session.run(parent_id=…, parent_name=…)) so the root step in the replayed list has the correct parent in the trace tree.
Parameters#
- stepslist[IntermediateStep]
Steps to inject (for example, parsed from a remote workflow response).
- subscribe(
- on_next: nat.utils.reactive.observable.OnNext[nat.data_models.intermediate_step.IntermediateStep],
- on_error: nat.utils.reactive.observable.OnError = None,
- on_complete: nat.utils.reactive.observable.OnComplete = None,
Subscribes to the NAT Event Stream for intermediate steps
- classmethod _cleanup_instance_tracking(ref: weakref.ref) None#
Cleanup callback for weakref when instance is garbage collected.