Files
crawl4ai/deploy/docker/tests/test_security_2026_04_b2.py
unclecode 0f20f8bb83 fix(security): batch 2 - JWT secret, eval removal, execute_js, hook sandbox
Fixes for 4 vulnerabilities reported by by111/August829 (2026-04-14):

1. Hardcoded JWT secret (CVSS 9.8): Removed "mysecret" default from
   auth.py. Added weak secret validation (blocklist + min 32 chars).
   Auto-generates ephemeral key when none set.

2. eval() in /config/dump (CVSS 9.1): Replaced eval-based config
   parsing with JSON input {type, params} validated by Pydantic.
   Added authentication. Deleted _safe_eval_config and all AST
   allowlist code.

3. /execute_js endpoint (CVSS 8.1): Disabled by default via
   CRAWL4AI_EXECUTE_JS_ENABLED env var. Added SSRF blocklist on
   destination URL. Removed --disable-web-security from default
   browser args.

4. Hook sandbox escape (CVSS 9.8): Strip __builtins__, __loader__,
   __spec__ from injected module proxies. Removed type, hasattr,
   __build_class__ from allowed builtins.

Also added SECURITY-CREDITS.md tracking all reporters.
30 adversarial tests added.

DO NOT PUSH until release day.
2026-04-15 05:42:14 +00:00

359 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Adversarial security tests for Batch 2 vulns reported 2026-04-14 (by111/August829).
Self-contained tests that verify fixes at the code/source level.
B2-V1: /execute_js disabled by default + SSRF block
B2-V2: Hardcoded JWT secret removed
B2-V3: eval() in /config/dump replaced with JSON
B2-V4: Hook sandbox __builtins__ escape fixed
"""
import os
import sys
import ast
import unittest
import builtins
import types
# Directory holding the files these tests inspect (auth.py, server.py, ...):
# the parent of the directory containing this test module. The path is left
# un-normalized, i.e. it ends in the parent-directory component.
DEPLOY_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    os.pardir,
)
# ============================================================================
# B2-V2: Hardcoded JWT Secret
# ============================================================================
class TestJWTSecretHardened(unittest.TestCase):
    """Verify the hardcoded 'mysecret' default is gone from auth.py.

    These are source-level checks: every test reads ``auth.py`` from
    ``DEPLOY_DIR`` as text and asserts on its contents. Nothing from
    ``auth.py`` is imported or executed.
    """

    @staticmethod
    def _read_auth_source():
        """Return the full text of ``auth.py``.

        The file is decoded explicitly as UTF-8 so the result does not
        depend on the platform's locale encoding.
        """
        with open(os.path.join(DEPLOY_DIR, "auth.py"), encoding="utf-8") as f:
            return f.read()

    def test_no_mysecret_as_default(self):
        """auth.py must not use 'mysecret' as a fallback default for SECRET_KEY."""
        import re  # local import keeps this class independent of file-level imports

        source = self._read_auth_source()
        # The old dangerous pattern: os.environ.get("SECRET_KEY", "mysecret").
        # Match it regardless of quote style or spacing, and also when it is
        # spelled with os.getenv(...).
        quote = "[\"']"
        pattern = re.compile(
            r"\bget(?:env)?\(\s*"
            + quote + "SECRET_KEY" + quote
            + r"\s*,\s*"
            + quote + "mysecret" + quote
        )
        self.assertIsNone(pattern.search(source),
                          "auth.py must not use 'mysecret' as env var default")

    def test_weak_secret_validation_exists(self):
        """auth.py must validate against known weak secrets."""
        source = self._read_auth_source()
        self.assertIn("_WEAK_SECRETS", source,
                      "auth.py must have weak secrets blocklist")
        # Textual marker of the minimum-length comparison.
        self.assertIn("< 32", source,
                      "auth.py must enforce minimum key length")

    def test_mysecret_in_weak_list(self):
        """'mysecret' must be in the weak secrets blocklist."""
        source = self._read_auth_source()
        # Substring check only: this verifies that 'mysecret' still appears
        # somewhere in auth.py, not specifically inside _WEAK_SECRETS.
        self.assertIn("mysecret", source,
                      "'mysecret' must be listed in _WEAK_SECRETS blocklist")

    def test_auto_generation_exists(self):
        """auth.py must auto-generate key when none is set."""
        source = self._read_auth_source()
        self.assertIn("token_hex", source,
                      "auth.py must use secrets.token_hex for auto-generation")
# ============================================================================
# B2-V3: eval() removed from /config/dump
# ============================================================================
class TestConfigDumpNoEval(unittest.TestCase):
    """Verify eval() is completely removed from the /config/dump path.

    All checks are textual inspections of ``server.py`` under
    ``DEPLOY_DIR``; the server module itself is never imported.
    """

    @staticmethod
    def _read_server_source():
        """Return the full text of ``server.py``.

        Decoded explicitly as UTF-8 so the result does not depend on the
        platform's locale encoding.
        """
        with open(os.path.join(DEPLOY_DIR, "server.py"), encoding="utf-8") as f:
            return f.read()

    def test_no_safe_eval_config(self):
        """_safe_eval_config function must be removed from server.py."""
        source = self._read_server_source()
        self.assertNotIn("def _safe_eval_config", source,
                         "_safe_eval_config must be deleted (replaced with JSON input)")

    def test_config_from_json_exists(self):
        """_config_from_json function must exist."""
        source = self._read_server_source()
        self.assertIn("def _config_from_json", source,
                      "_config_from_json must replace _safe_eval_config")

    def test_config_dump_has_auth(self):
        """config_dump endpoint must require authentication."""
        source = self._read_server_source()
        # Locate the first mention of config_dump; report a normal test
        # failure (not a ValueError) when it is missing.
        idx = source.find("config_dump")
        if idx == -1:
            self.fail("Could not find config_dump in server.py")
        # Inspect 200 characters on either side so both the decorator and the
        # function signature fall inside the window.
        nearby = source[max(0, idx - 200):idx + 200]
        self.assertIn("token_dep", nearby,
                      "/config/dump must require token_dep authentication")

    def test_no_eval_in_config_path(self):
        """The allowlist constants of the old eval-based parser must be gone.

        This checks only for the two constant names; it does not scan
        server.py for eval() calls.
        """
        source = self._read_server_source()
        for constant in ("_SAFE_CONFIG_ALLOWED_NAMES", "_SAFE_CONFIG_ALLOWED_ATTRS"):
            self.assertNotIn(constant, source,
                             "Old eval allowlist constants should be removed")
# ============================================================================
# B2-V1: /execute_js disabled by default
# ============================================================================
class TestExecuteJsDisabled(unittest.TestCase):
    """Verify /execute_js is disabled by default with proper guards.

    All checks are textual or AST inspections of files under
    ``DEPLOY_DIR`` (server.py, utils.py, config.yml); none are imported.
    """

    @staticmethod
    def _read_deploy_file(filename):
        """Return the text of *filename* located in ``DEPLOY_DIR``.

        Decoded explicitly as UTF-8 so the result does not depend on the
        platform's locale encoding.
        """
        with open(os.path.join(DEPLOY_DIR, filename), encoding="utf-8") as f:
            return f.read()

    def _execute_js_window(self):
        """Return up to 3000 characters of server.py from ``async def execute_js``.

        Fails the calling test with a readable message when the function
        definition cannot be found.
        """
        source = self._read_deploy_file("server.py")
        start = source.find("async def execute_js")
        if start == -1:
            self.fail("Could not find 'async def execute_js' in server.py")
        return source[start:start + 3000]

    def test_execute_js_flag_exists(self):
        """EXECUTE_JS_ENABLED flag must exist in server.py."""
        self.assertIn("EXECUTE_JS_ENABLED", self._read_deploy_file("server.py"))

    def test_execute_js_disabled_by_default(self):
        """EXECUTE_JS_ENABLED must default to false."""
        source = self._read_deploy_file("server.py")
        # Only the first line that mentions both the flag and os.environ is
        # examined; it must contain the literal "false" default.
        for line in source.splitlines():
            if "EXECUTE_JS_ENABLED" in line and "os.environ" in line:
                self.assertIn('"false"', line,
                              "EXECUTE_JS_ENABLED must default to 'false'")
                return
        self.fail("Could not find EXECUTE_JS_ENABLED env var line")

    def test_execute_js_checks_flag(self):
        """execute_js endpoint must check EXECUTE_JS_ENABLED."""
        self.assertIn("EXECUTE_JS_ENABLED", self._execute_js_window(),
                      "execute_js must check EXECUTE_JS_ENABLED flag")

    def test_execute_js_has_ssrf_check(self):
        """execute_js must validate URL against SSRF blocklist."""
        self.assertIn("validate_webhook_url", self._execute_js_window(),
                      "execute_js must validate URL against SSRF blocklist")

    def test_disable_web_security_removed_from_defaults(self):
        """--disable-web-security must not be in default browser args."""
        tree = ast.parse(self._read_deploy_file("utils.py"))
        # Any string constant equal to the flag, anywhere in utils.py, fails.
        for node in ast.walk(tree):
            if isinstance(node, ast.Constant) and node.value == "--disable-web-security":
                self.fail("--disable-web-security must not be in DEFAULT_CONFIG extra_args")

    def test_disable_web_security_removed_from_config_yml(self):
        """--disable-web-security must not be active in config.yml."""
        for line in self._read_deploy_file("config.yml").splitlines():
            stripped = line.strip()
            if not stripped.startswith("-"):
                # Not a list entry; this also skips '#'-commented lines.
                continue
            # Accept double-quoted, single-quoted and unquoted spellings of
            # the list entry.
            entry = stripped[1:].strip().strip("\"'")
            if entry == "--disable-web-security":
                self.fail("--disable-web-security must not be an active entry in config.yml")
# ============================================================================
# B2-V4: Hook Sandbox __builtins__ Escape
# ============================================================================
class TestHookSandboxBugreport(unittest.TestCase):
    """Test the specific __builtins__ escape vector reported by by111.

    The sandbox namespace is rebuilt locally in this class (restricted
    builtins plus proxied asyncio/json/re modules); it is intended to
    mirror what hook_manager builds, but hook_manager is not imported here.
    """

    @classmethod
    def setUpClass(cls):
        """Collect the allow-listed builtins into ``cls.safe_builtins``."""
        allowed_builtins = [
            'print', 'len', 'str', 'int', 'float', 'bool',
            'list', 'dict', 'set', 'tuple', 'range', 'enumerate',
            'zip', 'map', 'filter', 'any', 'all', 'sum', 'min', 'max',
            'sorted', 'reversed', 'abs', 'round', 'isinstance',
            'callable', 'iter', 'next',
        ]
        cls.safe_builtins = {
            name: getattr(builtins, name)
            for name in allowed_builtins
            if hasattr(builtins, name)
        }

    def _make_namespace(self):
        """Return a fresh globals dict for exec'ing hook code.

        The dict contains ``__name__``, a copy of the restricted builtins,
        proxy modules for asyncio/json/re, and the typing aliases Dict,
        List and Optional.
        """
        import asyncio as _asyncio_mod
        import json as _json_mod
        import re as _re_mod
        from typing import Dict, List, Optional

        def _safe_module(mod, exclude_attrs=None):
            """Return a new module object carrying *mod*'s attributes.

            ``__builtins__``, ``__loader__``, ``__spec__`` and any names in
            *exclude_attrs* are not copied.
            """
            # NOTE(review): attributes that are themselves modules (for
            # example a package's submodules) are copied by reference, not
            # proxied - confirm hook_manager does not rely on that.
            proxy = types.ModuleType(mod.__name__)
            skip = {"__builtins__", "__loader__", "__spec__"}
            if exclude_attrs:
                skip.update(exclude_attrs)
            for attr in dir(mod):
                if attr in skip:
                    continue
                try:
                    setattr(proxy, attr, getattr(mod, attr))
                except (AttributeError, TypeError):
                    # Best effort: attributes that cannot be read or set are
                    # simply left off the proxy.
                    pass
            return proxy

        namespace = {
            '__name__': 'test_hook',
            '__builtins__': dict(self.safe_builtins),
        }
        namespace["asyncio"] = _safe_module(_asyncio_mod, {
            "subprocess", "create_subprocess_exec", "create_subprocess_shell"
        })
        namespace["json"] = _safe_module(_json_mod)
        namespace["re"] = _safe_module(_re_mod)
        namespace["Dict"] = Dict
        namespace["List"] = List
        namespace["Optional"] = Optional
        return namespace

    # -- The exact attack from by111's report --

    def test_asyncio_builtins_import_blocked(self):
        """asyncio.__builtins__['__import__'] must not be accessible."""
        ns = self._make_namespace()
        self.assertFalse(hasattr(ns["asyncio"], "__builtins__"),
                         "asyncio proxy must not have __builtins__")

    def test_json_builtins_import_blocked(self):
        """json.__builtins__['__import__'] must not be accessible."""
        ns = self._make_namespace()
        self.assertFalse(hasattr(ns["json"], "__builtins__"),
                         "json proxy must not have __builtins__")

    def test_re_builtins_import_blocked(self):
        """re.__builtins__['__import__'] must not be accessible."""
        ns = self._make_namespace()
        self.assertFalse(hasattr(ns["re"], "__builtins__"),
                         "re proxy must not have __builtins__")

    def test_module_loader_not_copied(self):
        """Real module's __loader__ must not be copied to proxy."""
        import asyncio as real_asyncio
        ns = self._make_namespace()
        # Proxy may have a default __loader__ from types.ModuleType,
        # but it must NOT be the real module's loader
        proxy_loader = getattr(ns["asyncio"], "__loader__", None)
        real_loader = getattr(real_asyncio, "__loader__", None)
        if proxy_loader is not None and real_loader is not None:
            self.assertIsNot(proxy_loader, real_loader,
                             "Proxy must not have the real module's __loader__")

    def test_module_spec_not_copied(self):
        """Real module's __spec__ must not be copied to proxy."""
        import asyncio as real_asyncio
        ns = self._make_namespace()
        proxy_spec = getattr(ns["asyncio"], "__spec__", None)
        real_spec = getattr(real_asyncio, "__spec__", None)
        if proxy_spec is not None and real_spec is not None:
            self.assertIsNot(proxy_spec, real_spec,
                             "Proxy must not have the real module's __spec__")

    def test_by111_exploit_via_asyncio(self):
        """Exact exploit from by111: asyncio.__builtins__['__import__']('os')."""
        import asyncio

        ns = self._make_namespace()
        # Built from explicit lines so the hook body's indentation cannot be
        # disturbed by the surrounding code's formatting.
        code = (
            "async def hook(page, **kw):\n"
            "    real_import = asyncio.__builtins__['__import__']\n"
            "    os = real_import('os')\n"
            "    return os.system('id')\n"
        )
        # Defining the hook must succeed; only *running* it may raise, so the
        # exec stays outside the assertRaises block.
        exec(code, ns)
        with self.assertRaises((AttributeError, KeyError, TypeError)):
            # asyncio.run creates and closes its own loop; get_event_loop()
            # is deprecated when no loop is current and raises RuntimeError
            # on Python 3.14.
            asyncio.run(ns['hook'](None))

    def test_getattr_not_in_builtins(self):
        """getattr must not be available (enables attribute-based escape)."""
        ns = self._make_namespace()
        self.assertNotIn('getattr', ns['__builtins__'])

    def test_type_not_in_builtins(self):
        """type must not be available (enables __subclasses__ MRO chain)."""
        ns = self._make_namespace()
        self.assertNotIn('type', ns['__builtins__'])

    def test_build_class_not_in_builtins(self):
        """__build_class__ must not be available."""
        ns = self._make_namespace()
        self.assertNotIn('__build_class__', ns['__builtins__'])

    def test_hasattr_not_in_builtins(self):
        """hasattr must not be available (information disclosure)."""
        ns = self._make_namespace()
        self.assertNotIn('hasattr', ns['__builtins__'])

    # -- asyncio still works for legitimate hooks --

    def test_asyncio_sleep_works(self):
        """The asyncio proxy still exposes ``sleep``."""
        ns = self._make_namespace()
        self.assertTrue(hasattr(ns["asyncio"], "sleep"))

    def test_asyncio_gather_works(self):
        """The asyncio proxy still exposes ``gather``."""
        ns = self._make_namespace()
        self.assertTrue(hasattr(ns["asyncio"], "gather"))

    def test_json_loads_works(self):
        """The json proxy still exposes ``loads``."""
        ns = self._make_namespace()
        self.assertTrue(hasattr(ns["json"], "loads"))

    def test_re_compile_works(self):
        """The re proxy still exposes ``compile``."""
        ns = self._make_namespace()
        self.assertTrue(hasattr(ns["re"], "compile"))
# ============================================================================
# Source-level verification for hook_manager.py
# ============================================================================
class TestHookManagerSourceClean(unittest.TestCase):
    """Verify hook_manager.py source has all dangerous builtins removed.

    ``hook_manager.py`` is read from ``DEPLOY_DIR`` and inspected as text
    and AST; it is never imported.
    """

    @staticmethod
    def _read_hook_manager_source():
        """Return the full text of ``hook_manager.py``.

        Decoded explicitly as UTF-8 so the result does not depend on the
        platform's locale encoding.
        """
        with open(os.path.join(DEPLOY_DIR, "hook_manager.py"), encoding="utf-8") as f:
            return f.read()

    def test_getattr_removed(self):
        """The ``allowed_builtins`` literal must not list escape-enabling builtins.

        Only the first matching assignment found while walking the AST is
        checked. List, tuple and set literals are all recognised.
        """
        banned = ("getattr", "setattr", "hasattr", "type", "__build_class__")
        tree = ast.parse(self._read_hook_manager_source())
        for node in ast.walk(tree):
            if not isinstance(node, ast.Assign):
                continue
            assigns_allowlist = any(
                isinstance(target, ast.Name) and target.id == "allowed_builtins"
                for target in node.targets
            )
            if not assigns_allowlist:
                continue
            if not isinstance(node.value, (ast.List, ast.Tuple, ast.Set)):
                continue
            # Non-constant elements are ignored; only literal entries count.
            vals = [e.value for e in node.value.elts if isinstance(e, ast.Constant)]
            for name in banned:
                self.assertNotIn(name, vals)
            return
        self.fail("Could not find allowed_builtins in hook_manager.py")

    def test_safe_module_strips_builtins(self):
        """_safe_module function must skip __builtins__."""
        source = self._read_hook_manager_source()
        # Textual presence check only: each dunder name must appear somewhere
        # in hook_manager.py.
        for dunder in ("__builtins__", "__loader__", "__spec__"):
            self.assertIn(dunder, source)
if __name__ == "__main__":
    # Script entry point: print a banner followed by a blank line, then hand
    # control to unittest's command-line runner in verbose mode.
    print(
        "=" * 70,
        "Crawl4AI Security Tests - Batch 2 (2026-04-14)",
        "=" * 70,
        "",
        sep="\n",
    )
    unittest.main(verbosity=2)