# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Graph operator for posting processed results to a webhook endpoint."""
from __future__ import annotations
import logging
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
import requests
import pandas as pd
from nemo_retriever.graph.abstract_operator import AbstractOperator
from nemo_retriever.graph.cpu_operator import CPUOperator
from nemo_retriever.graph.designer import designer_component
# Module-level logger shared by the serialization helpers and
# WebhookNotifyOperator below (missing-column warnings, per-batch
# info/debug messages, and POST failure tracebacks).
logger = logging.getLogger(__name__)
def _serialize_value(value: Any) -> Any:
    """Best-effort conversion of a cell value to a JSON-safe type.

    ``requests`` encodes ``json=`` payloads with ``allow_nan=False``, so a
    single NaN/inf anywhere in a batch would make the entire POST fail.
    Non-finite floats are therefore mapped to ``None`` (JSON ``null``).

    Parameters
    ----------
    value
        A cell value taken from a DataFrame row (or an element nested in one).

    Returns
    -------
    Any
        * ``None``, ``str``, ``int``, ``bool`` and finite ``float`` unchanged;
        * NaN / +-inf floats as ``None``;
        * ``bytes`` / ``bytearray`` as a ``"<bytes len=N>"`` placeholder;
        * lists, tuples, sets and array-likes with ``ndim >= 1`` and a
          ``tolist()`` method (e.g. NumPy arrays) as lists, converted
          recursively;
        * dicts with keys coerced to ``str`` and values converted recursively;
        * anything else via ``str()``.
    """
    if isinstance(value, float):
        # Local import keeps the module's import block untouched; only floats pay for it.
        import math

        # NaN / +-inf are not valid JSON and would make requests reject the payload.
        return value if math.isfinite(value) else None
    if value is None or isinstance(value, (str, int, bool)):
        return value
    if isinstance(value, (bytes, bytearray)):
        # Never ship raw binary blobs; just report their size.
        return f"<bytes len={len(value)}>"
    if isinstance(value, (list, tuple, set, frozenset)):
        return [_serialize_value(v) for v in value]
    if isinstance(value, dict):
        return {str(k): _serialize_value(v) for k, v in value.items()}
    # Array-likes (NumPy arrays, pandas Series, ...): str() would truncate large
    # arrays with "...", so convert to nested Python lists instead. Scalars
    # (ndim == 0) keep the original str() behavior.
    to_list = getattr(value, "tolist", None)
    if callable(to_list) and getattr(value, "ndim", 0):
        converted = to_list()
        if converted is not value:
            return _serialize_value(converted)
    return str(value)
def _dataframe_to_records(df: pd.DataFrame, columns: list[str] | None) -> list[dict[str, Any]]:
    """Convert a DataFrame (or a column subset) to a list of JSON-safe dicts.

    Parameters
    ----------
    df
        The batch to serialize.
    columns
        Optional column whitelist. Duplicate names are ignored, keeping the
        first occurrence's position. Requested columns absent from *df* are
        reported with a WARNING and skipped. If *none* of the requested
        columns exist, the full frame is serialized.

    Returns
    -------
    list[dict[str, Any]]
        One dict per row, every cell passed through ``_serialize_value``.
    """
    if columns:
        # dict.fromkeys de-duplicates while preserving order; selecting the same
        # label twice would build a frame with non-unique columns, which
        # ``to_dict`` would collapse anyway (with a pandas warning).
        requested = list(dict.fromkeys(columns))
        missing = [c for c in requested if c not in df.columns]
        if missing:
            logger.warning("WebhookNotifyOperator: requested columns missing from batch: %s", missing)
        available = [c for c in requested if c in df.columns]
        # NOTE(review): when no requested column exists, the whole batch is sent.
        # This preserves the original behavior; confirm that posting more data
        # than configured is intended.
        if available:
            df = df[available]
    return [
        {key: _serialize_value(cell) for key, cell in row.items()}
        for row in df.to_dict(orient="records")
    ]
@designer_component(
    name="Webhook Notify",
    category="I/O & Integration",
    compute="cpu",
    description="HTTP POST processed results to a configurable webhook endpoint",
    category_color="#f5a623",
)
class WebhookNotifyOperator(AbstractOperator, CPUOperator):
    """Post batch results to an external HTTP endpoint.

    This is a **side-effect-only** operator: it sends a JSON payload to a
    remote URL but passes the incoming data through unmodified. If
    ``params`` is ``None`` (the constructor default) or
    ``params.endpoint_url`` is falsy, the operator is a no-op.

    Delivery is best-effort: any exception raised while POSTing (network
    error, non-2xx status after retries) is logged and swallowed.

    Parameters
    ----------
    params
        A :class:`~nemo_retriever.params.WebhookParams` instance. If
        ``None`` or ``params.endpoint_url`` is falsy the stage does nothing.
        Attributes read (each optional, via ``getattr``): ``endpoint_url``,
        ``headers``, ``columns``, ``timeout_s`` (fallback 30.0) and
        ``max_retries`` (fallback 3).
    """

    def __init__(self, *, params: Any = None) -> None:
        super().__init__()
        self._params = params
        # Built lazily on the first POST, so ``requests`` is only imported when
        # the operator actually has work to do.
        self._session: "requests.Session | None" = None

    def _get_session(self) -> "requests.Session":
        """Return the cached HTTP session, creating it on first use.

        The session carries the user-supplied headers, a default
        ``Content-Type: application/json`` (unless the user set one in any
        casing), and a retry policy for HTTP 429/500/502/503/504.
        """
        if self._session is not None:
            return self._session

        import requests
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        max_retries = getattr(self._params, "max_retries", 3)
        if max_retries is None:
            # urllib3 treats total=None as "no total limit"; combined with
            # status_forcelist, a persistently failing endpoint would be retried
            # forever. Interpret an explicit None as "no retries".
            max_retries = 0

        session = requests.Session()
        session.headers.update(getattr(self._params, "headers", None) or {})
        # session.headers is case-insensitive, so a user-provided
        # "content-type" (any casing) is kept rather than overwritten.
        session.headers.setdefault("Content-Type", "application/json")

        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
            # NOTE: POST is not idempotent. Retrying after a 5xx may deliver the
            # same batch more than once, so receivers should tolerate duplicates.
            allowed_methods=["POST"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        self._session = session
        return session

    @property
    def _endpoint_url(self) -> str | None:
        """Configured webhook URL, or ``None`` when no params/URL is set."""
        return getattr(self._params, "endpoint_url", None) if self._params else None

    def preprocess(self, data: Any, **kwargs: Any) -> Any:
        """Pass *data* through unchanged."""
        return data

    def process(self, data: Any, **kwargs: Any) -> Any:
        """POST *data* to the webhook (best-effort) and return it unchanged.

        *data* is handed to ``_dataframe_to_records``, so it is presumably a
        pandas DataFrame batch. Empty batches are skipped without a request.
        """
        url = self._endpoint_url
        if not url:
            return data

        columns = getattr(self._params, "columns", None) or None
        timeout = getattr(self._params, "timeout_s", 30.0)
        records = _dataframe_to_records(data, columns)
        if not records:
            logger.debug("WebhookNotifyOperator: empty batch, skipping POST to %s", url)
            return data

        session = self._get_session()
        try:
            response = session.post(url, json=records, timeout=timeout)
            response.raise_for_status()
        except Exception:
            # Deliberately broad: a failing webhook must never break the pipeline.
            logger.exception("WebhookNotifyOperator: failed to POST to %s", url)
        else:
            logger.info(
                "WebhookNotifyOperator: POST %d records to %s — %s",
                len(records),
                url,
                response.status_code,
            )
        return data

    def postprocess(self, data: Any, **kwargs: Any) -> Any:
        """Pass *data* through unchanged."""
        return data