aiq.observability.async_otel_listener#

Attributes#

Classes#

AsyncOtelSpanListener

A separate, async class that listens to the AIQ Toolkit intermediate step

Functions#

_ns_timestamp(→ int)

Convert AIQ Toolkit's float event_timestamp (in seconds) into an integer number

Module Contents#

WEAVE_AVAILABLE = True#
logger#
OPENINFERENCE_SPAN_KIND = 'openinference.span.kind'#
opentelemetry#
_ns_timestamp(seconds_float: float) int#

Convert AIQ Toolkit’s float event_timestamp (in seconds) into an integer number of nanoseconds, as OpenTelemetry expects.

class AsyncOtelSpanListener(
context_state: aiq.builder.context.AIQContextState | None = None,
)#

A separate, async class that listens to the AIQ Toolkit intermediate step event stream and creates proper Otel spans:

  • On FUNCTION_START => open a new top-level span

  • On any other intermediate step => open a child subspan (immediate open/close)

  • On FUNCTION_END => close the function’s top-level span

This runs fully independently from the normal AIQ Toolkit workflow, so that the workflow is not blocking or entangled by OTel calls.

Parameters:

context_state – Optionally supply a specific AIQContextState. If None, uses the global singleton.

_context_state#
_subscription = None#
_outstanding_spans: dict[str, opentelemetry.trace.Span]#
_span_stack: dict[str, opentelemetry.trace.Span]#
_running = False#
_tracer#
gc = None#
_weave_calls#
_on_next(
step: aiq.data_models.intermediate_step.IntermediateStep,
) None#

The main logic that reacts to each IntermediateStep.

_on_error(exc: Exception) None#
_on_complete() None#
async start()#

Usage:

otel_listener = AsyncOtelSpanListener()
async with otel_listener.start():
    # run your AIQ Toolkit workflow
    ...
# cleans up

This sets up the subscription to the AIQ Toolkit event stream and starts the background loop.

async _cleanup()#

Close any remaining open spans.

_serialize_payload(input_value: Any) tuple[str, bool]#

Serialize the input value to a string. Returns a tuple with the serialized value and a boolean indicating if the serialization is JSON or a string

_process_start_event(
step: aiq.data_models.intermediate_step.IntermediateStep,
)#
_process_end_event(
step: aiq.data_models.intermediate_step.IntermediateStep,
)#
parent_call(trace_id: str, parent_call_id: str)#

Context manager to set a parent call context for Weave. This allows connecting AIQ spans to existing traces from other frameworks.

_create_weave_call(
step: aiq.data_models.intermediate_step.IntermediateStep,
span: opentelemetry.trace.Span,
) None#

Create a Weave call directly from the span and step data, connecting to existing framework traces if available.

_finish_weave_call(
step: aiq.data_models.intermediate_step.IntermediateStep,
) None#

Finish a previously created Weave call