Files
crawl4ai/deploy/docker/auth.py
unclecode 0f20f8bb83 fix(security): batch 2 - JWT secret, eval removal, execute_js, hook sandbox
Fixes for 4 vulnerabilities reported by by111/August829 (2026-04-14):

1. Hardcoded JWT secret (CVSS 9.8): Removed "mysecret" default from
   auth.py. Added weak secret validation (blocklist + min 32 chars).
   Auto-generates ephemeral key when none set.

2. eval() in /config/dump (CVSS 9.1): Replaced eval-based config
   parsing with JSON input {type, params} validated by Pydantic.
   Added authentication. Deleted _safe_eval_config and all AST
   allowlist code.

3. /execute_js endpoint (CVSS 8.1): Disabled by default via
   CRAWL4AI_EXECUTE_JS_ENABLED env var. Added SSRF blocklist on
   destination URL. Removed --disable-web-security from default
   browser args.

4. Hook sandbox escape (CVSS 9.8): Strip __builtins__, __loader__,
   __spec__ from injected module proxies. Removed type, hasattr,
   __build_class__ from allowed builtins.

Also added SECURITY-CREDITS.md tracking all reporters.
30 adversarial tests added.

DO NOT PUSH until release day.
2026-04-15 05:42:14 +00:00

104 lines
3.9 KiB
Python

import os
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from jwt import JWT, jwk_from_dict
from jwt.utils import get_int_from_datetime
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import EmailStr
from pydantic.main import BaseModel
import base64
instance = JWT()
security = HTTPBearer(auto_error=False)
ACCESS_TOKEN_EXPIRE_MINUTES = 60
_WEAK_SECRETS = {"mysecret", "secret", "password", "changeme", "test", "12345678"}
def _resolve_secret_key() -> str:
"""Resolve SECRET_KEY: validate if set, auto-generate if JWT enabled but unset."""
import logging
import secrets as _secrets
key = os.environ.get("SECRET_KEY", "")
if key:
if key.lower() in _WEAK_SECRETS:
raise RuntimeError(
"FATAL: SECRET_KEY is a known weak value. "
"Generate a strong one: python3 -c \"import secrets; print(secrets.token_hex(32))\""
)
if len(key) < 32:
raise RuntimeError(
"FATAL: SECRET_KEY must be at least 32 characters. "
"Generate one: python3 -c \"import secrets; print(secrets.token_hex(32))\""
)
return key
# No key set -- auto-generate ephemeral key
generated = _secrets.token_hex(32)
logging.getLogger("crawl4ai.security").warning(
"No SECRET_KEY set. Auto-generated ephemeral key (changes on restart). "
"Set SECRET_KEY env var for production."
)
return generated
SECRET_KEY = _resolve_secret_key()
def get_jwk_from_secret(secret: str):
"""Convert a secret string into a JWK object."""
secret_bytes = secret.encode('utf-8')
b64_secret = base64.urlsafe_b64encode(secret_bytes).rstrip(b'=').decode('utf-8')
return jwk_from_dict({"kty": "oct", "k": b64_secret})
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token with an expiration."""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": get_int_from_datetime(expire)})
signing_key = get_jwk_from_secret(SECRET_KEY)
return instance.encode(to_encode, signing_key, alg='HS256')
def verify_token(credentials: HTTPAuthorizationCredentials) -> Dict:
"""Verify the JWT token from the Authorization header."""
if not credentials or not credentials.credentials:
raise HTTPException(
status_code=401,
detail="No token provided",
headers={"WWW-Authenticate": "Bearer"}
)
token = credentials.credentials
verifying_key = get_jwk_from_secret(SECRET_KEY)
try:
payload = instance.decode(token, verifying_key, do_time_check=True, algorithms='HS256')
return payload
except Exception as e:
raise HTTPException(
status_code=401,
detail=f"Invalid or expired token: {str(e)}",
headers={"WWW-Authenticate": "Bearer"}
)
def get_token_dependency(config: Dict):
"""Return the token dependency if JWT is enabled, else a function that returns None."""
if config.get("security", {}).get("jwt_enabled", False):
def jwt_required(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
"""Enforce JWT authentication when enabled."""
if credentials is None:
raise HTTPException(
status_code=401,
detail="Authentication required. Please provide a valid Bearer token.",
headers={"WWW-Authenticate": "Bearer"}
)
return verify_token(credentials)
return jwt_required
else:
return lambda: None
class TokenRequest(BaseModel):
email: EmailStr
api_token: Optional[str] = None