# Xenna Pipeline Observability
Real-time observability for cosmos-xenna data preparation pipelines, with W&B metrics logging and pipeline statistics tracking.
> **Implementation note:** This module uses a monkey-patching approach to intercept pipeline statistics, since cosmos-xenna does not currently expose a native stats callback API.
## Overview
When running data preparation pipelines (pretrain, SFT), you can log pipeline statistics to Weights & Biases in real time. This gives you visibility into:
- **Pipeline progress** – inputs processed, outputs generated, completion percentage
- **Cluster utilization** – CPU/GPU/memory usage across the Ray cluster
- **Per-stage metrics** – actor counts, queue depths, and processing speeds for each pipeline stage
- **Bottleneck detection** – which stages are blocking throughput
## Configuration

Enable W&B logging via the `observability` section in your data prep config:
```yaml
# In your data_prep config (e.g., default.yaml)
observability:
  # Enable real-time W&B logging of pipeline stats
  wandb_log_pipeline_stats: true

  # How often to log (seconds); matches cosmos-xenna's internal logging rate
  pipeline_logging_interval_s: 30

  # Optional: also write stats to a JSONL file for offline analysis
  pipeline_stats_jsonl_path: /path/to/stats.jsonl
```
## How It Works

### The Monkey-Patch Approach

cosmos-xenna's `PipelineMonitor` class builds a `PipelineStats` object every `logging_interval_s` via the internal `_make_stats()` method. Our hook intercepts this method:
```text
cosmos-xenna pipeline
─────────────────────

  PipelineMonitor.update()
        │
        ▼
  _make_stats()  ◄──── monkey-patched by WandbStatsHook
        │
        ├──► original _make_stats() returns PipelineStats
        │
        └──► hook intercepts stats ──► wandb.log() + JSONL
```
**Benefits of this approach:**

- **No cosmos-xenna changes required** – works with current cosmos-xenna main
- **Same update frequency** – matches cosmos-xenna's internal logging cadence
- **Structured data** – receives the full `PipelineStats` object, not just text output
- **Zero pipeline impact** – the original return value is preserved unchanged
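The interception pattern itself is easy to see in isolation. The sketch below is illustrative only: `Monitor`, `_make_stats`, and `StatsHook` are stand-in names rather than cosmos-xenna's actual classes, and the real hook adds thread safety and reference counting on top of this idea.

```python
# Illustrative sketch of the intercept pattern (stand-in names, not the
# real cosmos-xenna classes): every call still runs the original method,
# the observer sees the stats, and the return value is untouched.

class Monitor:
    def _make_stats(self):
        return {"progress": 42.0}  # stands in for a PipelineStats object


class StatsHook:
    def __init__(self, cls, on_stats):
        self.cls = cls
        self.on_stats = on_stats
        self._original = None

    def __enter__(self):
        original = self._original = self.cls._make_stats

        def patched(monitor):
            stats = original(monitor)  # run the real method first
            self.on_stats(stats)       # forward stats to the observer
            return stats               # original return value preserved

        self.cls._make_stats = patched
        return self

    def __exit__(self, *exc):
        self.cls._make_stats = self._original  # restore on exit
        return False


captured = []
with StatsHook(Monitor, captured.append):
    assert Monitor()._make_stats() == {"progress": 42.0}
assert captured == [{"progress": 42.0}]
```

Because the patched function returns exactly what the original produced, the pipeline cannot observe any behavioral difference.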
### Thread Safety

The hook uses reference counting so that nested contexts are safe:

- Multiple hooks can be active simultaneously
- The patch is installed when the first hook enters and restored when the last hook exits
- Install and restore are thread-safe, guarded by a reentrant lock
## Metrics Logged

### Pipeline-Level Metrics

| Metric | Description |
|---|---|
| `pipeline_duration_s` | Total elapsed time since pipeline start |
|  | Pipeline main loop frequency |
| `progress` | Percentage of inputs processed (0-100) |
|  | Inputs still waiting to be processed |
|  | Total outputs generated |
|  | Input processing rate |
|  | Output generation rate |
### Cluster Resource Metrics

| Metric | Description |
|---|---|
| `cluster/total_cpus` | Total CPUs in Ray cluster |
|  | Available (unused) CPUs |
|  | Total GPUs in cluster |
|  | Available GPUs |
|  | Total cluster memory (GB) |
|  | Available memory (GB) |
### Per-Stage Metrics (Consolidated Charts)

Stage metrics are logged as consolidated `line_series` charts (one chart per metric, one line per stage):

| Metric | Description |
|---|---|
|  | Target number of actors per stage |
|  | Actors ready to process per stage |
|  | Actors currently processing per stage |
| `stages/tasks_completed/<stage>` | Total completed tasks per stage |
|  | Input queue depth per stage |
|  | Output queue depth per stage |
|  | Processing speed per stage |
|  | CPU utilization per stage |
|  | Memory usage (GB) per stage |
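As a sketch of the consolidated-chart shape, the helper below arranges per-stage samples into the parallel `keys`/`xs`/`ys` lists that `wandb.plot.line_series` accepts. The stage names and sample values are made up for illustration, and the actual W&B call is shown only as a comment.

```python
def to_line_series(history):
    """Shape per-stage (timestamp, value) samples into one-chart form.

    `history` maps stage name -> list of (timestamp, value) samples;
    the result has one line (one key) per stage.
    """
    keys = sorted(history)
    xs = [[t for t, _ in history[k]] for k in keys]
    ys = [[v for _, v in history[k]] for k in keys]
    return keys, xs, ys


# Hypothetical samples of tasks completed for two stages:
history = {
    "download": [(0, 10), (30, 55), (60, 120)],
    "tokenize": [(0, 0), (30, 25), (60, 90)],
}
keys, xs, ys = to_line_series(history)
assert keys == ["download", "tokenize"]
assert xs[0] == [0, 30, 60] and ys[1] == [0, 25, 90]

# With a live W&B run, one consolidated chart would then be logged roughly as:
# wandb.log({"stages/tasks_completed": wandb.plot.line_series(
#     xs=xs, ys=ys, keys=keys, title="tasks_completed", xname="elapsed (s)")})
```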
## Usage in Recipes

The pretrain and SFT recipes automatically use the W&B hook when `wandb_log_pipeline_stats: true`:

```python
from nemotron.data_prep.observability import make_wandb_stats_hook

# Create hook if enabled
wandb_hook = make_wandb_stats_hook(
    observability=observability_cfg,
    pipeline_kind="pretrain",  # or "sft"
    run_hash=context.run_hash,
    run_dir=context.run_dir,
    dataset_names=context.dataset_names,
)

# Run pipeline with hook
if wandb_hook:
    with wandb_hook:
        pipelines_v1.run_pipeline(pipeline_spec)
else:
    pipelines_v1.run_pipeline(pipeline_spec)
```
## JSONL Output

For offline analysis, or when W&B isn't available, enable JSONL output:

```yaml
observability:
  wandb_log_pipeline_stats: false
  pipeline_stats_jsonl_path: /output/pipeline_stats.jsonl
```

Each line contains a JSON record:

```json
{
  "timestamp": 1706123456.789,
  "pipeline_kind": "pretrain",
  "run_hash": "abc123",
  "metrics": {
    "pipeline_duration_s": 120.5,
    "progress": 50.0,
    "cluster/total_cpus": 64.0,
    "stages/tasks_completed/download": 100
  },
  "stages": ["PlanStage", "DownloadStage", "BinIdxTokenizationStage"]
}
```
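A few lines of standard-library Python are enough to pull one metric's trajectory back out of the JSONL file for offline plotting. The helper name and sample records below are illustrative.

```python
import io
import json


def metric_series(lines, metric):
    """Collect (timestamp, value) pairs for one metric from JSONL records."""
    series = []
    for line in lines:
        rec = json.loads(line)
        if metric in rec["metrics"]:
            series.append((rec["timestamp"], rec["metrics"][metric]))
    return series


# Two made-up records in the format shown above:
sample = io.StringIO(
    '{"timestamp": 1.0, "pipeline_kind": "pretrain", "run_hash": "abc123",'
    ' "metrics": {"progress": 25.0}, "stages": []}\n'
    '{"timestamp": 31.0, "pipeline_kind": "pretrain", "run_hash": "abc123",'
    ' "metrics": {"progress": 50.0}, "stages": []}\n'
)
assert metric_series(sample, "progress") == [(1.0, 25.0), (31.0, 50.0)]
```

The same function works on a real file object opened with `open(path)`.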
## API Reference

### wandb_hook.py

| Export | Description |
|---|---|
| `WandbStatsHook` | Context manager that patches `PipelineMonitor._make_stats` |
| `make_wandb_stats_hook` | Factory function for recipes |
|  | Log plan table showing datasets and processing config |
### ObservabilityConfig

| Field | Type | Default | Description |
|---|---|---|---|
| `wandb_log_pipeline_stats` | `bool` |  | Enable W&B logging |
|  | `bool` |  | Log plan table to W&B |
|  | `bool` |  | Log per-dataset progress table |
|  | `bool` |  | Log stage overview table |
| `pipeline_logging_interval_s` |  |  | Logging interval in seconds |
| `pipeline_stats_jsonl_path` |  |  | Path for JSONL output |
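The three fields that appear in the configuration example can be sketched as a partial schema. The defaults below are assumptions, and the real `ObservabilityConfig` also carries the table-logging fields listed above.

```python
from dataclasses import dataclass
from typing import Optional

# Partial, assumed sketch of the config schema: only the three fields from
# the configuration example are shown, and the defaults are guesses.

@dataclass
class ObservabilityConfigSketch:
    wandb_log_pipeline_stats: bool = False
    pipeline_logging_interval_s: float = 30.0
    pipeline_stats_jsonl_path: Optional[str] = None


cfg = ObservabilityConfigSketch(wandb_log_pipeline_stats=True)
assert cfg.pipeline_logging_interval_s == 30.0
```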
## Troubleshooting

### Metrics not appearing in W&B

1. Verify W&B is initialized before the pipeline runs:

   ```python
   import wandb

   assert wandb.run is not None, "W&B not initialized"
   ```

2. Check that `wandb_log_pipeline_stats: true` is set in your config.
3. Ensure the hook is active during pipeline execution (look for the log message "Installed PipelineMonitor._make_stats patch").
### Import errors for cosmos_xenna

The hook lazy-imports `cosmos_xenna` only when entering the context. If you see import errors:

- Ensure cosmos-xenna is installed: `uv pip install cosmos-xenna`
- For Ray workers, use `--extra xenna` in the run command (handled automatically by the recipes)
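To confirm importability up front, mirroring the hook's lazy import, a standard-library check is enough (the function name is illustrative):

```python
import importlib.util

def xenna_available() -> bool:
    """Return True if cosmos_xenna can be imported in this environment."""
    return importlib.util.find_spec("cosmos_xenna") is not None

if not xenna_available():
    print("cosmos-xenna missing; install with: uv pip install cosmos-xenna")
```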
### Missing stage metrics

Some stages may not report all metrics if:

- The stage hasn't processed any tasks yet
- The stage has `processing_speed_tasks_per_second = None` (no speed data available)

These cases are expected, and the hook handles missing data gracefully.
## Further Reading

- Weights & Biases Integration – W&B configuration and authentication
- Data Preparation – data prep module overview
- Artifact Lineage – tracking data lineage in W&B