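aiq.observability.async_otel_listener
Attributes
logger
OPENINFERENCE_SPAN_KIND
Classes
AsyncOtelSpanListener: A separate, async class that listens to the AgentIQ intermediate step event stream and creates OTel spans.
Functions
_ns_timestamp(seconds_float): Convert AgentIQ’s float event_timestamp (in seconds) into an integer number of nanoseconds.
Module Contents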
- logger
- OPENINFERENCE_SPAN_KIND = 'openinference.span.kind'
- _ns_timestamp(seconds_float: float) -> int
Convert AgentIQ’s float event_timestamp (in seconds) into an integer number of nanoseconds, as OpenTelemetry expects.
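A minimal sketch of the conversion described above, assuming the function simply scales seconds by 10^9 and truncates. The implementation is not shown on this page, so ns_timestamp below is an illustrative stand-in and not the library function.

```python
def ns_timestamp(seconds_float: float) -> int:
    """Illustrative stand-in: convert float seconds to integer nanoseconds."""
    # Assumption: scale by 1e9 and truncate toward zero.
    return int(seconds_float * 1e9)


print(ns_timestamp(1.5))  # 1500000000
```

Because a 64-bit float carries about 53 bits of precision, timestamps near the current Unix epoch resolve to a few hundred nanoseconds at best, so the low digits of the result should not be treated as exact.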
- class AsyncOtelSpanListener(context_state: aiq.builder.context.AIQContextState | None = None)
A separate, async class that listens to the AgentIQ intermediate step event stream and creates the corresponding OTel spans:
On FUNCTION_START => open a new top-level span
On any other intermediate step => open a child subspan (immediate open/close)
On FUNCTION_END => close the function’s top-level span
This runs fully independently of the normal AgentIQ workflow, so the workflow is neither blocked by nor entangled with OTel calls.
- Parameters:
context_state – Optionally supply a specific AIQContextState. If None, uses the global singleton.
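The event-to-span mapping described above can be sketched without any OpenTelemetry dependency. Everything in this block is hypothetical: SketchSpan and LifecycleSketch are stand-ins for illustration, the function_id key and the "SOME_OTHER_STEP" event name are assumptions, and the real class may track spans differently.

```python
from dataclasses import dataclass, field


@dataclass
class SketchSpan:
    """Hypothetical stand-in for an OTel span: name, parent, and ended flag only."""
    name: str
    parent: "SketchSpan | None" = None
    ended: bool = False

    def end(self) -> None:
        self.ended = True


@dataclass
class LifecycleSketch:
    """Hypothetical model of the event-to-span mapping; not the library class."""
    open_spans: dict[str, SketchSpan] = field(default_factory=dict)
    finished: list[SketchSpan] = field(default_factory=list)

    def on_event(self, event_type: str, function_id: str, name: str) -> None:
        if event_type == "FUNCTION_START":
            # Open a top-level span and keep it until FUNCTION_END arrives.
            self.open_spans[function_id] = SketchSpan(name)
        elif event_type == "FUNCTION_END":
            # Close the span opened by the matching FUNCTION_START.
            span = self.open_spans.pop(function_id, None)
            if span is not None:
                span.end()
                self.finished.append(span)
        else:
            # Any other step: a child span that is opened and closed immediately.
            child = SketchSpan(name, parent=self.open_spans.get(function_id))
            child.end()
            self.finished.append(child)


listener = LifecycleSketch()
listener.on_event("FUNCTION_START", "f1", "my_function")
listener.on_event("SOME_OTHER_STEP", "f1", "step")
listener.on_event("FUNCTION_END", "f1", "my_function")
print([(s.name, s.parent.name if s.parent else None) for s in listener.finished])
```

In this sketch the child span finishes before its parent, which matches the "immediate open/close" behavior described for intermediate steps.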
- _context_state
- _subscription = None
- _running = False
- _tracer
- async start()
Usage:
    otel_listener = AsyncOtelSpanListener()
    async with otel_listener.start():
        # run your AgentIQ workflow
        ...
    # cleans up
This sets up the subscription to the AgentIQ event stream and starts the background loop.
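The usage pattern above suggests that start() behaves as an async context manager. The following sketch shows one way such a method could be structured, assuming contextlib.asynccontextmanager is used, which this page does not confirm. ContextSketch and its log attribute are hypothetical and exist only to make the setup and teardown order visible.

```python
import asyncio
from contextlib import asynccontextmanager


class ContextSketch:
    """Hypothetical listener whose start() is an async context manager."""

    def __init__(self) -> None:
        self.running = False
        self.log: list[str] = []

    @asynccontextmanager
    async def start(self):
        # Setup: where a real listener would subscribe to the event stream.
        self.running = True
        self.log.append("subscribed")
        try:
            yield self
        finally:
            # Teardown runs even if the body of the `async with` raises.
            self.running = False
            self.log.append("cleaned up")


async def main() -> list[str]:
    listener = ContextSketch()
    async with listener.start():
        listener.log.append("workflow ran")
    return listener.log


log = asyncio.run(main())
print(log)
```

Placing the teardown in a finally clause means the cleanup step still runs when the workflow inside the block fails.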
- async _cleanup()
Close any remaining open spans.
- _serialize_payload(input_value: Any) -> tuple[str, bool]
Serialize the input value to a string. Returns a tuple containing the serialized value and a boolean indicating whether the serialized value is JSON or a plain string.
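A minimal sketch of this kind of serializer, assuming JSON is attempted first and that True marks a JSON result. Both are assumptions, since this page does not say which boolean value means JSON or which serializer the library uses. serialize_payload and Opaque are illustrative names, and the standard json module stands in for whatever the library actually calls.

```python
import json
from typing import Any


def serialize_payload(input_value: Any) -> tuple[str, bool]:
    """Illustrative stand-in: return (text, is_json)."""
    try:
        # Assumption: JSON is attempted first; True marks a JSON result.
        return json.dumps(input_value), True
    except (TypeError, ValueError):
        # Fallback: plain string representation, flagged as non-JSON.
        return str(input_value), False


class Opaque:
    """Hypothetical type that the json module cannot serialize."""

    def __str__(self) -> str:
        return "Opaque()"


print(serialize_payload({"a": 1}))   # ('{"a": 1}', True)
print(serialize_payload(Opaque()))   # ('Opaque()', False)
```

A caller could use the returned flag to decide how to label the payload on a span, for example as JSON or as plain text.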
- _process_start_event( )
- _process_end_event( )