aiq.profiler.callbacks.semantic_kernel_callback_handler#

Attributes#

Classes#

SemanticKernelPatchMethod

Stores the module and function to patch in Semantic Kernel.

SemanticKernelProfilerHandler

A callback manager/handler for Msft Semantic Kernel that intercepts calls to:

Module Contents#

logger#
class SemanticKernelPatchMethod(/, **data: Any)#

Bases: pydantic.BaseModel

Stores the module and function to patch in Semantic Kernel.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

module: Any = None#
function: str = None#
class SemanticKernelProfilerHandler(workflow_llms: dict)#

Bases: aiq.profiler.callbacks.base_callback_class.BaseProfilerCallback

A callback manager/handler for Msft Semantic Kernel that intercepts calls to:

  • Chat Completions Endpoints

  • Tool calls

to collect usage statistics (tokens, inputs, outputs, time intervals, etc.) and store them in AgentIQ’s usage_stats queue for subsequent analysis.

_lock#
last_call_ts#
step_manager#
_builder_llms#
_original_tool_call = None#
_patch_methods#
instrument() None#

Monkey-patch the relevant Semantic Kernel methods with usage-stat collection logic.

_build_llm_call_patch(
original_func: collections.abc.Callable[Ellipsis, Any],
) collections.abc.Callable[Ellipsis, Any]#

Returns an async monkey-patch that wraps the original chat-completion method. Replicates the usage collection from _llm_call_wrapper.

_build_tool_call_patch(
original_func: collections.abc.Callable[Ellipsis, Any],
) collections.abc.Callable[Ellipsis, Any]#

Returns an async monkey-patch that wraps the original tool call (invoke_function_call). Replicates usage collection from _tool_use_wrapper.