aiq.builder.intermediate_step_manager#

Attributes#

Classes#

OpenStep

IntermediateStepManager

Manages updates to the AIQ Toolkit Event Stream for intermediate steps

Module Contents#

logger#
class OpenStep#
step_id: str#
step_name: str#
step_type: str#
step_parent_id: str | None#
prev_stack: list[str]#
active_stack: list[str]#
class IntermediateStepManager(
context_state: aiq.builder.context.AIQContextState,
)#

Manages updates to the AIQ Toolkit Event Stream for intermediate steps

_context_state#
_outstanding_start_steps: dict[str, OpenStep]#
push_intermediate_step(
payload: aiq.data_models.intermediate_step.IntermediateStepPayload,
) None#

Pushes an intermediate step to the AIQ Toolkit Event Stream

subscribe(
on_next: aiq.utils.reactive.observable.OnNext[aiq.data_models.intermediate_step.IntermediateStep],
on_error: aiq.utils.reactive.observable.OnError = None,
on_complete: aiq.utils.reactive.observable.OnComplete = None,
) aiq.utils.reactive.subscription.Subscription#

Subscribes to the AIQ Toolkit Event Stream for intermediate steps