diff --git a/docs/source/workflows/evaluate.md b/docs/source/workflows/evaluate.md
index d5d24ddc21..de8727b48a 100644
--- a/docs/source/workflows/evaluate.md
+++ b/docs/source/workflows/evaluate.md
@@ -254,15 +254,16 @@ Here is a sample evaluator output generated by running evaluation on the simple
 The contents of the file have been `snipped` for brevity.
 
 ## Visualizing Evaluation Results
-You can visualize the evaluation results using the Weights and Biases (W&B) Weave dashboard.
+You can visualize the evaluation results using the Weights and Biases (W&B) Weave dashboard or Arize Phoenix.
 
-### Step 1: Install the Weave plugin
+### Weights and Biases (W&B) Weave
+#### Step 1: Install the Weave plugin
 To install the Weave plugin, run:
 ```bash
 uv pip install -e '.[weave]'
 ```
 
-### Step 2: Enable logging to Weave in the configuration file
+#### Step 2: Enable logging to Weave in the configuration file
 Edit your evaluation config, for example: `examples/evaluation_and_profiling/simple_web_query_eval/src/nat_simple_web_query_eval/configs/eval_config_llama31.yml`:
 
 ```yaml
@@ -273,6 +274,28 @@ general:
         _type: weave
         project: "nat-simple"
 ```
+### Arize Phoenix
+#### Exporting evaluation metrics to Arize Phoenix
+If your workflow is already configured to export traces to Phoenix, the evaluation run also exports per-item evaluator scores and usage metrics to Phoenix as span annotations.
+
+#### Step 1: Install the Phoenix plugin
+```bash
+uv pip install -e '.[phoenix]'
+```
+
+#### Step 2: Enable Phoenix tracing in your configuration
+Add Phoenix under `general.telemetry.tracing` in the workflow config used for evaluation:
+```yaml
+general:
+  telemetry:
+    tracing:
+      phoenix:
+        _type: phoenix
+        endpoint: http://localhost:6006/v1/traces
+        project: nat-simple
+```
+
+With this tracing configuration, `nat eval` attempts to log evaluator scores to Phoenix. Scores are associated with traces by matching each evaluation item's input to the recorded `input.value` attribute of recent spans. In the Phoenix UI, open your project and inspect the evaluations and feedback linked to recent traces.
 When running experiments with different configurations, the `project` name should be the same to allow for comparison of runs. The `workflow_alias` can be configured to differentiate between runs with different configurations.
 For example to run two evaluations with different LLM models, you can configure the `workflow_alias` as follows:
 `examples/evaluation_and_profiling/simple_web_query_eval/src/nat_simple_web_query_eval/configs/eval_config_llama31.yml`:
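
The association mechanism described above can also be reproduced outside of `nat eval`. The sketch below is illustrative only: it assumes a Phoenix server at `http://localhost:6006`, reuses the same `phoenix.client` calls that the new `PhoenixEvaluationIntegration` (further down in this diff) relies on, and the `question`, `score`, and evaluator name values are placeholders.

```python
# Minimal sketch: attach an evaluator score to the Phoenix span whose recorded
# `input.value` matches an evaluation item's input. Values are placeholders.
from datetime import datetime, timedelta

from phoenix.client import Client

client = Client(base_url="http://localhost:6006")  # server URL, not the /v1/traces OTLP endpoint
project = "nat-simple"
question = "What is LangSmith?"  # the evaluation item's input
score = 0.75                     # the evaluator's score for that item

# Look at a recent window of spans and match on the recorded input attribute.
end = datetime.now()
spans = client.spans.get_spans(
    project_identifier=project,
    limit=2000,
    start_time=end - timedelta(hours=4),
    end_time=end,
)
span_id = next(
    (s.get("id") for s in spans or []
     if (s.get("attributes") or {}).get("input.value") == question),
    None,
)

if span_id:
    client.annotations.add_span_annotation(
        span_id=span_id,
        annotation_name="rag_accuracy",  # evaluator name
        annotator_kind="LLM",
        label="score",
        score=score,
        explanation=None,
    )
```

The integration itself bounds this lookup to the last four hours and at most 2000 spans to keep the per-item overhead small, so very old traces will not be matched.
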
diff --git a/src/nat/eval/evaluate.py b/src/nat/eval/evaluate.py
index 106739a4bf..b49b6037d4 100644
--- a/src/nat/eval/evaluate.py
+++ b/src/nat/eval/evaluate.py
@@ -73,6 +73,18 @@ def __init__(self, config: EvaluationRunConfig):
 
         self.eval_trace_context = EvalTraceContext()
         self.weave_eval: WeaveEvaluationIntegration = WeaveEvaluationIntegration(self.eval_trace_context)
+
+        try:
+            from nat.eval.utils.eval_trace_ctx import EvalTraceContext
+            from nat.eval.utils.phoenix_eval import PhoenixEvaluationIntegration
+            # Phoenix annotates spans directly, so the shared trace-context API is not used; a fresh context is passed for interface parity
+            self.phoenix_eval = PhoenixEvaluationIntegration(EvalTraceContext())
+        except Exception:
+            self.phoenix_eval = None
+
+        self._use_weave_eval: bool = False
+        self._use_phoenix_eval: bool = False
+
         # Metadata
         self.eval_input: EvalInput | None = None
         self.workflow_interrupted: bool = False
@@ -213,8 +225,12 @@ async def run_one(item: EvalInputItem):
 
             item.trajectory = self.intermediate_step_adapter.validate_intermediate_steps(intermediate_steps)
             usage_stats_item = self._compute_usage_stats(item)
-            self.weave_eval.log_prediction(item, output)
-            await self.weave_eval.log_usage_stats(item, usage_stats_item)
+            if self._use_weave_eval:
+                self.weave_eval.log_prediction(item, output)
+                await self.weave_eval.log_usage_stats(item, usage_stats_item)
+            if self._use_phoenix_eval and self.phoenix_eval:
+                self.phoenix_eval.log_prediction(item, output)
+                await self.phoenix_eval.log_usage_stats(item, usage_stats_item)
 
         async def wrapped_run(item: EvalInputItem) -> None:
             await run_one(item)
@@ -238,8 +254,12 @@ async def run_workflow_remote(self):
         await handler.run_workflow_remote(self.eval_input)
         for item in self.eval_input.eval_input_items:
             usage_stats_item = self._compute_usage_stats(item)
-            self.weave_eval.log_prediction(item, item.output_obj)
-            await self.weave_eval.log_usage_stats(item, usage_stats_item)
+            if self._use_weave_eval:
+                self.weave_eval.log_prediction(item, item.output_obj)
+                await self.weave_eval.log_usage_stats(item, usage_stats_item)
+            if self._use_phoenix_eval and self.phoenix_eval:
+                self.phoenix_eval.log_prediction(item, item.output_obj)
+                await self.phoenix_eval.log_usage_stats(item, usage_stats_item)
 
     async def profile_workflow(self) -> ProfilerResults:
         """
@@ -357,7 +377,11 @@ def publish_output(self, dataset_handler: DatasetHandler, profiler_results: Prof
                    "`eval` with the --skip_completed_entries flag.")
             logger.warning(msg)
 
-        self.weave_eval.log_summary(self.usage_stats, self.evaluation_results, profiler_results)
+        if self._use_weave_eval:
+            self.weave_eval.log_summary(self.usage_stats, self.evaluation_results, profiler_results)
+        # Export to Phoenix if enabled
+        if self._use_phoenix_eval and self.phoenix_eval:
+            self.phoenix_eval.log_summary(self.usage_stats, self.evaluation_results, profiler_results)
 
     async def run_single_evaluator(self, evaluator_name: str, evaluator: Any):
         """Run a single evaluator and store its results."""
@@ -365,7 +389,10 @@ async def run_single_evaluator(self, evaluator_name: str, evaluator: Any):
             eval_output = await evaluator.evaluate_fn(self.eval_input)
             self.evaluation_results.append((evaluator_name, eval_output))
-            await self.weave_eval.alog_score(eval_output, evaluator_name)
+            if self._use_weave_eval:
+                await self.weave_eval.alog_score(eval_output, evaluator_name)
+            if self._use_phoenix_eval and self.phoenix_eval:
+                await self.phoenix_eval.alog_score(eval_output, evaluator_name)
         except Exception as e:
             logger.exception("An error occurred while running evaluator %s: %s", evaluator_name, e)
@@ -383,8 +410,11 @@ async def run_evaluators(self, evaluators: dict[str, Any]):
             logger.error("An error occurred while running evaluators: %s", e)
             raise
         finally:
-            # Finish prediction loggers in Weave
-            await self.weave_eval.afinish_loggers()
+            # Finish prediction loggers where enabled
+            if self._use_weave_eval:
+                await self.weave_eval.afinish_loggers()
+            if self._use_phoenix_eval and self.phoenix_eval:
+                await self.phoenix_eval.afinish_loggers()
 
     def apply_overrides(self):
         from nat.cli.cli_utils.config_override import load_and_override_config
@@ -511,8 +541,16 @@ async def run_and_evaluate(self,
 
         # Run workflow and evaluate
         async with WorkflowEvalBuilder.from_config(config=config) as eval_workflow:
-            # Initialize Weave integration
-            self.weave_eval.initialize_logger(workflow_alias, self.eval_input, config)
+            # Decide which evaluation integrations to use based on tracing config
+            tracing_cfg = getattr(getattr(config.general, 'telemetry', None), 'tracing', None)
+            self._use_weave_eval = isinstance(tracing_cfg, dict) and ('weave' in tracing_cfg)
+            self._use_phoenix_eval = isinstance(tracing_cfg, dict) and ('phoenix' in tracing_cfg)
+
+            # Initialize selected integrations
+            if self._use_weave_eval:
+                self.weave_eval.initialize_logger(workflow_alias, self.eval_input, config)
+            if self._use_phoenix_eval and self.phoenix_eval:
+                self.phoenix_eval.initialize_logger(workflow_alias, self.eval_input, config)
 
             with self.eval_trace_context.evaluation_context():
                 # Run workflow
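
The `run_and_evaluate` change above gates each exporter on the keys present under `general.telemetry.tracing`. The following standalone sketch restates that rule with a plain dict standing in for the real config object; `select_integrations` is an illustrative helper, not part of the patch.

```python
# Sketch of the selection rule used in run_and_evaluate(): an integration is
# enabled only when its exporter key appears under general.telemetry.tracing.
def select_integrations(tracing_cfg):
    use_weave = isinstance(tracing_cfg, dict) and ("weave" in tracing_cfg)
    use_phoenix = isinstance(tracing_cfg, dict) and ("phoenix" in tracing_cfg)
    return use_weave, use_phoenix


assert select_integrations(None) == (False, False)
assert select_integrations({"phoenix": {"_type": "phoenix"}}) == (False, True)
assert select_integrations({"weave": {}, "phoenix": {}}) == (True, True)
```
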
diff --git a/src/nat/eval/utils/phoenix_eval.py b/src/nat/eval/utils/phoenix_eval.py
new file mode 100644
index 0000000000..3ddeefc23d
--- /dev/null
+++ b/src/nat/eval/utils/phoenix_eval.py
@@ -0,0 +1,239 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import logging
+from datetime import datetime
+from datetime import timedelta
+from typing import TYPE_CHECKING
+from typing import Any
+
+import httpx
+
+from nat.eval.evaluator.evaluator_model import EvalInput
+from nat.eval.evaluator.evaluator_model import EvalInputItem
+from nat.eval.evaluator.evaluator_model import EvalOutput
+from nat.eval.usage_stats import UsageStats
+from nat.profiler.data_models import ProfilerResults
+
+if TYPE_CHECKING:
+    from nat.eval.utils.eval_trace_ctx import EvalTraceContext
+
+logger = logging.getLogger(__name__)
+
+
+class PhoenixEvaluationIntegration:
+    """
+    Class to handle Arize Phoenix integration for evaluation metrics.
+
+    This integration attempts best-effort logging of per-item evaluator scores
+    to Phoenix when Phoenix tracing is configured in the workflow config.
+    """
+ """ + + def __init__(self, eval_trace_context: "EvalTraceContext"): + self.available = False + self.client = None + self.project_name: str | None = None + self.eval_trace_context = eval_trace_context + self.run_name: str | None = None + # Minimal state to match Weave-level complexity + # Best-effort mapping from eval item id -> input string for span association + self._id_to_input: dict[str, str] = {} + + try: + from phoenix.client import Client as _PhoenixClient # noqa: F401 + self.available = True + except ImportError: + self.available = False + + def _extract_phoenix_server_url(self, endpoint: str) -> str: + """Convert OTLP traces endpoint to Phoenix server URL if needed. + + Example: http://localhost:6006/v1/traces -> http://localhost:6006 + """ + if not endpoint: + return endpoint + # strip trailing '/v1/traces' if present + suffix = "/v1/traces" + return endpoint[:-len(suffix)] if endpoint.endswith(suffix) else endpoint + + def _find_phoenix_config(self, config: Any) -> tuple[str | None, str | None]: + """Find Phoenix tracing config (endpoint, project) from full config object.""" + try: + cfg = config.model_dump(mode="json") + except AttributeError: + try: + # If already a dict + cfg = dict(config) # type: ignore[arg-type] + except (TypeError, ValueError): + return None, None + + tracing = (cfg.get("general", {}) or {}).get("telemetry", {}).get("tracing", {}) + phoenix_cfg = tracing.get("phoenix") or tracing.get("Phoenix") + if not isinstance(phoenix_cfg, dict): + return None, None + + endpoint = phoenix_cfg.get("endpoint") + project = phoenix_cfg.get("project") + return (endpoint, project) + + def _metric_eval_name(self, metric: str) -> str: + return f"{self.run_name}:{metric}" if self.run_name else metric + + def initialize_logger(self, _workflow_alias: str, _eval_input: EvalInput, config: Any) -> bool: + """Initialize Phoenix client if Phoenix tracing is configured.""" + if not self.available: + return False + + endpoint, project = self._find_phoenix_config(config) + if not endpoint or not project: + # Phoenix tracing not configured; skip + return False + + try: + from phoenix.client import Client as PhoenixClient + except ImportError as e: + logger.warning("Failed to import phoenix client: %s", e) + self.client = None + self.project_name = None + return False + + try: + server_url = self._extract_phoenix_server_url(endpoint) + self.client = PhoenixClient(base_url=server_url) + self.project_name = project + # capture a friendly run label (workflow alias) for evaluations + self.run_name = _workflow_alias + # Build id->input mapping for later span matching (best effort) + if _eval_input and getattr(_eval_input, "eval_input_items", None): + for it in _eval_input.eval_input_items: + item_id = str(it.id) + input_val = str(it.input_obj) if it.input_obj is not None else "" + if item_id: + self._id_to_input[item_id] = input_val + logger.debug("Initialized Phoenix client for project '%s' at '%s'", project, server_url) + return True + except (ValueError, RuntimeError, TypeError) as e: + logger.warning("Failed to initialize Phoenix client: %s", e) + self.client = None + self.project_name = None + return False + + def log_prediction(self, _item: EvalInputItem, _output: Any): + """No-op for Phoenix (kept for interface parity).""" + return + + async def log_usage_stats(self, item: EvalInputItem, usage_stats_item): # noqa: ANN001 + """Best-effort usage stats logging as span annotations. + + We intentionally keep this lightweight and skip logging if span resolution fails. 
+ """ + if not self.client: + return + span_id = self._resolve_span_id_for_item(str(item.id)) + if not span_id: + return + try: + self.client.annotations.add_span_annotation( + span_id=span_id, + annotation_name=self._metric_eval_name("wf_runtime"), + annotator_kind="LLM", + label="seconds", + score=float(getattr(usage_stats_item, "runtime", 0.0) or 0.0), + explanation=None, + ) + self.client.annotations.add_span_annotation( + span_id=span_id, + annotation_name=self._metric_eval_name("wf_tokens"), + annotator_kind="LLM", + label="count", + score=float(getattr(usage_stats_item, "total_tokens", 0) or 0), + explanation=None, + ) + except (ValueError, TypeError, RuntimeError, httpx.HTTPError): + logger.debug("Phoenix usage stats logging failed") + + async def alog_score(self, eval_output: EvalOutput, evaluator_name: str): + """Log per-item evaluator scores to Phoenix as span annotations.""" + if not self.client: + return + + if not eval_output.eval_output_items: + return + + for eval_output_item in eval_output.eval_output_items: + span_id = self._resolve_span_id_for_item(str(eval_output_item.id)) + if not span_id: + continue + score_val = eval_output_item.score + try: + score_val = float(score_val) + except (TypeError, ValueError): + # Skip non-numeric scores + continue + try: + self.client.annotations.add_span_annotation( + span_id=span_id, + annotation_name=self._metric_eval_name(evaluator_name), + annotator_kind="LLM", + label="score", + score=score_val, + explanation=None, + ) + except (ValueError, TypeError, RuntimeError, httpx.HTTPError): + logger.debug("Phoenix per-item score logging failed") + + async def afinish_loggers(self): + # No-op for Phoenix integration + return + + def log_summary(self, + _usage_stats: UsageStats, + _evaluation_results: list[tuple[str, EvalOutput]], + _profiler_results: ProfilerResults): + """No-op: Phoenix Client annotations are span-based; skip summary logging.""" + return + + def _resolve_span_id_for_item(self, item_id: str) -> str | None: + """Resolve a Phoenix span id for an evaluation item. + + Keep this best-effort and lightweight: fetch a small recent window of spans + and match on `input.value`. If unavailable or not found, skip. + """ + if not self.client or not self.project_name or not item_id: + return None + input_value = self._id_to_input.get(item_id) + if input_value is None: + return None + try: + # Search a narrow window to reduce overhead + end_time = datetime.now() + start_time = end_time - timedelta(hours=4) + spans = self.client.spans.get_spans( + project_identifier=self.project_name, + limit=2000, + start_time=start_time, + end_time=end_time, + ) + for span in spans or []: + sid = span.get("id") + attrs = span.get("attributes") or {} + val = attrs.get("input.value") + if sid and val is not None and str(val) == str(input_value): + return str(sid) + except (ValueError, TypeError, RuntimeError, httpx.HTTPError): + return None + return None