nat.plugins.weave.weave_exporter#

Attributes#

Classes#

WeaveExporter

A Weave exporter that exports telemetry traces to Weights & Biases Weave using OpenTelemetry.

Module Contents#

logger#
presidio_filter#
class WeaveExporter(
context_state=None,
entity: str | None = None,
project: str | None = None,
verbose: bool = False,
attributes: dict[str, Any] | None = None,
)#

Bases: nat.observability.exporter.span_exporter.SpanExporter[nat.data_models.span.Span, nat.data_models.span.Span]

A Weave exporter that exports telemetry traces to Weights & Biases Weave using OpenTelemetry.

Initialize the processing exporter.

Args:

context_state (ContextState | None): The context state to use for the exporter.

drop_nones (bool): Whether to drop items when processors return None (default: True).

_weave_calls: nat.observability.exporter.base_exporter.IsolatedAttribute[dict[str, weave.trace.weave_client.Call]]#
_entity = None#
_project = None#
_attributes#
_gc#
async export_processed(
item: nat.data_models.span.Span | list[nat.data_models.span.Span],
) -> None#

Dummy implementation of export_processed.

Args:

item (Span | list[Span]): The span or list of spans to export.

_process_start_event(
event: nat.data_models.intermediate_step.IntermediateStep,
)#

Process the start event for a Weave call.

Args:

event (IntermediateStep): The intermediate step event.

_process_end_event(
event: nat.data_models.intermediate_step.IntermediateStep,
)#

Process the end event for a Weave call.

Args:

event (IntermediateStep): The intermediate step event.

parent_call(
trace_id: str,
parent_call_id: str,
) -> collections.abc.Generator[None]#

Create a dummy Weave call for the parent span.

Args:

trace_id (str): The trace ID of the parent span.

parent_call_id (str): The ID of the parent call.

Yields:

None: The dummy Weave call.

_create_weave_call(
step: nat.data_models.intermediate_step.IntermediateStep,
span: nat.data_models.span.Span,
) -> weave.trace.weave_client.Call#

Create a Weave call directly from the span and step data, connecting to existing framework traces if available.

Args:

step (IntermediateStep): The intermediate step event.

span (Span): The span associated with the intermediate step.

Returns:

Call: The Weave call created from the span and step data.

_extract_input_message(
input_data: Any,
inputs: dict[str, Any],
) -> None#

Extract the message content from the input data and add it to the inputs dictionary. Also handles websocket mode, where the message is located at messages[0].content[0].text.

Args:

input_data: The raw input data from the request.

inputs: Dictionary to populate with extracted message content.
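The websocket lookup path `messages[0].content[0].text` can be illustrated on plain dictionaries. This is a sketch under stated assumptions, not the exporter's code: the helper name `extract_input_message`, the `"input_message"` key, and the plain-string case are all hypothetical.

```python
from typing import Any


def _get(obj: Any, key: str) -> Any:
    """Read `key` from a dict or an attribute-style object; None if absent."""
    if isinstance(obj, dict):
        return obj.get(key)
    return getattr(obj, key, None)


def _first(seq: Any) -> Any:
    """Return the first element of a non-empty list or tuple, else None."""
    if isinstance(seq, (list, tuple)) and seq:
        return seq[0]
    return None


def extract_input_message(input_data: Any, inputs: dict[str, Any]) -> None:
    """Hypothetical helper: copy the message text into `inputs` if it can be found."""
    # Assumed simple case: the input is already a plain string.
    if isinstance(input_data, str):
        inputs["input_message"] = input_data
        return
    # Websocket mode: follow messages[0].content[0].text, tolerating missing levels.
    message = _first(_get(input_data, "messages"))
    part = _first(_get(message, "content"))
    text = _get(part, "text")
    if isinstance(text, str):
        inputs["input_message"] = text
```

Each level of the path is checked before it is dereferenced, so a malformed payload leaves `inputs` untouched instead of raising.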

_extract_output_message(
output_data: Any,
outputs: dict[str, Any],
) -> None#

Extract message content from various response formats and add a preview to the outputs dictionary. No data is added to the outputs dictionary if the output format is not supported.

Supported output formats for message content include:

  • output.choices[0].message.content (/chat endpoint)

  • output.value (/generate endpoint)

  • output[0].choices[0].message.content (chat WS schema)

  • output[0].choices[0].delta.content (chat_stream WS schema, /chat/stream endpoint)

  • output[0].value (generate and generate_stream WS schemas, /generate/stream endpoint)

Args:

output_data: The raw output data from the response.

outputs: Dictionary to populate with extracted message content.
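The five supported shapes reduce to two questions: is the payload wrapped in a list, and does the text live under `choices[0].message.content`, `choices[0].delta.content`, or `value`? The sketch below shows one way to express that on plain dictionaries. It is illustrative only: the helper name `extract_output_preview`, the `"output_preview"` key, and the `max_chars` truncation are assumptions rather than the exporter's actual behavior.

```python
from typing import Any


def _get(obj: Any, key: str) -> Any:
    """Read `key` from a dict or an attribute-style object; None if absent."""
    if isinstance(obj, dict):
        return obj.get(key)
    return getattr(obj, key, None)


def _first(seq: Any) -> Any:
    """Return the first element of a non-empty list or tuple, else None."""
    if isinstance(seq, (list, tuple)) and seq:
        return seq[0]
    return None


def extract_output_preview(output_data: Any, outputs: dict[str, Any], max_chars: int = 200) -> None:
    """Hypothetical helper: store a short text preview if the format is recognized."""
    # List-wrapped schemas (output[0]...): unwrap the first element.
    wrapped = _first(output_data)
    item = wrapped if wrapped is not None else output_data

    text = None
    choice = _first(_get(item, "choices"))
    if choice is not None:
        # message.content for chat responses, delta.content for streamed chunks.
        for field in ("message", "delta"):
            content = _get(_get(choice, field), "content")
            if isinstance(content, str):
                text = content
                break
    if text is None:
        # value for generate-style responses.
        value = _get(item, "value")
        if isinstance(value, str):
            text = value

    # Unsupported formats leave `outputs` unchanged.
    if text is not None:
        outputs["output_preview"] = text[:max_chars]
```

For example, calling it with `[{"choices": [{"delta": {"content": "chunk"}}]}]` stores `"chunk"` under the assumed key, while an unrecognized payload adds nothing.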

_finish_weave_call(
step: nat.data_models.intermediate_step.IntermediateStep,
) -> None#

Finish a previously created Weave call.

Args:

step (IntermediateStep): The intermediate step event.

async _cleanup_weave_calls() -> None#

Clean up any lingering Weave calls.
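One plausible shape for such a cleanup pass is to finish every call that never received an end event and then empty the tracking dictionary. The sketch below assumes that shape. The function name `cleanup_open_calls`, the `finish` callback, and the policy of logging and continuing on failure are all hypothetical.

```python
import asyncio
import logging
from collections.abc import Callable
from typing import Any

logger = logging.getLogger(__name__)


async def cleanup_open_calls(open_calls: dict[str, Any], finish: Callable[[Any], None]) -> int:
    """Finish every call still in `open_calls`, empty the dict, and return the number finished."""
    closed = 0
    # Iterate over a snapshot so entries can be removed during the loop.
    for call_id, call in list(open_calls.items()):
        try:
            finish(call)
            closed += 1
        except Exception:
            # Assumed policy: one failing call should not block cleanup of the rest.
            logger.warning("Failed to finish call %s during cleanup", call_id, exc_info=True)
        finally:
            open_calls.pop(call_id, None)
    return closed
```

A caller could run it at shutdown with `asyncio.run(cleanup_open_calls(calls, finish_fn))`, or await it from an existing event loop.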

async _cleanup() -> None#

Perform cleanup once the exporter is stopped.