diff --git a/crawl4ai/extraction_strategy.py b/crawl4ai/extraction_strategy.py index 2c736258..494ddc07 100644 --- a/crawl4ai/extraction_strategy.py +++ b/crawl4ai/extraction_strategy.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -import ast import inspect from typing import Any, List, Dict, Optional, Tuple, Pattern, Union from concurrent.futures import ThreadPoolExecutor, as_completed @@ -1040,67 +1039,6 @@ class LLMExtractionStrategy(ExtractionStrategy): # New extraction strategies for JSON-based extraction # ####################################################### -# Safe builtins allowed in computed field expressions -_SAFE_EVAL_BUILTINS = { - "str": str, "int": int, "float": float, "bool": bool, - "len": len, "round": round, "abs": abs, "min": min, "max": max, - "sum": sum, "sorted": sorted, "reversed": reversed, - "list": list, "dict": dict, "tuple": tuple, "set": set, - "enumerate": enumerate, "zip": zip, "map": map, "filter": filter, - "any": any, "all": all, "range": range, - "True": True, "False": False, "None": None, - "isinstance": isinstance, "type": type, -} - - -def _safe_eval_expression(expression: str, local_vars: dict) -> Any: - """ - Evaluate a computed field expression safely using AST validation. - - Allows simple transforms (math, string methods, attribute access on data) - while blocking dangerous operations (__import__, dunder access, etc.). - - Args: - expression: The Python expression string to evaluate. - local_vars: The local variables (extracted item fields) available to the expression. - - Returns: - The result of evaluating the expression. - - Raises: - ValueError: If the expression contains disallowed constructs. - """ - try: - tree = ast.parse(expression, mode="eval") - except SyntaxError as e: - raise ValueError(f"Invalid expression syntax: {e}") - - for node in ast.walk(tree): - # Block import statements - if isinstance(node, (ast.Import, ast.ImportFrom)): - raise ValueError("Import statements are not allowed in expressions") - - # Block attribute access to dunder attributes (e.g., __class__, __globals__) - if isinstance(node, ast.Attribute) and node.attr.startswith("_"): - raise ValueError( - f"Access to private/dunder attribute '{node.attr}' is not allowed" - ) - - # Block calls to __import__ or any name starting with _ - if isinstance(node, ast.Call): - func = node.func - if isinstance(func, ast.Name) and func.id.startswith("_"): - raise ValueError( - f"Calling '{func.id}' is not allowed in expressions" - ) - if isinstance(func, ast.Attribute) and func.attr.startswith("_"): - raise ValueError( - f"Calling '{func.attr}' is not allowed in expressions" - ) - - safe_globals = {"__builtins__": _SAFE_EVAL_BUILTINS} - return eval(compile(tree, "", "eval"), safe_globals, local_vars) - class JsonElementExtractionStrategy(ExtractionStrategy): """ diff --git a/deploy/docker/hook_manager.py b/deploy/docker/hook_manager.py index 1e79e083..4cf26109 100644 --- a/deploy/docker/hook_manager.py +++ b/deploy/docker/hook_manager.py @@ -142,12 +142,35 @@ class UserHookManager: '__builtins__': safe_builtins } - # Add commonly needed imports - exec("import asyncio", namespace) - exec("import json", namespace) - exec("import re", namespace) - exec("from typing import Dict, List, Optional", namespace) - + # Inject commonly needed modules into hook namespace. + # NOTE: We import here (in hook_manager's own scope) and inject + # the objects directly. Using exec("import X", namespace) does NOT + # work because namespace's __builtins__ lacks __import__. + import asyncio as _asyncio_mod + import json as _json_mod + import re as _re_mod + from typing import Dict as _Dict, List as _List, Optional as _Optional + + # SECURITY: Sanitize asyncio to prevent RCE via + # asyncio.subprocess.create_subprocess_shell() etc. + # Hooks need asyncio for sleep/gather/etc but NOT subprocess. + import types + safe_asyncio = types.ModuleType("asyncio") + for _attr in dir(_asyncio_mod): + if _attr not in ("subprocess", "create_subprocess_exec", + "create_subprocess_shell"): + try: + setattr(safe_asyncio, _attr, getattr(_asyncio_mod, _attr)) + except (AttributeError, TypeError): + pass + + namespace["asyncio"] = safe_asyncio + namespace["json"] = _json_mod + namespace["re"] = _re_mod + namespace["Dict"] = _Dict + namespace["List"] = _List + namespace["Optional"] = _Optional + # Execute the code to define the function exec(hook_code, namespace) diff --git a/deploy/docker/server.py b/deploy/docker/server.py index 816d7516..b027ee1f 100644 --- a/deploy/docker/server.py +++ b/deploy/docker/server.py @@ -84,6 +84,15 @@ GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES) # Hooks are disabled by default for security (RCE risk). Set to "true" to enable. HOOKS_ENABLED = os.environ.get("CRAWL4AI_HOOKS_ENABLED", "false").lower() == "true" +# Warn loudly if API token is not set (all endpoints unauthenticated) +_api_token = config.get("security", {}).get("api_token", "") or os.environ.get("CRAWL4AI_API_TOKEN", "") +if not _api_token: + import logging as _logging + _logging.getLogger("crawl4ai.security").warning( + "CRAWL4AI_API_TOKEN is not set. All API endpoints are unauthenticated. " + "Set CRAWL4AI_API_TOKEN environment variable to enable authentication." + ) + # ── default browser config helper ───────────────────────────── def get_default_browser_config() -> BrowserConfig: """Get default BrowserConfig from config.yml.""" diff --git a/tests/test_eval_security_adversarial.py b/tests/test_eval_security_adversarial.py index ac75506d..8539b364 100644 --- a/tests/test_eval_security_adversarial.py +++ b/tests/test_eval_security_adversarial.py @@ -376,48 +376,29 @@ class TestSafeEvalConfigAdversarial(unittest.TestCase): # ============================================================================ -# PART 3: _safe_eval_expression function (still exists but unused from compute_field) -# Verify it still has blocklist protections in case anyone calls it directly +# PART 3: _safe_eval_expression DELETED +# The function and _SAFE_EVAL_BUILTINS were removed from extraction_strategy.py. +# Dead security-sensitive code is a liability. # ============================================================================ -class TestSafeEvalExpressionFunction(unittest.TestCase): - """Test the _safe_eval_expression function directly. - Even though _compute_field no longer calls it, the function still exists - and should still block dangerous patterns if called.""" +class TestSafeEvalExpressionDeleted(unittest.TestCase): + """Verify _safe_eval_expression is gone from the codebase.""" - @classmethod - def setUpClass(cls): - from crawl4ai.extraction_strategy import _safe_eval_expression - cls.safe_eval = staticmethod(_safe_eval_expression) + def test_function_not_importable(self): + """_safe_eval_expression must not exist in extraction_strategy.""" + from crawl4ai import extraction_strategy + self.assertFalse( + hasattr(extraction_strategy, '_safe_eval_expression'), + "_safe_eval_expression should be deleted - dead security code is a liability" + ) - # -- Must block -- - - def test_import_blocked(self): - with self.assertRaises(ValueError): - self.safe_eval("__import__('os').system('id')", {}) - - def test_dunder_attribute_blocked(self): - with self.assertRaises(ValueError): - self.safe_eval("''.__class__.__bases__", {}) - - def test_dunder_method_call_blocked(self): - with self.assertRaises(ValueError): - self.safe_eval("x.__getattribute__('y')", {"x": {}}) - - def test_private_attribute_blocked(self): - with self.assertRaises(ValueError): - self.safe_eval("x._private_method()", {"x": {}}) - - # -- Must work -- - - def test_simple_math(self): - self.assertAlmostEqual(self.safe_eval("price * 1.1", {"price": 100}), 110.0) - - def test_string_method(self): - self.assertEqual(self.safe_eval("name.upper()", {"name": "hello"}), "HELLO") - - def test_builtin_len(self): - self.assertEqual(self.safe_eval("len(name)", {"name": "hello"}), 5) + def test_safe_eval_builtins_not_importable(self): + """_SAFE_EVAL_BUILTINS must not exist in extraction_strategy.""" + from crawl4ai import extraction_strategy + self.assertFalse( + hasattr(extraction_strategy, '_SAFE_EVAL_BUILTINS'), + "_SAFE_EVAL_BUILTINS should be deleted along with _safe_eval_expression" + ) # ============================================================================ @@ -467,7 +448,7 @@ class TestNoUnprotectedEval(unittest.TestCase): def _scan_python_files(self, directory, exclude_dirs=None): """Find all eval()/exec() calls in Python files.""" - exclude_dirs = exclude_dirs or {"__pycache__", ".git", "node_modules", "venv", ".venv"} + exclude_dirs = exclude_dirs or {"__pycache__", ".git", "node_modules", "venv", ".venv", "build", "dist", ".eggs"} hits = [] for root, dirs, files in os.walk(directory): dirs[:] = [d for d in dirs if d not in exclude_dirs] @@ -492,15 +473,13 @@ class TestNoUnprotectedEval(unittest.TestCase): """Every eval/exec in the repo must be in the known-safe list.""" repo_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") - # Known, audited locations (file suffix, line range, call type) + # Known, audited locations (file suffix, call type) known_safe = { - # extraction_strategy.py _safe_eval_expression - blocklisted, no longer called from untrusted path - ("crawl4ai/extraction_strategy.py", "eval"), # server.py _safe_eval_config - hardened with allowlist ("deploy/docker/server.py", "eval"), # hook_manager.py - restricted namespace, hooks gated behind env var ("deploy/docker/hook_manager.py", "exec"), - # test files are fine + # NOTE: extraction_strategy.py eval was DELETED, not just disabled } hits = self._scan_python_files(repo_root) @@ -526,6 +505,326 @@ class TestNoUnprotectedEval(unittest.TestCase): ) +# ============================================================================ +# PART 6: Hook manager sandbox escape tests +# ============================================================================ + +class TestHookManagerSandboxEscapes(unittest.TestCase): + """Try every trick to escape the hook_manager exec() sandbox. + Hooks are the most dangerous surface: exec() on user-supplied code.""" + + @classmethod + def setUpClass(cls): + """Build the hook sandbox exactly as hook_manager.py does.""" + import builtins + import types + + safe_builtins = {} + allowed_builtins = [ + 'print', 'len', 'str', 'int', 'float', 'bool', + 'list', 'dict', 'set', 'tuple', 'range', 'enumerate', + 'zip', 'map', 'filter', 'any', 'all', 'sum', 'min', 'max', + 'sorted', 'reversed', 'abs', 'round', 'isinstance', 'type', + 'hasattr', 'callable', 'iter', 'next', + '__build_class__' + ] + for name in allowed_builtins: + if hasattr(builtins, name): + safe_builtins[name] = getattr(builtins, name) + + cls.safe_builtins = safe_builtins + + def _make_namespace(self): + """Create a fresh hook namespace with sanitized imports (as hook_manager does). + Mirrors the actual hook_manager.py injection approach: import in our scope, + sanitize, then inject into namespace. exec("import X", ns) doesn't work + because ns lacks __import__.""" + import asyncio as _asyncio_mod + import json as _json_mod + import re as _re_mod + import types + from typing import Dict, List, Optional + + namespace = { + '__name__': 'test_hook', + '__builtins__': dict(self.safe_builtins), + } + + # Sanitize asyncio: strip subprocess access + safe_asyncio = types.ModuleType("asyncio") + for attr in dir(_asyncio_mod): + if attr not in ("subprocess", "create_subprocess_exec", + "create_subprocess_shell"): + try: + setattr(safe_asyncio, attr, getattr(_asyncio_mod, attr)) + except (AttributeError, TypeError): + pass + + namespace["asyncio"] = safe_asyncio + namespace["json"] = _json_mod + namespace["re"] = _re_mod + namespace["Dict"] = Dict + namespace["List"] = List + namespace["Optional"] = Optional + + return namespace + + def _exec_hook(self, code): + """Execute hook code in sandbox, return namespace.""" + ns = self._make_namespace() + exec(code, ns) + return ns + + # -- The original RCE that was proven exploitable -- + + def test_asyncio_subprocess_blocked(self): + """asyncio.subprocess must not be accessible (was RCE vector).""" + ns = self._make_namespace() + self.assertFalse( + hasattr(ns["asyncio"], "subprocess"), + "asyncio.subprocess must be stripped from hook namespace" + ) + + def test_asyncio_create_subprocess_shell_blocked(self): + """asyncio.create_subprocess_shell must not be accessible.""" + ns = self._make_namespace() + self.assertFalse( + hasattr(ns["asyncio"], "create_subprocess_shell"), + "asyncio.create_subprocess_shell must be stripped" + ) + + def test_asyncio_create_subprocess_exec_blocked(self): + """asyncio.create_subprocess_exec must not be accessible.""" + ns = self._make_namespace() + self.assertFalse( + hasattr(ns["asyncio"], "create_subprocess_exec"), + "asyncio.create_subprocess_exec must be stripped" + ) + + def test_asyncio_subprocess_rce_attempt(self): + """Actually try the RCE via asyncio.subprocess -- must fail.""" + code = ''' +async def evil(page, ctx): + sp = asyncio.subprocess + proc = await sp.create_subprocess_shell('id', stdout=sp.PIPE) + out, _ = await proc.communicate() + return out.decode() +''' + with self.assertRaises(AttributeError): + ns = self._exec_hook(code) + import asyncio + asyncio.get_event_loop().run_until_complete(ns['evil'](None, None)) + + # -- asyncio useful functions still work -- + + def test_asyncio_sleep_still_works(self): + """asyncio.sleep must still be available for hooks.""" + ns = self._make_namespace() + self.assertTrue(hasattr(ns["asyncio"], "sleep")) + + def test_asyncio_gather_still_works(self): + """asyncio.gather must still be available.""" + ns = self._make_namespace() + self.assertTrue(hasattr(ns["asyncio"], "gather")) + + def test_asyncio_event_still_works(self): + """asyncio.Event must still be available.""" + ns = self._make_namespace() + self.assertTrue(hasattr(ns["asyncio"], "Event")) + + # -- Try importing os/subprocess directly -- + + def test_import_os_blocked(self): + """Direct 'import os' must fail (no __import__).""" + with self.assertRaises(ImportError): + self._exec_hook("import os") + + def test_import_subprocess_blocked(self): + with self.assertRaises(ImportError): + self._exec_hook("import subprocess") + + def test_import_sys_blocked(self): + with self.assertRaises(ImportError): + self._exec_hook("import sys") + + # -- Try __import__ smuggling -- + + def test_dunder_import_not_available(self): + """__import__ must not be in builtins.""" + ns = self._make_namespace() + self.assertNotIn('__import__', ns['__builtins__']) + + def test_builtins_import_via_type(self): + """type().__bases__ subclass scanning can list classes but can't get __import__.""" + ns = self._exec_hook(""" +result = [c.__name__ for c in type.__bases__[0].__subclasses__()[:5]] +""") + # The subclass list is accessible, but without __import__ in builtins + # there's no path to import os/subprocess for RCE + self.assertNotIn('__import__', ns['__builtins__']) + + # -- Try reaching os via module attributes -- + + def test_json_os_not_reachable(self): + """json module should not expose os.""" + ns = self._make_namespace() + self.assertFalse(hasattr(ns.get("json"), "os")) + + def test_re_os_not_reachable(self): + """re module should not expose os.""" + ns = self._make_namespace() + self.assertFalse(hasattr(ns.get("re"), "os")) + + def test_asyncio_os_not_reachable(self): + """asyncio should not expose os.""" + ns = self._make_namespace() + self.assertFalse(hasattr(ns.get("asyncio"), "os")) + + # -- Try module __loader__ / __spec__ traversal -- + + def test_module_loader_traversal(self): + """Try to reach importlib via asyncio.__loader__ -- should not give RCE.""" + ns = self._make_namespace() + # Even if __loader__ exists, it shouldn't provide __import__ + asyncio_mod = ns.get("asyncio") + if hasattr(asyncio_mod, "__loader__"): + loader = asyncio_mod.__loader__ + # loader.load_module is deprecated but check it doesn't exist + # The key is: without __import__ in builtins, the hook code + # can't call loader methods that would import modules + self.assertNotIn('__import__', ns['__builtins__']) + + # -- Try getattr/setattr (should be removed) -- + + def test_getattr_not_available(self): + """getattr must not be in builtins (sandbox escape vector).""" + ns = self._make_namespace() + self.assertNotIn('getattr', ns['__builtins__']) + + def test_setattr_not_available(self): + """setattr must not be in builtins.""" + ns = self._make_namespace() + self.assertNotIn('setattr', ns['__builtins__']) + + # -- Try frame walking from within hook -- + + def test_frame_walk_from_hook(self): + """Frame walking inside exec'd code to escape sandbox.""" + code = ''' +import sys +''' + with self.assertRaises(ImportError): + self._exec_hook(code) + + # -- Try generator gi_frame trick (the original vuln) from hook -- + + def test_gi_frame_from_hook(self): + """The original gi_frame.f_back exploit should not give __import__.""" + # Even if frame walking works, builtins in this frame should not have __import__ + ns = self._make_namespace() + self.assertNotIn('__import__', ns['__builtins__']) + + +# ============================================================================ +# PART 7: End-to-end exploit payload test +# ============================================================================ + +class TestEndToEndExploit(unittest.TestCase): + """Test the EXACT exploit from the vulnerability report against _compute_field. + This is the ultimate test: the actual payload that achieved RCE.""" + + @classmethod + def setUpClass(cls): + from crawl4ai.extraction_strategy import JsonCssExtractionStrategy + schema = {"baseSelector": "div", "fields": [ + {"name": "x", "selector": "span", "type": "text"} + ]} + cls.strategy = JsonCssExtractionStrategy(schema) + + def test_exact_exploit_from_report(self): + """The exact payload from Song Binglin's vulnerability report.""" + EXPLOIT = ( + "(lambda r: [" + "dict.update(r, {" + "'g': (" + "dict.update(r, {'f': r['g'].gi_frame.f_back}) or 0 " + "for x in range(1)" + ")" + "})," + "list(r['g'])," + "(lambda w, f: w(w, f))(" + "lambda s, f: " + "f.f_builtins['__import__']('os').popen(r['cmd']).read() " + "if f and '__import__' in f.f_builtins " + "else s(s, f.f_back) if f else 'frame_walk_failed'," + "r['f']" + ")" + "][2])({'cmd': 'id'})" + ) + field = { + "name": "rce_output", + "type": "computed", + "expression": EXPLOIT, + "default": "BLOCKED", + } + result = self.strategy._compute_field({}, field) + self.assertEqual(result, "BLOCKED", + "The exact exploit payload from the vuln report must return default, never execute") + + def test_simplified_gi_frame_exploit(self): + """Simplified version targeting gi_frame directly.""" + field = { + "name": "test", + "type": "computed", + "expression": "(x for x in [1]).gi_frame.f_builtins['__import__']('os').popen('id').read()", + "default": "BLOCKED", + } + result = self.strategy._compute_field({}, field) + self.assertEqual(result, "BLOCKED") + + def test_lambda_globals_exploit(self): + """Lambda-based __globals__ access.""" + field = { + "name": "test", + "type": "computed", + "expression": "(lambda: 0).__code__.co_consts", + "default": "BLOCKED", + } + result = self.strategy._compute_field({}, field) + self.assertEqual(result, "BLOCKED") + + def test_type_mro_exploit(self): + """type().mro() based subclass scanning.""" + field = { + "name": "test", + "type": "computed", + "expression": "[c for c in ().__class__.__bases__[0].__subclasses__() if 'warning' in c.__name__][0]()._module.__builtins__['__import__']('os').popen('id').read()", + "default": "BLOCKED", + } + result = self.strategy._compute_field({}, field) + self.assertEqual(result, "BLOCKED") + + def test_exploit_via_json_schema(self): + """Simulate how the exploit arrives: embedded in extraction schema.""" + from crawl4ai.extraction_strategy import JsonCssExtractionStrategy + + malicious_schema = { + "name": "pwned", + "baseSelector": "body", + "fields": [ + { + "name": "rce_output", + "type": "computed", + "expression": "__import__('os').popen('id').read()", + "default": None, + } + ] + } + strategy = JsonCssExtractionStrategy(malicious_schema) + result = strategy._compute_field({}, malicious_schema["fields"][0]) + self.assertIsNone(result, "Malicious schema expression must return default (None)") + + if __name__ == "__main__": print("=" * 70) print("Crawl4AI Adversarial Security Tests")