Source code for nemo_retriever.graph.lancedb_write_operator
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Graph operator that appends embedding rows to a LanceDB table.
Designed for the service-mode ingest pipeline where pages arrive in
incremental batches. Unlike :class:`LanceDBWriterActor` (which
overwrites on init), this operator lazily creates-or-opens the target
table and appends rows so that concurrent worker processes can all
write to the same LanceDB directory (LanceDB uses file-level locking
internally).
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
from nemo_retriever.graph.abstract_operator import AbstractOperator
from nemo_retriever.graph.cpu_operator import CPUOperator
logger = logging.getLogger(__name__)
[docs]
class LanceDBWriteOperator(AbstractOperator, CPUOperator):
    """Append embedding rows from a DataFrame batch to LanceDB.

    The table is created on first write and appended to thereafter.
    The operator is side-effect-only: :meth:`process` returns its input
    unmodified.

    Args:
        uri: LanceDB location handed to ``lancedb.connect``. When it is a
            plain filesystem path (no ``scheme://`` prefix) the directory
            is created on first write.
        table_name: Name of the table that rows are appended to.
        hybrid: Stored on the instance; no method of this class reads it.
        embedding_column: Forwarded to ``build_lancedb_rows``.
        embedding_key: Forwarded to ``build_lancedb_rows``.
        text_column: Forwarded to ``build_lancedb_rows``.
        include_text: Forwarded to ``build_lancedb_rows``.
    """

    def __init__(
        self,
        *,
        uri: str = "/var/lib/nemo-retriever/lancedb",
        table_name: str = "nv-ingest",
        hybrid: bool = False,
        embedding_column: str = "text_embeddings_1b_v2",
        embedding_key: str = "embedding",
        text_column: str = "text",
        include_text: bool = True,
    ) -> None:
        super().__init__()
        self._uri = uri
        self._table_name = table_name
        self._hybrid = hybrid
        self._embedding_column = embedding_column
        self._embedding_key = embedding_key
        self._text_column = text_column
        self._include_text = include_text
        # Both handles are populated lazily by _ensure_connected().
        self._db: Any = None
        self._table: Any = None

    def _ensure_connected(self, vector_dim: int) -> None:
        """Lazily connect to LanceDB and create-or-open the table.

        If the existing table's schema lacks any field of the expected
        schema, the table is dropped and recreated to avoid field-mismatch
        errors from stale data. Extra fields in the existing table are
        tolerated.

        The method is a no-op once a table handle is held. If an attempt
        fails part-way, ``self._table`` stays ``None`` so the next call
        retries instead of leaving the operator permanently without a
        table.

        Args:
            vector_dim: Embedding dimension passed to ``lancedb_schema``.

        Raises:
            Exception: Whatever ``lancedb`` raises from ``connect``,
                ``drop_table`` or ``create_table``.
        """
        # Guard on the table, not the connection: a connection without a
        # table is an incomplete state that must be retried.
        if self._table is not None:
            return

        import lancedb as _ldb
        from nemo_retriever.vdb.lancedb_schema import lancedb_schema

        if self._db is None:
            # Only a plain local path needs its directory created; a URI
            # with a scheme would otherwise become a bogus local folder.
            if "://" not in self._uri:
                Path(self._uri).mkdir(parents=True, exist_ok=True)
            self._db = _ldb.connect(uri=self._uri)

        expected_schema = lancedb_schema(vector_dim)
        expected_fields = {f.name for f in expected_schema}

        try:
            table = self._db.open_table(self._table_name)
        except Exception:
            # NOTE(review): the exception type raised for a missing table
            # appears to differ between LanceDB releases - confirm before
            # narrowing. Any open failure is treated as "table absent"; a
            # genuine problem resurfaces from create_table below.
            table = None

        if table is not None:
            missing = expected_fields - {f.name for f in table.schema}
            if not missing:
                self._table = table
                logger.debug("Opened existing LanceDB table %r at %s", self._table_name, self._uri)
                return
            logger.warning(
                "LanceDB table %r schema mismatch (missing: %s) — recreating",
                self._table_name,
                missing,
            )
            self._db.drop_table(self._table_name)

        self._table = self._db.create_table(
            self._table_name,
            schema=expected_schema,
            mode="create",
        )
        logger.info("Created LanceDB table %r at %s (dim=%d)", self._table_name, self._uri, vector_dim)

    def preprocess(self, data: Any, **kwargs: Any) -> Any:
        """Return *data* unchanged; no preprocessing is performed."""
        return data

    def process(self, data: Any, **kwargs: Any) -> Any:
        """Append the embedding rows found in *data* to the LanceDB table.

        Rows are built with ``build_lancedb_rows`` using the column
        settings given at construction. The write is skipped (with a log
        message) when no rows are produced or when ``infer_vector_dim``
        reports a non-positive dimension.

        Args:
            data: The incoming batch, passed as-is to ``build_lancedb_rows``.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The same *data* object that was passed in.
        """
        from nemo_retriever.vdb.lancedb_schema import build_lancedb_rows, infer_vector_dim

        rows = build_lancedb_rows(
            data,
            embedding_column=self._embedding_column,
            embedding_key=self._embedding_key,
            text_column=self._text_column,
            include_text=self._include_text,
        )
        if not rows:
            logger.debug("No embedding rows in batch — skipping LanceDB write")
            return data

        vector_dim = infer_vector_dim(rows)
        if vector_dim <= 0:
            logger.warning("Could not infer embedding dimension — skipping LanceDB write")
            return data

        self._ensure_connected(vector_dim)
        self._table.add(rows)
        logger.debug("Wrote %d rows to LanceDB table %r", len(rows), self._table_name)
        return data

    def postprocess(self, data: Any, **kwargs: Any) -> Any:
        """Return *data* unchanged; no postprocessing is performed."""
        return data