Adding a Data Source#
This guide walks through creating a new data source plugin. Data sources are specialized tools that provide domain-specific search capabilities – academic papers, patent databases, financial data, internal knowledge bases, and similar services.
The pattern follows the existing Google Scholar paper search at sources/google_scholar_paper_search/.
Data Source vs. Tool#
A data source is architecturally identical to a tool. The distinction is conceptual:
| Aspect | Tool | Data Source |
|---|---|---|
| Purpose | General utility (web search, code execution) | Domain-specific information retrieval |
| Location | | |
| Registration | | |
| Agent interaction | Agent calls it for actions | Agent calls it for knowledge |
Both use the same NeMo Agent Toolkit plugin pattern. This guide focuses on data-source-specific patterns like structured result formatting, pagination, and search parameter handling.
Prerequisites#
- The repository virtual environment is active (.venv)
- You understand NeMo Agent Toolkit’s @register_function decorator
- You have access to the data source’s API or SDK
Step 1: Create the Package#
sources/my_data_source/
    pyproject.toml
    README.md
    src/
        __init__.py
        register.py    # Config + NAT registration
        search.py      # Search implementation
    tests/
        test_search.py
mkdir -p sources/my_data_source/src sources/my_data_source/tests
touch sources/my_data_source/src/__init__.py
Step 2: Implement the Search Client#
Keep the API client logic separate from the NeMo Agent Toolkit registration so that the client can be tested on its own.
# sources/my_data_source/src/search.py
import logging
from typing import Any

import httpx

logger = logging.getLogger(__name__)


class PatentSearchClient:
    """Client for searching a patent database API."""

    def __init__(
        self,
        api_key: str,
        timeout: int = 30,
        max_results: int = 10,
    ):
        self.api_key = api_key
        self.timeout = timeout
        self.max_results = max_results

    async def search(
        self,
        query: str,
        year: str | None = None,
        jurisdiction: str | None = None,
    ) -> str:
        """Search for patents matching the query.

        Args:
            query: The search query describing the invention or technology.
            year: Optional year filter (e.g., "2024").
            jurisdiction: Optional jurisdiction filter (e.g., "US", "EP").

        Returns:
            Formatted patent search results as a string.
        """
        # Only send the filters the caller actually supplied
        params: dict[str, Any] = {
            "q": query,
            "limit": self.max_results,
        }
        if year:
            params["year"] = year
        if jurisdiction:
            params["jurisdiction"] = jurisdiction

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                "https://api.patents.example.com/v2/search",
                params=params,
                headers={"X-API-Key": self.api_key},
            )
            response.raise_for_status()
            data = response.json()

        patents = data.get("patents", [])
        if not patents:
            return f"No patents found for query: {query}"

        # Format each patent as a small text block the agent can cite
        results = []
        for patent in patents:
            patent_id = patent.get("id", "")
            title = patent.get("title", "")
            abstract = patent.get("abstract", "")
            filing_date = patent.get("filing_date", "")
            url = patent.get("url", "")
            results.append(
                f"**{title}** ({patent_id})\n"
                f"Filed: {filing_date}\n"
                f"Abstract: {abstract}\n"
                f"Link: {url}"
            )
        return "\n\n---\n\n".join(results)
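Because the client lives apart from the registration code, its result formatting can be checked without network access. The sketch below assumes a hypothetical module-level helper, format_patents, that factors the formatting loop out of PatentSearchClient.search; the helper name and the sample record are illustrative and are not part of the existing package.

```python
from typing import Any


def format_patents(patents: list[dict[str, Any]], query: str) -> str:
    """Hypothetical helper: same formatting as the loop in PatentSearchClient.search."""
    if not patents:
        return f"No patents found for query: {query}"
    results = []
    for patent in patents:
        results.append(
            f"**{patent.get('title', '')}** ({patent.get('id', '')})\n"
            f"Filed: {patent.get('filing_date', '')}\n"
            f"Abstract: {patent.get('abstract', '')}\n"
            f"Link: {patent.get('url', '')}"
        )
    return "\n\n---\n\n".join(results)


def test_format_patents() -> None:
    # Made-up record, for illustration only
    sample = [
        {
            "id": "X-1",
            "title": "Widget",
            "filing_date": "2024-01-02",
            "abstract": "A widget.",
            "url": "https://example.com/x-1",
        }
    ]
    text = format_patents(sample, "widget")
    assert text.startswith("**Widget** (X-1)")
    assert "Link: https://example.com/x-1" in text
    assert format_patents([], "widget") == "No patents found for query: widget"
```

A test of this shape could live in tests/test_search.py and run under pytest without an API key.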
Step 3: Define Config and Register#
# sources/my_data_source/src/register.py
import logging
import os

from pydantic import Field, SecretStr

from nat.builder.builder import Builder
from nat.builder.function_info import FunctionInfo
from nat.cli.register_workflow import register_function
from nat.data_models.function import FunctionBaseConfig

from .search import PatentSearchClient

logger = logging.getLogger(__name__)

# Ensures the missing-key warning is logged only once per process
_missing_key_warned = False


class PatentSearchConfig(FunctionBaseConfig, name="patent_search"):
    """
    Configuration for the patent search data source.

    Searches a patent database API for relevant patents.
    Requires a PATENT_API_KEY environment variable or config.
    """

    timeout: int = Field(
        default=30, description="Timeout in seconds for search requests"
    )
    max_results: int = Field(
        default=10, description="Maximum number of results to return"
    )
    api_key: SecretStr | None = Field(
        default=None, description="API key for the patent search service"
    )


@register_function(config_type=PatentSearchConfig)
async def patent_search(tool_config: PatentSearchConfig, builder: Builder):
    """Register patent search data source."""
    # Resolve API key: the environment variable wins; the config value is a fallback
    if not os.environ.get("PATENT_API_KEY") and tool_config.api_key:
        os.environ["PATENT_API_KEY"] = tool_config.api_key.get_secret_value()
    api_key = os.environ.get("PATENT_API_KEY")

    if not api_key:
        global _missing_key_warned
        if not _missing_key_warned:
            logger.warning(
                "PATENT_API_KEY not found. The patent search tool will be "
                "registered but will return an error when called."
            )
            _missing_key_warned = True

        # The stub accepts the same parameters as the real function so the
        # agent sees one consistent signature either way
        async def _stub(
            query: str,
            year: str | None = None,
            jurisdiction: str | None = None,
        ) -> str:
            """Patent search (unavailable - missing PATENT_API_KEY)."""
            return (
                "Error: Patent search is unavailable because PATENT_API_KEY is not set.\n"
                "Set the API key in your environment or .env file and restart."
            )

        yield FunctionInfo.from_fn(_stub, description=_stub.__doc__)
        return

    client = PatentSearchClient(
        api_key=api_key,
        timeout=tool_config.timeout,
        max_results=tool_config.max_results,
    )

    async def _patent_search(
        query: str,
        year: str | None = None,
        jurisdiction: str | None = None,
    ) -> str:
        """Searches for patents and intellectual property filings.

        This tool returns patents from a patent database with titles,
        abstracts, filing dates, and links. Use for queries about
        inventions, prior art, technology patents, and IP research.
        """
        return await client.search(query, year, jurisdiction)

    yield FunctionInfo.from_fn(
        _patent_search,
        description=_patent_search.__doc__,
    )
Step 4: Create pyproject.toml#
# sources/my_data_source/pyproject.toml
[build-system]
build-backend = "setuptools.build_meta"
requires = ["setuptools >= 64", "setuptools-scm>=8"]
[tool.setuptools]
packages = ["my_data_source"]
package-dir = {"my_data_source" = "src"}
[project]
name = "my-data-source"
version = "1.0.0"
description = "NAT-based patent search data source"
requires-python = ">=3.11,<3.14"
dependencies = [
"nvidia-nat==1.5.0",
"httpx>=0.24.0",
"pydantic>=2.0.0",
]
[project.entry-points."nat.plugins"]
patent_search = "my_data_source.register"
Install:
uv pip install -e ./sources/my_data_source
Step 5: Register as a Data Source in YAML#
To make your data source appear in the UI as a toggleable source with per-message filtering, add it to the data_source_registry in your config. Agents automatically inherit all registry tools, so this is the only config change needed:
functions:
  # Register data sources for UI toggles and filtering
  data_sources:
    _type: data_source_registry
    sources:
      - id: web_search
        name: "Web Search"
        description: "Search the web for real-time information."
        tools:
          - web_search_tool
      - id: patent_search
        name: "Patent Search"
        description: "Search patent databases for intellectual property filings."
        tools:
          - patent_tool

  # Define the tool instances
  patent_tool:
    _type: patent_search
    max_results: 5
    timeout: 15

  web_search_tool:
    _type: tavily_web_search
    max_results: 3

  # Agents inherit all registry tools automatically --
  # no need to list tools per-agent unless you want to exclude specific ones
  shallow_research_agent:
    _type: shallow_research_agent
    llm: research_llm
The data_source_registry provides:
- GET /v1/data_sources API endpoint returns the registered sources (the UI renders these as toggles)
- Per-message filtering via data_sources: ["web_search"] in the WebSocket chat payload, or as a data_sources field on POST /v1/jobs/async/submit – only tools belonging to selected sources are active
- Display metadata (name, description) shown in the UI
- Auth gating – set requires_auth: true on a source to grey it out in the UI until the user signs in (e.g., enterprise sources that need user-level OAuth tokens). Sources using backend API keys (Tavily, Serper) should leave this false (the default).
- Auto-inheritance – all agents get every registered tool by default (use exclude_tools on an agent for per-agent specialization)
If a tool isn’t listed in any data_source_registry source entry, it is always included regardless of filtering (e.g., utility tools like “think” or “calculator”). Passing an explicit empty list (data_sources: []) – in the WebSocket chat payload or in a POST /v1/jobs/async/submit body – disables data-source tools while leaving those unmapped utility tools available.
Source Entry Field Reference#
| Field | Type | Default | Description |
|---|---|---|---|
| | string | required | Unique key used in API payloads and filtering |
| | string | required | Display name shown in the UI |
| | string | | Human-readable description for the UI |
| | list | | NAT function or function group names belonging to this source |
| | bool | | Grey out in UI until user signs in (for user-token sources) |
| | bool | | Whether enabled by default in the UI |
Using MCP Tools as Data Sources#
NAT MCP client tools are function groups. They work with the data source registry the same way as any other function group – just reference the group name in tools:
function_groups:
  mcp_docs_search:
    _type: mcp_client
    server:
      transport: streamable-http
      url: "http://localhost:9901/mcp"

functions:
  data_sources:
    _type: data_source_registry
    sources:
      - id: web_search
        name: "Web Search"
        description: "Search the web for real-time information."
        tools:
          - web_search_tool
      - id: docs_search
        name: "Internal Docs"
        description: "Search internal documentation via MCP."
        tools:
          - mcp_docs_search

  web_search_tool:
    _type: tavily_web_search

  deep_research_agent:
    _type: deep_research_agent
    tools:
      - web_search_tool
      - mcp_docs_search
The registry auto-detects that mcp_docs_search is a function group and uses NAT’s group separator (__) for prefix matching. All tools exposed by the MCP server (e.g., mcp_docs_search__find_pages, mcp_docs_search__get_page) will map to the docs_search data source.
Data Source Design Patterns#
Multi-Parameter Search Functions#
Data sources often accept optional filters beyond the query string. Expose these as optional parameters with clear type annotations:
async def _search(
    query: str,
    year: str | None = None,
    jurisdiction: str | None = None,
    category: str | None = None,
) -> str:
    """Search patents with optional filters.

    Args:
        query: Natural language description of the technology.
        year: Filter by filing year (e.g., "2024").
        jurisdiction: Filter by jurisdiction ("US", "EP", "CN").
        category: Filter by patent category.
    """
The LLM will use the parameter descriptions to decide which filters to apply.
Structured Result Formatting#
Format results so the agent can extract citations. Use a consistent pattern:
# XML Document format (matches Tavily pattern)
f'<Document href="{url}">\n<title>\n{title}\n</title>\n{content}\n</Document>'
# Or structured text format
f"**{title}** ({id})\nAbstract: {abstract}\nLink: {url}"
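Titles and URLs that come from an external API may contain quotes, ampersands, or angle brackets that would break the Document wrapper. The following is a minimal sketch, assuming that escaped text is acceptable to whatever parses the citations downstream (this guide does not say whether the existing sources escape their output); format_document is a hypothetical helper.

```python
from html import escape


def format_document(url: str, title: str, content: str) -> str:
    """Wrap one result in the <Document> format, escaping special characters."""
    return (
        f'<Document href="{escape(url, quote=True)}">\n'
        f"<title>\n{escape(title)}\n</title>\n"
        f"{escape(content)}\n"
        "</Document>"
    )


print(format_document("https://example.com/p?id=1&v=2", 'A "smart" <widget>', "Body text"))
```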
Pagination#
If the source API supports pagination, handle it internally rather than exposing it to the agent:
async def search(self, query: str) -> str:
    all_results = []
    offset = 0
    # Keep fetching pages until enough results are collected or the source runs dry
    while len(all_results) < self.max_results:
        batch = await self._fetch_page(query, offset, batch_size=20)
        if not batch:
            break
        all_results.extend(batch)
        offset += len(batch)
    return self._format_results(all_results[:self.max_results])
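To see the loop stop both when the limit is reached and when the source is exhausted, the method can be exercised against an in-memory stand-in. In this sketch, FakePagedClient and its _fetch_page and _format_results methods are hypothetical placeholders for a real API client.

```python
import asyncio


class FakePagedClient:
    """Stand-in for a paginated API, backed by an in-memory list."""

    def __init__(self, items: list[str], max_results: int = 10):
        self._items = items
        self.max_results = max_results

    async def _fetch_page(self, query: str, offset: int, batch_size: int) -> list[str]:
        # A real client would issue an HTTP request here
        return self._items[offset : offset + batch_size]

    def _format_results(self, results: list[str]) -> str:
        return "\n".join(results)

    async def search(self, query: str) -> str:
        all_results: list[str] = []
        offset = 0
        while len(all_results) < self.max_results:
            batch = await self._fetch_page(query, offset, batch_size=20)
            if not batch:
                break  # source exhausted
            all_results.extend(batch)
            offset += len(batch)
        return self._format_results(all_results[: self.max_results])


items = [f"result-{i}" for i in range(45)]
capped = asyncio.run(FakePagedClient(items, max_results=30).search("q"))
short = asyncio.run(FakePagedClient(items[:5], max_results=30).search("q"))
print(len(capped.splitlines()), len(short.splitlines()))  # 30 5
```

The agent only ever sees the final formatted string, so page size and offsets stay an internal detail of the client.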
Rate Limiting and Retries#
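import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)


async def search(self, query: str) -> str:
    last_error: Exception | None = None
    for attempt in range(self.max_retries):
        try:
            return await self._do_search(query)
        except httpx.HTTPStatusError as e:
            last_error = e
            if attempt == self.max_retries - 1:
                break  # out of attempts; report the failure below
            if e.response.status_code == 429:
                wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
                logger.warning(f"Rate limited, retrying in {wait}s")
                await asyncio.sleep(wait)
            else:
                await asyncio.sleep(1)
    # Always return a string so the agent never receives None
    return f"Error: Search failed after {self.max_retries} attempts - {last_error}"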
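The backoff schedule is easier to verify when the sleep function is injectable. The sketch below is a generic variant of the retry loop: RateLimited stands in for an HTTP 429 error, and retry_with_backoff and its parameters are hypothetical names that are not part of httpx or NAT.

```python
import asyncio
from typing import Awaitable, Callable


class RateLimited(Exception):
    """Stand-in for an HTTP 429 response."""


async def retry_with_backoff(
    call: Callable[[], Awaitable[str]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> str:
    """Retry call() on RateLimited, waiting 1s, 2s, 4s, ... between attempts."""
    for attempt in range(max_retries):
        try:
            return await call()
        except RateLimited:
            if attempt == max_retries - 1:
                break  # no attempts left
            await sleep(2 ** attempt)
    return f"Error: Search failed after {max_retries} attempts"


async def demo() -> tuple[str, list[float]]:
    waits: list[float] = []
    attempts = 0

    async def fake_sleep(seconds: float) -> None:
        waits.append(seconds)  # record the wait instead of actually sleeping

    async def flaky() -> str:
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise RateLimited()
        return "ok"

    return await retry_with_backoff(flaky, sleep=fake_sleep), waits


print(asyncio.run(demo()))  # ('ok', [1, 2])
```

Passing a recording sleep function keeps tests fast while still checking that the waits double between attempts.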
Existing Data Source Reference#
| Data Source | | Package | Description |
|---|---|---|---|
| Tavily Web Search | | | General web search through Tavily API |
| Exa Web Search | | | General web search through Exa API |
| Google Scholar | | | Academic papers through Serper/Google Scholar |
| Knowledge Layer | | | Document retrieval through pluggable backends |
Checklist#
- Package created under sources/<name>/ with pyproject.toml
- Search client implemented separately from registration
- Config class with FunctionBaseConfig and unique name
- @register_function with stub fallback for missing API key
- Clear docstring explaining what the data source searches
- Optional search parameters with descriptions for the LLM
- Proper error handling and retries
- Entry point in pyproject.toml
- Added to data_source_registry in YAML config for UI integration
- Installed and tested with nat run