For AI agents: a documentation index is available at the root level at /llms.txt and /llms-full.txt. Append /llms.txt to any URL for a page-level index, or .md for the markdown version of any page.
  • Getting Started
    • Welcome
    • Contributing
  • Concepts
    • Columns
    • Seed Datasets
    • Agent Rollout Ingestion
    • Custom Columns
    • Validators
    • Processors
    • Person Sampling
    • Traces
    • Architecture & Performance
    • Deployment Options
    • Security
  • Tutorials
    • Overview
    • The Basics
    • Structured Outputs, Jinja Expressions, and Conditional Generation
    • Seeding with an External Dataset
    • Providing Images as Context
    • Generating Images
    • Image-to-Image Editing
  • Recipes
    • Recipe Cards
      • Basic MCP Tool Use
      • PDF Document QA
      • Nemotron Super Search Agent
  • Plugins
    • Overview
    • Example Plugin
    • FileSystemSeedReader Plugins
    • Discover
  • Code Reference
    • Overview
  • Dev Notes
    • Overview
    • Push Datasets to Hugging Face Hub
    • Text-to-SQL for Nemotron Super
    • Async All the Way Down
    • Owning the Model Stack
    • Data Designer Got Skills
NVIDIA
Developer-friendly docs for your API
Privacy Policy | Manage My Privacy | Do Not Sell or Share My Data | Terms of Service | Accessibility | Corporate Policies | Product Security | Contact

Copyright © 2026, NVIDIA Corporation.

NeMo Data Designer
Recipes > MCP and Tool Use

PDF Document QA

View as Markdown
Previous

Basic MCP Tool Use

Next

Nemotron Super Search Agent

Download Recipe

Download the complete recipe script

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "data-designer",
#     "mcp",
#     "bm25s",
#     "pymupdf",
#     "rich",
# ]
# ///
"""MCP + Tool Use Recipe: Document Q&A with BM25S Lexical Search

This recipe demonstrates an end-to-end MCP tool-calling workflow:

1) Load one or more PDF documents from URLs or local paths.
2) Index them with BM25S for fast lexical search.
3) Use Data Designer tool calls (`search_docs`) to generate grounded Q&A pairs.

Prerequisites:
    - OPENAI_API_KEY environment variable for OpenAI provider model aliases.
    - NVIDIA_API_KEY environment variable for NVIDIA provider model aliases (default model alias is "nvidia-reasoning").

Run:
    # Basic usage with default sample PDF (generates 4 Q&A pairs)
    uv run pdf_qa.py

    # For help message and available options
    uv run pdf_qa.py --help
"""
32
33from __future__ import annotations
34
35import argparse
36import io
37import json
38import os
39import sys
40from pathlib import Path
41from urllib.parse import urlparse
42from urllib.request import urlopen
43
44import bm25s
45import fitz
46from mcp.server.fastmcp import FastMCP
47from pydantic import BaseModel, Field
48
49import data_designer.config as dd
50from data_designer.config.preview_results import PreviewResults
51from data_designer.interface import DataDesigner
52
# PDF indexed when no source is supplied on the command line or via PDF_SOURCES.
DEFAULT_PDF_URL = "https://research.nvidia.com/labs/nemotron/files/NVIDIA-Nemotron-3-Nano-Technical-Report.pdf"
# Name shared by the FastMCP server instance and the MCP provider/tool configuration.
MCP_SERVER_NAME = "doc-bm25-search"

# Global state for the BM25 index (populated at server startup)
# `_bm25_retriever` stays None until `initialize_search_index` builds an index.
_bm25_retriever: bm25s.BM25 | None = None
# One entry per extracted PDF page, with 'text', 'page' and 'source' keys.
_corpus: list[dict[str, str]] = []
59
60
# Structured output schema for the `qa_pair` column: one grounded question/answer
# pair together with the passage and citation that support it.
# NOTE(review): documented with comments rather than a class docstring on the
# assumption that a model docstring could surface in the generated schema — confirm.
class QAPair(BaseModel):
    question: str = Field(..., description="A question grounded in the document text.")
    answer: str = Field(..., description="A concise answer grounded in the supporting passage.")
    supporting_passage: str = Field(
        ..., description="A short excerpt (2-4 sentences) copied from the search result that supports the answer."
    )
    citation: str = Field(
        ..., description="The citation (e.g. source url, page number, etc) of the supporting passage."
    )
70
71
# Structured output schema for the `topic_candidates` column; a single entry of
# `topics` is later picked at random to seed each Q&A pair.
class TopicList(BaseModel):
    topics: list[str] = Field(
        ...,
        description="High-level topics covered by the document.",
    )
77
78
79def _is_url(path_or_url: str) -> bool:
80 """Check if the given string is a URL."""
81 parsed = urlparse(path_or_url)
82 return parsed.scheme in ("http", "https")
83
84
def _get_source_name(path_or_url: str) -> str:
    """Derive a short display name for a PDF source.

    Local paths yield their file name. URLs yield the last component of the
    URL path, falling back to the host when the path has no file name.
    """
    if not _is_url(path_or_url):
        return Path(path_or_url).name

    url_parts = urlparse(path_or_url)
    file_name = Path(url_parts.path).name
    return file_name if file_name else url_parts.netloc
91
92
def extract_pdf_text(path_or_url: str) -> list[dict[str, str]]:
    """Extract text from a PDF file or URL, returning a list of passages with metadata.

    Each passage corresponds to one page of the PDF. Pages whose extracted text
    is empty after stripping whitespace are skipped.

    Args:
        path_or_url: Either a local file path or a URL to a PDF document.
            URLs are read fully into memory without saving to disk.

    Returns:
        List of passage dictionaries with 'text', 'page' (1-based, as a string),
        and 'source' keys.
    """
    source_name = _get_source_name(path_or_url)

    if _is_url(path_or_url):
        with urlopen(path_or_url) as response:
            pdf_bytes = response.read()
        doc = fitz.open(stream=io.BytesIO(pdf_bytes), filetype="pdf")
    else:
        doc = fitz.open(path_or_url)

    passages: list[dict[str, str]] = []
    # Close the document even when a page fails to extract, so the handle
    # is never leaked on the error path.
    try:
        for page_number, page in enumerate(doc, start=1):
            text = page.get_text("text").strip()
            if not text:
                continue
            passages.append(
                {
                    "text": text,
                    "page": str(page_number),
                    "source": source_name,
                }
            )
    finally:
        doc.close()

    return passages
129
130
def build_bm25_index(passages: list[dict[str, str]]) -> bm25s.BM25:
    """Tokenize the 'text' of every passage and index the tokens with BM25S.

    Args:
        passages: Passage dictionaries; only the 'text' key is read here.

    Returns:
        A BM25 retriever indexed over the passages, in the order given.
    """
    tokenized_corpus = bm25s.tokenize([entry["text"] for entry in passages], stopwords="en")
    index = bm25s.BM25()
    index.index(tokenized_corpus)
    return index
140
141
def initialize_search_index(pdf_sources: list[str]) -> None:
    """Load PDFs from paths/URLs and build the BM25 index.

    The module-level `_corpus` and `_bm25_retriever` are replaced together, and
    only after every source has been extracted, so they never describe
    different corpora. When no passages are extracted the retriever is reset
    to None instead of keeping an index from an earlier call.

    Args:
        pdf_sources: List of PDF file paths or URLs to index.
    """
    global _bm25_retriever, _corpus

    # Collect into a local list first: if one source fails to load, the
    # previously published corpus/index pair is left untouched.
    loaded: list[dict[str, str]] = []
    for source in pdf_sources:
        loaded.extend(extract_pdf_text(source))

    _corpus = loaded
    _bm25_retriever = build_bm25_index(loaded) if loaded else None
157
158
# MCP Server Definition
# Tools are registered on this instance via the `@mcp_server.tool()` decorator,
# and `serve()` starts it with `mcp_server.run()`.
mcp_server = FastMCP(MCP_SERVER_NAME)
161
162
@mcp_server.tool()
def search_docs(query: str, limit: int = 5, document: str = "", page: str = "") -> str:
    """Search through documents using BM25 lexical search.

    BM25 is a keyword-based retrieval algorithm that matches exact terms. For best results:

    - Use specific keywords, not full questions (e.g., "configuration parameters timeout" not "How do I set the timeout?")
    - Include domain-specific terms that would appear in the source text
    - Combine multiple relevant terms to narrow results (e.g., "installation requirements dependencies")
    - Try synonyms or alternative phrasings if initial searches return poor results
    - Avoid filler words and focus on content-bearing terms

    Examples:
        Good queries:
        - "error handling retry mechanism"
        - "authentication token expiration"
        - "memory allocation buffer size"

        Less effective queries:
        - "What are the error handling options?"
        - "Tell me about authentication"
        - "How does memory work?"

    Args:
        query: Search query string - use specific keywords for best results
        limit: Maximum number of results to return (default: 5); values below 1 return no results
        document: Optional document source name to restrict search to (use list_docs to see available documents)
        page: Optional page number to restrict search to (requires document to be specified)

    Returns:
        JSON string with search results including text excerpts and page numbers
    """
    if _bm25_retriever is None or not _corpus:
        return json.dumps({"error": "Search index not initialized"})

    # Validate that page requires document
    if page and not document:
        return json.dumps({"error": "The 'page' parameter requires 'document' to be specified"})

    # A non-positive limit can never be satisfied. Answer with an empty result
    # set instead of handing a zero/negative k to the retriever or letting the
    # "stop once we have enough" check fire only after one result was added.
    if limit <= 0:
        return json.dumps({"results": [], "query": query, "total": 0})

    query_tokens = bm25s.tokenize([query], stopwords="en")

    # When filtering, retrieve the whole corpus so enough candidates survive the filter
    retrieve_limit = len(_corpus) if (document or page) else limit
    results, scores = _bm25_retriever.retrieve(query_tokens, k=min(retrieve_limit, len(_corpus)))

    search_results: list[dict[str, str | float]] = []
    # Row 0 holds the hits for the single query that was tokenized above.
    for doc_idx, raw_score in zip(results[0], scores[0]):
        score = float(raw_score)

        # Non-positive scores are treated as non-matches and dropped.
        if score <= 0:
            continue

        passage = _corpus[int(doc_idx)]

        # Apply document filter
        if document and passage["source"] != document:
            continue

        # Apply page filter
        if page and passage["page"] != page:
            continue

        search_results.append(
            {
                "text": passage["text"][:2000],
                "page": passage["page"],
                "source": passage["source"],
                "score": round(score, 4),
                "url": f"file://{passage['source']}#page={passage['page']}",
            }
        )

        # Stop once we have enough results
        if len(search_results) >= limit:
            break

    return json.dumps({"results": search_results, "query": query, "total": len(search_results)})
243
244
@mcp_server.tool()
def list_docs() -> str:
    """List all documents in the search index with their page counts.

    Returns:
        JSON string with a list of documents, each containing the source name and page count.
    """
    if not _corpus:
        return json.dumps({"error": "Search index not initialized", "documents": []})

    # Group the distinct indexed page labels under each source name.
    pages_by_source: dict[str, set[str]] = {}
    for entry in _corpus:
        pages_by_source.setdefault(entry["source"], set()).add(entry["page"])

    documents = [
        {"source": name, "page_count": len(pages_by_source[name])}
        for name in sorted(pages_by_source)
    ]

    return json.dumps({"documents": documents, "total_documents": len(documents)})
268
269
def build_config(model_alias: str, provider_name: str) -> dd.DataDesignerConfigBuilder:
    """Build the Data Designer configuration for document Q&A generation.

    Columns are added in this order: a dropped UUID seed column, a tool-backed
    `topic_candidates` column, a `topic` picked at random from those candidates,
    a tool-backed `qa_pair` column, and one flat expression column for each
    `QAPair` field.

    Args:
        model_alias: Model alias used by both LLM columns.
        provider_name: Name of the MCP provider that serves the search tools.

    Returns:
        The populated config builder.
    """
    tool_alias = "doc-search"
    # Both LLM columns share the same tool-first instructions.
    tool_system_prompt = (
        "You must call tools before answering. "
        "Do not use outside knowledge; only use tool results. "
        "You can use as many tool calls as required to answer the user query."
    )

    tool_config = dd.ToolConfig(
        tool_alias=tool_alias,
        providers=[provider_name],
        allow_tools=["list_docs", "search_docs"],
        max_tool_call_turns=100,
        timeout_sec=30.0,
    )

    config_builder = dd.DataDesignerConfigBuilder(tool_configs=[tool_config])
    config_builder.add_column(
        dd.SamplerColumnConfig(
            name="seed_id",
            sampler_type=dd.SamplerType.UUID,
            params=dd.UUIDSamplerParams(),
            drop=True,
        )
    )

    config_builder.add_column(
        dd.LLMStructuredColumnConfig(
            name="topic_candidates",
            model_alias=model_alias,
            # The original prompt was missing words ("covered by documents our knowledge base").
            prompt="Extract a high-level list of all topics covered by the documents in our knowledge base.",
            system_prompt=tool_system_prompt,
            output_format=TopicList,
            tool_alias=tool_alias,
            with_trace=dd.TraceType.ALL_MESSAGES,  # Enable trace to capture tool call history
        )
    )

    config_builder.add_column(
        dd.ExpressionColumnConfig(
            name="topic",
            expr="{{ topic_candidates.topics | random }}",
        )
    )

    qa_prompt = """\
Create a question-answer pair on the topic "{{topic}}", with supporting text and citation.
The supporting_passage must be a 2-4 sentence excerpt copied from the tool result that demonstrates
why the answer is correct.
"""

    config_builder.add_column(
        dd.LLMStructuredColumnConfig(
            name="qa_pair",
            model_alias=model_alias,
            prompt=qa_prompt,
            system_prompt=tool_system_prompt,
            output_format=QAPair,
            tool_alias=tool_alias,
            with_trace=dd.TraceType.ALL_MESSAGES,  # Enable trace to capture tool call history
            extract_reasoning_content=True,
        )
    )

    # Flatten each field of the structured qa_pair into its own column.
    for field_name in ("question", "answer", "supporting_passage", "citation"):
        config_builder.add_column(
            dd.ExpressionColumnConfig(
                name=field_name,
                expr=f"{{{{ qa_pair.{field_name} }}}}",
            )
        )

    return config_builder
361
362
def generate_preview(
    config_builder: dd.DataDesignerConfigBuilder,
    num_records: int,
    mcp_provider: dd.LocalStdioMCPProvider,
) -> PreviewResults:
    """Run a Data Designer preview with the given MCP provider attached.

    Args:
        config_builder: Column configuration to preview.
        num_records: Number of records requested from the preview.
        mcp_provider: MCP provider handed to the `DataDesigner` instance.

    Returns:
        The results object returned by `DataDesigner.preview`.
    """
    # Tracing is requested per column through `with_trace` on the LLM column
    # configs, so nothing trace-related is configured here.
    designer = DataDesigner(mcp_providers=[mcp_provider])
    preview = designer.preview(config_builder, num_records=num_records)
    return preview
372
373
374def _truncate(text: str, max_length: int = 100) -> str:
375 """Truncate text to max_length, adding ellipsis if needed."""
376 text = text.replace("\n", " ").strip()
377 if len(text) <= max_length:
378 return text
379 return text[: max_length - 3] + "..."
380
381
382def _summarize_content(content: object) -> str:
383 """Summarize ChatML-style content blocks for display."""
384 if isinstance(content, list):
385 parts: list[str] = []
386 for block in content:
387 if isinstance(block, dict):
388 block_type = block.get("type", "block")
389 if block_type == "text":
390 text = str(block.get("text", ""))
391 if text:
392 parts.append(text)
393 elif block_type == "image_url":
394 parts.append("[image]")
395 else:
396 parts.append(f"[{block_type}]")
397 else:
398 parts.append(str(block))
399 return " ".join(parts)
400 return str(content)
401
402
def _format_trace_step(msg: dict[str, object]) -> str:
    """Render one trace message as Rich-markup text.

    System, user and tool messages produce a single line. Assistant messages
    produce one line per reasoning block, tool call and content block, joined
    with newlines. Unrecognized roles fall back to a dimmed one-liner.

    Args:
        msg: Trace message; the 'role', 'content', 'reasoning_content',
            'tool_calls' and 'tool_call_id' keys are read when present.

    Returns:
        The formatted text for the message.
    """
    role = msg.get("role", "unknown")
    body = _summarize_content(msg.get("content", ""))

    if role == "system":
        return f"[bold cyan]system[/]({_truncate(str(body))})"

    if role == "user":
        return f"[bold green]user[/]({_truncate(str(body))})"

    if role == "tool":
        # Only the first eight characters of the call id are shown.
        short_id = str(msg.get("tool_call_id") or "?")[:8]
        return f"[bold red]tool_response[/]([{short_id}] {_truncate(str(body), 80)})"

    if role != "assistant":
        return f"[dim]{role}[/]({_truncate(str(body))})"

    segments: list[str] = []

    reasoning = msg.get("reasoning_content")
    if reasoning:
        segments.append(f"[bold magenta]reasoning[/]({_truncate(str(reasoning))})")

    tool_calls = msg.get("tool_calls")
    if isinstance(tool_calls, list):
        for call in tool_calls:
            # Malformed entries (non-dict call or non-dict function) are skipped.
            if not isinstance(call, dict):
                continue
            func = call.get("function", {})
            if not isinstance(func, dict):
                continue
            name = func.get("name", "?")
            args = func.get("arguments", "")
            segments.append(f"[bold yellow]tool_call[/]({name}: {_truncate(str(args), 60)})")

    if body:
        segments.append(f"[bold blue]content[/]({_truncate(str(body))})")

    if not segments:
        return "[bold blue]assistant[/](empty)"
    return "\n".join(segments)
438
439
def _display_column_trace(column_name: str, trace: list[dict[str, object]]) -> None:
    """Print the trace of one column inside a Rich panel.

    Args:
        column_name: Column name shown in the panel title.
        trace: Trace messages; entries that are not dicts are ignored.
    """
    from rich.console import Console
    from rich.panel import Panel

    bullet_lines: list[str] = []
    for step in trace:
        if isinstance(step, dict):
            # A formatted step may span several lines; each gets its own bullet.
            bullet_lines.extend(f" * {piece}" for piece in _format_trace_step(step).split("\n"))

    body = "\n".join(bullet_lines) if bullet_lines else " (no trace messages)"

    Console().print(
        Panel(
            body,
            title=f"[bold]Column Trace: {column_name}[/]",
            border_style="blue",
            padding=(0, 1),
        )
    )
463
464
def display_preview_record(preview_results: PreviewResults) -> None:
    """Display a sample record from the preview results with trace visualization.

    The first record's non-trace columns are printed as JSON, then every
    column whose name ends in "__trace" is rendered in its own panel, and
    finally `preview_results.display_sample_record()` is called.

    Args:
        preview_results: Preview output whose `dataset` holds the generated records.
    """
    from rich.console import Console

    trace_suffix = "__trace"
    console = Console()
    dataset = preview_results.dataset

    if dataset is None or dataset.empty:
        console.print("[red]No preview records generated.[/]")
        return

    record = dataset.iloc[0].to_dict()

    # Find trace columns and their base column names
    trace_columns = [col for col in dataset.columns if col.endswith(trace_suffix)]

    # Display non-trace columns as summary
    non_trace_record = {k: v for k, v in record.items() if not k.endswith(trace_suffix)}
    console.print("\n[bold]Sample Record (data columns):[/]")
    console.print(json.dumps(non_trace_record, indent=2, default=str))

    # Display each trace column in its own panel
    if trace_columns:
        console.print("\n[bold]Generation Traces:[/]")
        for trace_col in trace_columns:
            # Strip only the trailing marker; `replace` would also remove
            # "__trace" from the middle of a column name.
            base_name = trace_col.removesuffix(trace_suffix)
            trace_data = record.get(trace_col)
            if isinstance(trace_data, list):
                _display_column_trace(base_name, trace_data)

    preview_results.display_sample_record()
496
497
def serve() -> None:
    """Build the search index and run the MCP server.

    Sources are read as a JSON list from the PDF_SOURCES environment variable;
    when it is unset or decodes to an empty value, the default sample PDF is
    indexed. This is the entry point used when the script is launched with the
    `serve` subcommand.
    """
    configured_sources = json.loads(os.environ.get("PDF_SOURCES", "[]"))
    initialize_search_index(configured_sources or [DEFAULT_PDF_URL])
    mcp_server.run()
506
507
508def parse_args() -> argparse.Namespace:
509 """Parse command line arguments."""
510 parser = argparse.ArgumentParser(description="Generate document Q&A pairs using MCP tool calls with BM25S search.")
511 subparsers = parser.add_subparsers(dest="command")
512
513 # 'serve' subcommand for running the MCP server
514 subparsers.add_parser("serve", help="Run the MCP server (used by Data Designer)")
515
516 # Default command arguments (demo mode)
517 parser.add_argument("--model-alias", type=str, default="nvidia-reasoning", help="Model alias to use for generation")
518 parser.add_argument("--num-records", type=int, default=4, help="Number of Q&A pairs to generate")
519 parser.add_argument(
520 "--pdf",
521 type=str,
522 action="append",
523 dest="pdfs",
524 metavar="PATH_OR_URL",
525 help="PDF file path or URL to index (can be specified multiple times). Defaults to a sample PDF if not provided.",
526 )
527 # For compatibility with Makefile test-run-recipes target (ignored in demo mode)
528 parser.add_argument("--artifact-path", type=str, default=None, help=argparse.SUPPRESS)
529
530 return parser.parse_args()
531
532
def main() -> None:
    """Entry point: run the MCP server for `serve`, otherwise run the demo.

    In demo mode the script relaunches itself with the `serve` subcommand as a
    stdio MCP provider, builds the column configuration, generates a preview
    and displays the first record.

    Raises:
        RuntimeError: If the model alias starts with "nvidia" and
            NVIDIA_API_KEY is not set.
    """
    args = parse_args()

    if args.command == "serve":
        serve()
        return

    # Demo mode: fail early when an NVIDIA model alias has no API key to use.
    if args.model_alias.startswith("nvidia") and "NVIDIA_API_KEY" not in os.environ:
        raise RuntimeError("NVIDIA_API_KEY must be set when using NVIDIA model aliases.")

    pdf_sources = args.pdfs or [DEFAULT_PDF_URL]

    # The provider launches this same script as a subprocess over stdio and
    # hands it the PDF list through the PDF_SOURCES environment variable.
    script_path = str(Path(__file__).resolve())
    mcp_provider = dd.LocalStdioMCPProvider(
        name=MCP_SERVER_NAME,
        command=sys.executable,
        args=[script_path, "serve"],
        env={"PDF_SOURCES": json.dumps(pdf_sources)},
    )

    preview_results = generate_preview(
        config_builder=build_config(
            model_alias=args.model_alias,
            provider_name=MCP_SERVER_NAME,
        ),
        num_records=args.num_records,
        mcp_provider=mcp_provider,
    )

    display_preview_record(preview_results)
569
570
# Run the CLI only when executed as a script; the MCP provider relaunches this
# same file with the `serve` argument.
if __name__ == "__main__":
    main()