aiq.observability.async_otel_listener#

Attributes#

Classes#

AsyncOtelSpanListener

A separate, async class that listens to the AgentIQ intermediate step

Functions#

_ns_timestamp(→ int)

Convert AgentIQ’s float event_timestamp (in seconds) into an integer number

Module Contents#

logger#
OPENINFERENCE_SPAN_KIND = 'openinference.span.kind'#
_ns_timestamp(seconds_float: float) int#

Convert AgentIQ’s float event_timestamp (in seconds) into an integer number of nanoseconds, as OpenTelemetry expects.

class AsyncOtelSpanListener(
context_state: aiq.builder.context.AIQContextState | None = None,
)#

A separate, async class that listens to the AgentIQ intermediate step event stream and creates proper Otel spans:

  • On FUNCTION_START => open a new top-level span

  • On any other intermediate step => open a child subspan (immediate open/close)

  • On FUNCTION_END => close the function’s top-level span

This runs fully independently from the normal AgentIQ workflow, so that the workflow is not blocking or entangled by OTel calls.

Parameters:

context_state – Optionally supply a specific AIQContextState. If None, uses the global singleton.

_context_state#
_subscription = None#
_outstanding_spans: dict[str, opentelemetry.trace.Span]#
_span_stack: list[opentelemetry.trace.Span] = []#
_running = False#
_tracer#
_on_next(
step: aiq.data_models.intermediate_step.IntermediateStep,
) None#

The main logic that reacts to each IntermediateStep.

_on_error(exc: Exception) None#
_on_complete() None#
async start()#

Usage:

otel_listener = AsyncOtelSpanListener()
async with otel_listener.start():
    # run your AgentIQ workflow
    ...
# cleans up

This sets up the subscription to the AgentIQ event stream and starts the background loop.

async _cleanup()#

Close any remaining open spans.

_serialize_payload(input_value: Any) tuple[str, bool]#

Serialize the input value to a string. Returns a tuple with the serialized value and a boolean indicating if the serialization is JSON or a string

_process_start_event(
step: aiq.data_models.intermediate_step.IntermediateStep,
)#
_process_end_event(
step: aiq.data_models.intermediate_step.IntermediateStep,
)#