aiq.builder.intermediate_step_manager#

Attributes#

Classes#

OpenStep

IntermediateStepManager

Manages updates to the AgentIQ Event Stream for intermediate steps

Module Contents#

logger#
_current_open_step_id#
class OpenStep#
step_id: str#
step_name: str#
step_type: str#
step_parent_id: str | None#
context: contextvars.Context#
token: contextvars.Token[str | None]#
class IntermediateStepManager(context_state: AIQContextState)#

Manages updates to the AgentIQ Event Stream for intermediate steps

_context_state#
_outstanding_start_steps: dict[str, OpenStep]#
push_intermediate_step(
payload: aiq.data_models.intermediate_step.IntermediateStepPayload,
) None#

Pushes an intermediate step to the AgentIQ Event Stream

subscribe(
on_next: aiq.utils.reactive.observable.OnNext[aiq.data_models.intermediate_step.IntermediateStep],
on_error: aiq.utils.reactive.observable.OnError = None,
on_complete: aiq.utils.reactive.observable.OnComplete = None,
) aiq.utils.reactive.subscription.Subscription#

Subscribes to the AgentIQ Event Stream for intermediate steps