nat.authentication.credential_validator.bearer_token_validator#

Attributes#

Classes#

BearerTokenValidator

Bearer token validator supporting JWT and opaque tokens.

Module Contents#

logger#
class BearerTokenValidator(
introspection_endpoint: str | None = None,
issuer: str | None = None,
audience: str | None = None,
jwks_uri: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
scopes: list[str] | None = None,
timeout: float = 10.0,
leeway: int = 60,
discovery_url: str | None = None,
)#

Bearer token validator supporting JWT and opaque tokens.

Implements RFC 7519 (JWT) and RFC 7662 (Token Introspection) standards.

Args:

introspection_endpoint: OAuth 2.0 introspection URL (required to validate opaque tokens). issuer: Expected token issuer (iss); recommended for policy, not required for JWT signature validity. audience: Expected token audience (aud); recommended for policy, not required for JWT signature validity. jwks_uri: JWKS URL with public keys to verify asymmetric JWTs; optional if using discovery. client_id: OAuth 2.0 client ID for authenticating to the introspection endpoint. client_secret: OAuth 2.0 client secret for authenticating to the introspection endpoint. scopes: Optional authorization scopes to check after validation; not required for token validity. timeout: HTTP request timeout for discovery/JWKS/introspection (default: 10.0s). leeway: Clock-skew allowance for exp/nbf/iat checks (default: 60s). discovery_url: OIDC/OAuth metadata URL to auto-discover jwks_uri and introspection_endpoint.

introspection_endpoint = None#
issuer = None#
audience = None#
jwks_uri = None#
client_id = None#
client_secret = None#
scopes = None#
timeout = 10.0#
leeway = 60#
discovery_url = None#
_jwks_cache: dict[str, dict[str, Any]]#
_oidc_config_cache: dict[str, dict[str, Any]]#
_introspection_cache: dict[str, dict[str, Any]]#
_jwks_cache_ttl = 900#
_discovery_cache_ttl = 900#
_validate_configuration() None#

Validate that at least one token verification method is configured.

async verify(
token: str,
) nat.data_models.authentication.TokenValidationResult#

Validate bearer token per RFC 7519 (JWT) and RFC 7662 (Introspection).

Args:

token: Bearer token to validate

Returns:

TokenValidationResult

_is_jwt_token(token: str) bool#

Check if token has JWT structure.

async _verify_jwt_token(
token: str,
) nat.data_models.authentication.TokenValidationResult#

Verify JWT token.

Args:

token: JWT token to verify

Returns:

TokenValidationResult

async _verify_opaque_token(
token: str,
) nat.data_models.authentication.TokenValidationResult#

Verify opaque token via RFC 7662 introspection.

Args:

token: Opaque token to verify

Returns:

TokenValidationResult

async _resolve_jwks_uri() str#

Resolve JWKS URI using configuration priority: jwks_uri → discovery → issuer.

Returns:

JWKS URI string

async _get_oidc_configuration(discovery_url: str) dict[str, Any]#

Get OIDC configuration.

Args:

discovery_url: OIDC discovery URL

Returns:

OIDC configuration dict

async _fetch_jwks(jwks_uri: str) authlib.jose.KeySet#

Fetch JWKS from URI.

Args:

jwks_uri: JWKS endpoint URI

Returns:

KeySet for token verification

_extract_audience_from_claims(
claims: dict[str, Any],
) list[str] | None#

Extract audience from JWT claims.

Args:

claims: JWT claims dict

Returns:

List of audience values

_extract_audience_from_introspection(
response: dict[str, Any],
) list[str] | None#

Extract audience from introspection response.

Args:

response: Introspection response dict

Returns:

List of audience values

_require_https(url: str, url_description: str) None#

Enforce HTTPS requirement.

Args:

url: URL to validate url_description: Description for error messages

_check_jwt_policies(
issuer_claim: str | None,
audience_claim: list[str] | None,
token_scopes: list[str] | None,
) None#

Check JWT token against configured policies.

Args:

issuer_claim: Issuer from JWT token audience_claim: Audience list from JWT token token_scopes: Scopes from JWT token

_check_opaque_policies(
issuer_claim: str | None,
audience_claim: list[str] | None,
token_scopes: list[str] | None,
) None#

Check opaque token against configured policies.

Args:

issuer_claim: Issuer from introspection response audience_claim: Audience list from introspection response token_scopes: Scopes from introspection response

_is_expired(exp: int | None, leeway: int | None = None) bool#

Check if timestamp is expired considering leeway.

Args:

exp: Expiration timestamp leeway: Clock skew allowance

Returns:

True if expired

_is_not_yet_valid(nbf: int | None, leeway: int | None = None) bool#

Check if timestamp is not yet valid considering leeway.

Args:

nbf: Not-before timestamp leeway: Clock skew allowance

Returns:

True if not yet valid