Files
crawl4ai/deploy/docker/auth.py
unclecode 7c0cc3ed88 fix: batch merge of community PRs (#1622, #1786, #1796, #1795, #1798, #1734, #1290, #1668)
Bug fixes:
- Verify redirect targets are alive before returning from URL seeder (#1622)
- Wire mean_delay/max_range from CrawlerRunConfig into dispatcher rate limiter (#1786)
- Use DOMParser instead of innerHTML in process_iframes to prevent XSS (#1796)

Security/Docker:
- Require api_token for /token endpoint when configured (#1795)
- Deep-crawl streaming now mirrors Python library behavior via arun() (#1798)

CI:
- Bump GitHub Actions to latest versions - checkout v6, setup-python v6,
  build-push-action v6, setup-buildx v4, login v4 (#1734)

Features:
- Support type-list pipeline in JsonCssExtractionStrategy for chained
  extraction like ["attribute", "regex"] (#1290)
- Add --json-ensure-ascii CLI flag and JSON_ENSURE_ASCII config setting
  for Unicode preservation in JSON output (#1668)
2026-03-07 08:45:11 +00:00

74 lines
2.8 KiB
Python

import os
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from jwt import JWT, jwk_from_dict
from jwt.utils import get_int_from_datetime
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import EmailStr
from pydantic.main import BaseModel
import base64
instance = JWT()
security = HTTPBearer(auto_error=False)
SECRET_KEY = os.environ.get("SECRET_KEY", "mysecret")
ACCESS_TOKEN_EXPIRE_MINUTES = 60
def get_jwk_from_secret(secret: str):
"""Convert a secret string into a JWK object."""
secret_bytes = secret.encode('utf-8')
b64_secret = base64.urlsafe_b64encode(secret_bytes).rstrip(b'=').decode('utf-8')
return jwk_from_dict({"kty": "oct", "k": b64_secret})
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token with an expiration."""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": get_int_from_datetime(expire)})
signing_key = get_jwk_from_secret(SECRET_KEY)
return instance.encode(to_encode, signing_key, alg='HS256')
def verify_token(credentials: HTTPAuthorizationCredentials) -> Dict:
"""Verify the JWT token from the Authorization header."""
if not credentials or not credentials.credentials:
raise HTTPException(
status_code=401,
detail="No token provided",
headers={"WWW-Authenticate": "Bearer"}
)
token = credentials.credentials
verifying_key = get_jwk_from_secret(SECRET_KEY)
try:
payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256')
return payload
except Exception as e:
raise HTTPException(
status_code=401,
detail=f"Invalid or expired token: {str(e)}",
headers={"WWW-Authenticate": "Bearer"}
)
def get_token_dependency(config: Dict):
"""Return the token dependency if JWT is enabled, else a function that returns None."""
if config.get("security", {}).get("jwt_enabled", False):
def jwt_required(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
"""Enforce JWT authentication when enabled."""
if credentials is None:
raise HTTPException(
status_code=401,
detail="Authentication required. Please provide a valid Bearer token.",
headers={"WWW-Authenticate": "Bearer"}
)
return verify_token(credentials)
return jwt_required
else:
return lambda: None
class TokenRequest(BaseModel):
email: EmailStr
api_token: Optional[str] = None