mirror of
https://github.com/unclecode/crawl4ai.git
synced 2026-06-10 15:58:15 +00:00
Fixes for 4 vulnerabilities reported by by111/August829 (2026-04-14):
1. Hardcoded JWT secret (CVSS 9.8): Removed "mysecret" default from
auth.py. Added weak secret validation (blocklist + min 32 chars).
Auto-generates ephemeral key when none set.
2. eval() in /config/dump (CVSS 9.1): Replaced eval-based config
parsing with JSON input {type, params} validated by Pydantic.
Added authentication. Deleted _safe_eval_config and all AST
allowlist code.
3. /execute_js endpoint (CVSS 8.1): Disabled by default via
CRAWL4AI_EXECUTE_JS_ENABLED env var. Added SSRF blocklist on
destination URL. Removed --disable-web-security from default
browser args.
4. Hook sandbox escape (CVSS 9.8): Strip __builtins__, __loader__,
__spec__ from injected module proxies. Removed type, hasattr,
__build_class__ from allowed builtins.
Also added SECURITY-CREDITS.md tracking all reporters.
30 adversarial tests added.
DO NOT PUSH until release day.
104 lines
3.9 KiB
Python
104 lines
3.9 KiB
Python
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Dict, Optional
|
|
from jwt import JWT, jwk_from_dict
|
|
from jwt.utils import get_int_from_datetime
|
|
from fastapi import Depends, HTTPException
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import EmailStr
|
|
from pydantic.main import BaseModel
|
|
import base64
|
|
|
|
# Shared JWT codec object used by this module to encode and decode tokens.
instance = JWT()
# Bearer-credential extractor used as a FastAPI dependency. auto_error=False is
# paired with the explicit "credentials is None" checks further down, which
# raise this module's own 401 responses.
security = HTTPBearer(auto_error=False)
# Default token lifetime in minutes, applied when the caller gives no expiry.
ACCESS_TOKEN_EXPIRE_MINUTES = 60

# Secrets that are refused outright when supplied through SECRET_KEY
# (the lookup lower-cases the configured value before comparing).
_WEAK_SECRETS = {"mysecret", "secret", "password", "changeme", "test", "12345678"}
|
|
|
|
|
|
def _resolve_secret_key() -> str:
    """Resolve the JWT signing secret from the ``SECRET_KEY`` environment variable.

    When ``SECRET_KEY`` is set, it is validated and returned exactly as
    configured. When it is unset or empty, a random ephemeral key is generated
    and a warning is logged (the key changes on every process start).

    Validation is applied to the value with surrounding whitespace removed, so
    padding a short or blocklisted secret with spaces or newlines can neither
    satisfy the length requirement nor slip past the blocklist.

    Returns:
        The secret used to sign and verify tokens.

    Raises:
        RuntimeError: If the configured key is listed in ``_WEAK_SECRETS`` or
            has fewer than 32 characters once surrounding whitespace is ignored.
    """
    import logging
    import secrets as _secrets

    key = os.environ.get("SECRET_KEY", "")
    if key:
        # Judge only the meaningful part: "mysecret" followed by 24 spaces is
        # 32 characters long but is still the blocklisted secret.
        candidate = key.strip()
        if candidate.lower() in _WEAK_SECRETS:
            raise RuntimeError(
                "FATAL: SECRET_KEY is a known weak value. "
                "Generate a strong one: python3 -c \"import secrets; print(secrets.token_hex(32))\""
            )
        if len(candidate) < 32:
            raise RuntimeError(
                "FATAL: SECRET_KEY must be at least 32 characters. "
                "Generate one: python3 -c \"import secrets; print(secrets.token_hex(32))\""
            )
        # Return the value as configured (not the stripped copy) so tokens
        # issued by an existing deployment with a valid key keep verifying.
        return key

    # No key set -- auto-generate ephemeral key
    generated = _secrets.token_hex(32)
    logging.getLogger("crawl4ai.security").warning(
        "No SECRET_KEY set. Auto-generated ephemeral key (changes on restart). "
        "Set SECRET_KEY env var for production."
    )
    return generated
|
|
|
|
|
|
# Resolved once, when this module is imported: a weak or too-short SECRET_KEY
# raises RuntimeError here, and an unset one yields a freshly generated key.
SECRET_KEY = _resolve_secret_key()
|
|
|
|
def get_jwk_from_secret(secret: str):
    """Build a symmetric ("oct") JWK object from a plain-text secret.

    The secret's UTF-8 bytes are base64url-encoded, the trailing ``=`` padding
    is removed, and the result is passed to ``jwk_from_dict`` as the ``k``
    member of the key description.
    """
    encoded = base64.urlsafe_b64encode(secret.encode('utf-8')).decode('utf-8')
    jwk_fields = {"kty": "oct", "k": encoded.rstrip('=')}
    return jwk_from_dict(jwk_fields)
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed HS256 JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token, measured from the current UTC
            time. ``None`` selects the default of
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token string.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so an `or`
    # fallback would silently turn a requested zero lifetime into the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = get_int_from_datetime(expire)

    signing_key = get_jwk_from_secret(SECRET_KEY)
    return instance.encode(to_encode, signing_key, alg='HS256')
|
|
|
|
def verify_token(credentials: HTTPAuthorizationCredentials) -> Dict:
    """Verify the bearer token carried by ``credentials`` and return its payload.

    Args:
        credentials: Parsed Authorization header; may be ``None`` or carry an
            empty token, both of which are rejected.

    Returns:
        The decoded token payload.

    Raises:
        HTTPException: 401 when no token is supplied, or when decoding fails
            for any reason (bad signature, malformed token, expired, ...).
    """
    if not credentials or not credentials.credentials:
        raise HTTPException(
            status_code=401,
            detail="No token provided",
            headers={"WWW-Authenticate": "Bearer"}
        )

    verifying_key = get_jwk_from_secret(SECRET_KEY)
    try:
        # Pass the permitted algorithms as a set of names. With a bare string,
        # any `in` test against it is a substring match rather than an exact
        # name comparison.
        return instance.decode(
            credentials.credentials,
            verifying_key,
            do_time_check=True,
            algorithms={'HS256'},
        )
    except Exception as e:
        # Boundary handler: every decode failure becomes a 401 response. The
        # original error is chained so it stays visible in server tracebacks.
        raise HTTPException(
            status_code=401,
            detail=f"Invalid or expired token: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"}
        ) from e
|
|
|
|
|
|
def get_token_dependency(config: Dict):
    """Return the callable to use as the authentication dependency.

    Args:
        config: Application configuration. JWT enforcement is switched on by
            a truthy ``config["security"]["jwt_enabled"]``. A missing or empty
            ``security`` section counts as disabled.

    Returns:
        When JWT is enabled, a function that requires bearer credentials and
        returns the verified token payload (raising a 401 ``HTTPException``
        otherwise). When disabled, a no-argument callable returning ``None``.
    """
    # The section may exist with no content (e.g. a bare `security:` key parsed
    # as None); treat that like an absent section instead of failing on `.get`.
    security_cfg = config.get("security") or {}
    if not security_cfg.get("jwt_enabled", False):
        return lambda: None

    def jwt_required(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict:
        """Enforce JWT authentication when enabled."""
        if credentials is None:
            raise HTTPException(
                status_code=401,
                detail="Authentication required. Please provide a valid Bearer token.",
                headers={"WWW-Authenticate": "Bearer"}
            )
        return verify_token(credentials)

    return jwt_required
|
|
|
|
|
|
# Pydantic model with the fields a token request carries.
# NOTE(review): presumably the body of the token-issuing endpoint -- confirm
# against the route that consumes it.
class TokenRequest(BaseModel):
    # Required; validated as an e-mail address by the EmailStr type.
    email: EmailStr
    # Optional; None when the caller omits it.
    api_token: Optional[str] = None