Source code for nv_ingest_client.util.zipkin
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import json
import logging
import os
from typing import Dict
from typing import List
import httpx
logger = logging.getLogger(__name__)
class AsyncZipkinClient:
    """Asynchronous client that fetches trace data from a Zipkin server with bounded concurrency and 404 retries."""

    def __init__(self, host: str, port: int, concurrent_requests: int, max_retries: int = 10, retry_delay: int = 5):
if host.startswith("http"):
self._host = host
else:
logger.debug("Defaulting to http:// for Zipkin host protocol")
self._host = "http://" + host
self._port = port
self._concurrent_requests = concurrent_requests
self._max_retries = max_retries
self._retry_delay = retry_delay
async def fetch(self, sem, trace_id: str, url: str) -> Dict[str, str]:
"""
Perform a GET request to the given URL with retry logic for 404 status codes.
:param url: URL to make the GET request to.
:param sem: Semaphore to ensure only X concurrent requests are in flight
:param trace_id: The trace_id for the request
:param url: The complete URL for the request.
:return: Dict[str, str] Containing trace_id and JSON str response
:raises: RuntimeError if the maximum retries are exceeded.
"""
        timeout = httpx.Timeout(10.0)
        attempt = 0
        while attempt < self._max_retries:
            async with sem:
                async with httpx.AsyncClient() as client:
                    response = await client.get(url, timeout=timeout)
if response.status_code == 404:
attempt += 1
logger.info(
f"Attempt {attempt}/{self._max_retries} for trace_id: {trace_id} failed with 404. "
f"Retrying in {self._retry_delay} seconds..."
)
await asyncio.sleep(self._retry_delay)
else:
return {"trace_id": trace_id, "json": response.text}
raise RuntimeError(f"Max retries exceeded for URL: {url}")
    async def get_metrics(self, trace_ids: List[str]) -> List[Dict[str, str]]:
        """Fetch trace data for each trace ID, keeping at most `concurrent_requests` requests in flight."""
        urls = []
        for trace_id in trace_ids:
            logger.debug(f"Trace-ID in URL: {trace_id}")
            urls.append((trace_id, f"{self._host}:{self._port}/api/v2/trace/{trace_id}"))
        sem = asyncio.Semaphore(self._concurrent_requests)
        tasks = [self.fetch(sem, trace_id, url) for trace_id, url in urls]
responses = await asyncio.gather(*tasks)
return responses
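# A minimal usage sketch for AsyncZipkinClient on its own; the host, port, and trace ID
# are illustrative placeholders (Zipkin's default port 9411 is assumed), and the helper
# name below is hypothetical.
def _example_fetch_traces() -> None:
    client = AsyncZipkinClient(host="localhost", port=9411, concurrent_requests=4)
    responses = asyncio.run(client.get_metrics(trace_ids=["abc123def4567890"]))
    for response in responses:
        print(response["trace_id"], len(response["json"]))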
def collect_traces_from_zipkin(
    zipkin_host: str, zipkin_port: int, trace_id_map: Dict[str, str], concurrent_requests: int = 1
) -> List[Dict[str, str]]:
    """Collect trace data from Zipkin for every trace ID in trace_id_map."""
    zipkin_client = AsyncZipkinClient(zipkin_host, zipkin_port, concurrent_requests)
    # Take the dictionary of filenames -> trace IDs and build a flat list of trace IDs to send to Zipkin.
    # Zipkin expects trace IDs without dashes, so strip them here.
    trace_ids = [trace_id.replace("-", "") for trace_id in trace_id_map.values()]
traces = asyncio.run(zipkin_client.get_metrics(trace_ids=trace_ids))
return traces
def write_results_to_output_directory(
    output_directory: str,
    trace_responses: List[Dict[str, str]],
) -> None:
    """Write each trace's JSON payload to its own file, plus a combined file, under output_directory."""
    logger.info(f"Writing {len(trace_responses)} trace responses to output_directory: {output_directory}")
# Check if the output directory exists; if not, create it
if not os.path.exists(output_directory):
os.makedirs(output_directory)
logger.info(f"Created directory: {output_directory}")
# Define the subdirectory path
zipkin_profiles_directory = os.path.join(output_directory, "zipkin_profiles")
# Ensure the subdirectory exists
if not os.path.exists(zipkin_profiles_directory):
os.makedirs(zipkin_profiles_directory)
logger.debug(f"Created subdirectory: {zipkin_profiles_directory}")
    # For each trace, create an output file with its profile data
    for trace in trace_responses:
        with open(os.path.join(zipkin_profiles_directory, f"{trace['trace_id']}.json"), "w") as trace_file:
            # trace["json"] is already a JSON string (the raw response body), so write it
            # verbatim; re-encoding with json.dumps would double-escape it.
            trace_file.write(trace["json"])
    # Write all of the combined profile data to a single file. Each entry's "json" field
    # is embedded as a raw string, so consumers must decode it separately with json.loads.
    with open(os.path.join(zipkin_profiles_directory, "combined.json"), "w") as combined_file:
        combined_file.write(json.dumps(trace_responses))
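# A minimal end-to-end sketch tying the helpers together; the Zipkin host/port, filename,
# trace ID, and output directory are illustrative placeholders (Zipkin's default port
# 9411 is assumed), not values this module defines.
if __name__ == "__main__":
    example_trace_id_map = {"document.pdf": "abc123de-f456-7890-abcd-ef1234567890"}
    collected_traces = collect_traces_from_zipkin(
        zipkin_host="localhost",
        zipkin_port=9411,
        trace_id_map=example_trace_id_map,
        concurrent_requests=2,
    )
    write_results_to_output_directory("./zipkin_output", collected_traces)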