nat.eval.evaluate#

Attributes#

Classes#

EvaluationRun

Instantiated for each evaluation run and used to store data for that single run.

Module Contents#

logger#
class EvaluationRun(config: nat.eval.config.EvaluationRunConfig)#

Instantiated for each evaluation run and used to store data for that single run.

Warning

Experimental Feature: The Evaluation API is experimental and may change in future releases. Future versions may introduce breaking changes without notice.

Initialize an EvaluationRun with configuration.

config: nat.eval.config.EvaluationRunConfig#
eval_config: nat.data_models.evaluate.EvalConfig | None = None#
intermediate_step_adapter: nat.eval.intermediate_step_adapter.IntermediateStepAdapter#
weave_eval: nat.eval.utils.weave_eval.WeaveEvaluationIntegration#
eval_input: nat.eval.evaluator.evaluator_model.EvalInput | None = None#
workflow_interrupted: bool = False#
evaluation_results: list[tuple[str, nat.eval.evaluator.evaluator_model.EvalOutput]] = []#
usage_stats: nat.eval.usage_stats.UsageStats#
workflow_output_file: pathlib.Path | None = None#
evaluator_output_files: list[pathlib.Path] = []#
_compute_usage_stats(
item: nat.eval.evaluator.evaluator_model.EvalInputItem,
)#

Compute usage stats for a single item using the intermediate steps.

async run_workflow_local(
session_manager: nat.runtime.session.SessionManager,
)#

Launch the workflow with the specified questions and extract the output using the JSONPath.

async run_workflow_remote()#
async profile_workflow() -> nat.profiler.data_models.ProfilerResults#

Profile a dataset.

cleanup_output_directory()#

Remove the contents of the output directory if it exists.

write_output(
dataset_handler: nat.eval.dataset_handler.dataset_handler.DatasetHandler,
profiler_results: nat.profiler.data_models.ProfilerResults,
)#
publish_output(
dataset_handler: nat.eval.dataset_handler.dataset_handler.DatasetHandler,
profiler_results: nat.profiler.data_models.ProfilerResults,
)#

Publish the output.

async run_single_evaluator(evaluator_name: str, evaluator: Any)#

Run a single evaluator and store its results.

async run_evaluators(evaluators: dict[str, Any])#

Run all configured evaluators asynchronously.

apply_overrides()#
_get_workflow_alias(workflow_type: str | None = None)#

Get the workflow alias for displaying in evaluation UI.

async wait_for_all_export_tasks_local(
session_manager: nat.runtime.session.SessionManager,
timeout: float,
) -> None#

Wait for all trace export tasks to complete for local workflows.

This only works for local workflows where we have direct access to the SessionManager and its underlying workflow with exporter manager.

async run_and_evaluate(
session_manager: nat.runtime.session.SessionManager | None = None,
job_id: str | None = None,
) -> nat.eval.config.EvaluationRunOutput#

Run the workflow with the specified config file and evaluate the dataset.