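aiq.observability.async_otel_listener
Attributes
Classes
AsyncOtelSpanListener | A separate, async class that listens to the AIQ Toolkit intermediate step event stream and creates OTel spans.
Functions
_ns_timestamp(seconds_float) | Convert AIQ Toolkit's float event_timestamp (in seconds) into an integer number of nanoseconds.
Module Contents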
- WEAVE_AVAILABLE = True
- logger
- OPENINFERENCE_SPAN_KIND = 'openinference.span.kind'
- opentelemetry
- _ns_timestamp(seconds_float: float) → int
Convert AIQ Toolkit's float event_timestamp (in seconds) into an integer number of nanoseconds, as OpenTelemetry expects.
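The conversion described above can be sketched as follows. This is a minimal illustration and not necessarily the module's actual implementation; the function name `ns_timestamp` is a stand-in used only for this example.

```python
def ns_timestamp(seconds_float: float) -> int:
    """Convert a timestamp in float seconds to integer nanoseconds.

    Illustrative stand-in for the private helper documented above.
    """
    # 1 second = 1_000_000_000 nanoseconds; int() truncates any fractional
    # nanosecond left over after the multiplication.
    return int(seconds_float * 1e9)


one_and_a_half_seconds = ns_timestamp(1.5)
```

OpenTelemetry span start and end times are expressed as integer nanoseconds since the epoch, which is why the float value has to be converted before it is attached to a span.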
- class AsyncOtelSpanListener(context_state: aiq.builder.context.AIQContextState | None = None)
A separate, async class that listens to the AIQ Toolkit intermediate step event stream and creates the corresponding OTel spans:
On FUNCTION_START => open a new top-level span
On any other intermediate step => open a child subspan (opened and closed immediately)
On FUNCTION_END => close the function's top-level span
This runs fully independently of the normal AIQ Toolkit workflow, so the workflow is neither blocked by nor entangled with OTel calls.
- Parameters:
context_state – Optionally supply a specific AIQContextState. If None, uses the global singleton.
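The event-to-span mapping described above can be illustrated with a dependency-free sketch. Everything here is hypothetical: `SketchSpan`, `SpanLifecycleSketch`, the `function_id` key, and the `"OTHER_STEP"` event name are stand-ins for illustration, and the real listener works with OpenTelemetry spans rather than these toy objects.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class SketchSpan:
    """Toy span: a name, an optional parent, and a closed flag."""
    name: str
    parent: Optional["SketchSpan"] = None
    closed: bool = False


class SpanLifecycleSketch:
    """Hypothetical model of the three rules listed above."""

    def __init__(self) -> None:
        # Open top-level spans, keyed by an assumed per-function identifier.
        self.open_spans: Dict[str, SketchSpan] = {}
        self.finished: List[SketchSpan] = []

    def on_event(self, event_type: str, function_id: str, name: str) -> None:
        if event_type == "FUNCTION_START":
            # Rule 1: open a new top-level span and keep it open.
            self.open_spans[function_id] = SketchSpan(name)
        elif event_type == "FUNCTION_END":
            # Rule 3: close the function's top-level span, if it is open.
            span = self.open_spans.pop(function_id, None)
            if span is not None:
                span.closed = True
                self.finished.append(span)
        else:
            # Rule 2: any other step becomes a child span that is
            # opened and closed immediately.
            parent = self.open_spans.get(function_id)
            self.finished.append(SketchSpan(name, parent=parent, closed=True))


sketch = SpanLifecycleSketch()
sketch.on_event("FUNCTION_START", "f1", "my_function")
sketch.on_event("OTHER_STEP", "f1", "intermediate")
sketch.on_event("FUNCTION_END", "f1", "my_function")
finished_names = [span.name for span in sketch.finished]
```

Because child spans close as soon as they are created, only top-level function spans ever remain open between events.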
- _context_state
- _subscription = None
- _running = False
- _tracer
- gc = None
- _weave_calls
- async start()
Usage:
otel_listener = AsyncOtelSpanListener()
async with otel_listener.start():
    # run your AIQ Toolkit workflow
    ...
# cleans up
This sets up the subscription to the AIQ Toolkit event stream and starts the background loop.
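A method that is used with `async with` in this way is commonly built with `contextlib.asynccontextmanager`. The sketch below shows that general pattern only; `ListenerSketch` and its `running` flag are hypothetical, and the real `start()` subscribes to the AIQ Toolkit event stream instead of toggling a flag.

```python
import asyncio
from contextlib import asynccontextmanager


class ListenerSketch:
    """Hypothetical stand-in showing the setup/teardown shape of start()."""

    def __init__(self) -> None:
        self.running = False

    @asynccontextmanager
    async def start(self):
        # Setup: stands in for subscribing to the event stream.
        self.running = True
        try:
            yield self
        finally:
            # Teardown: stands in for unsubscribing and closing open spans.
            # The finally clause runs even if the body raises.
            self.running = False


async def main() -> tuple:
    listener = ListenerSketch()
    async with listener.start():
        running_inside = listener.running  # the workflow would run here
    return running_inside, listener.running


states = asyncio.run(main())
```

Placing the teardown in a `finally` clause means cleanup still happens when the workflow inside the `async with` block fails.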
- async _cleanup()
Close any remaining open spans.
- _serialize_payload(input_value: Any) → tuple[str, bool]
Serialize the input value to a string. Returns a tuple containing the serialized value and a boolean indicating whether the result is JSON or a plain string.
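A JSON-first serializer with a string fallback could look like the sketch below. It assumes that `True` marks a JSON result and that the standard `json` module is used; the actual method may rely on a different serializer and convention, and `serialize_payload` is a stand-in name.

```python
import json
from typing import Any, Tuple


def serialize_payload(input_value: Any) -> Tuple[str, bool]:
    """Return (serialized_text, is_json). Illustrative sketch only."""
    try:
        # Try JSON first so structured payloads stay machine-readable.
        return json.dumps(input_value), True
    except (TypeError, ValueError):
        # Not JSON-serializable (for example a set or an arbitrary
        # object): fall back to the plain string representation.
        return str(input_value), False


json_result = serialize_payload({"a": 1})
fallback_result = serialize_payload({1})
```

Returning the flag alongside the text lets the caller record which format the payload is in, for example when choosing a MIME type for a span attribute.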
- _process_start_event( )
- _process_end_event( )
- parent_call(trace_id: str, parent_call_id: str)
Context manager to set a parent call context for Weave. This allows connecting AIQ spans to existing traces from other frameworks.
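The general shape of a context manager that sets a parent context temporarily and then restores the previous one can be sketched with `contextvars`. This is an assumption-laden illustration, not the listener's method: the module-level `_parent_call` variable and the `current_parent` helper are hypothetical, and no Weave API is involved.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional, Tuple

# Hypothetical storage for the active (trace_id, parent_call_id) pair.
_parent_call: ContextVar[Optional[Tuple[str, str]]] = ContextVar(
    "parent_call", default=None
)


def current_parent() -> Optional[Tuple[str, str]]:
    """Return the active parent pair, or None if none is set."""
    return _parent_call.get()


@contextmanager
def parent_call(trace_id: str, parent_call_id: str) -> Iterator[None]:
    # Set the parent for the duration of the block.
    token = _parent_call.set((trace_id, parent_call_id))
    try:
        yield
    finally:
        # Restore whatever was active before, even on error.
        _parent_call.reset(token)


with parent_call("trace-123", "call-456"):
    inside = current_parent()
after = current_parent()
```

Spans created inside the `with` block could consult the active pair to attach themselves to the external trace, while code outside the block is unaffected.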
- _create_weave_call(
- step: aiq.data_models.intermediate_step.IntermediateStep,
- span: opentelemetry.trace.Span,
Create a Weave call directly from the span and step data, connecting to existing framework traces if available.