mirror of
https://github.com/unclecode/crawl4ai.git
synced 2026-06-10 15:58:15 +00:00
fix(security): complete AST sandbox escape remediation (CVSS 9.8)
Addresses the gi_frame.f_back chain exploit reported by Song Binglin (q1uf3ng).
- Delete _safe_eval_expression() and _SAFE_EVAL_BUILTINS entirely from
extraction_strategy.py. Dead security-sensitive code is a liability.
The eval path was already disabled; this removes the function itself.
- Fix hook_manager.py module injection: replace broken exec("import X", ns)
pattern (silently failed due to missing __import__) with direct module
injection. Sanitize asyncio to strip subprocess access (RCE vector).
- Add startup warning when CRAWL4AI_API_TOKEN is unset (all endpoints
unauthenticated).
- Expand adversarial test suite to 87 tests: hook sandbox escapes,
asyncio.subprocess RCE verification, end-to-end exploit payload from
vuln report, dead code deletion checks, codebase eval/exec audit.
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
from abc import ABC, abstractmethod
|
||||
import ast
|
||||
import inspect
|
||||
from typing import Any, List, Dict, Optional, Tuple, Pattern, Union
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
@@ -1040,67 +1039,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
|
||||
# New extraction strategies for JSON-based extraction #
|
||||
#######################################################
|
||||
|
||||
# Safe builtins allowed in computed field expressions
|
||||
_SAFE_EVAL_BUILTINS = {
|
||||
"str": str, "int": int, "float": float, "bool": bool,
|
||||
"len": len, "round": round, "abs": abs, "min": min, "max": max,
|
||||
"sum": sum, "sorted": sorted, "reversed": reversed,
|
||||
"list": list, "dict": dict, "tuple": tuple, "set": set,
|
||||
"enumerate": enumerate, "zip": zip, "map": map, "filter": filter,
|
||||
"any": any, "all": all, "range": range,
|
||||
"True": True, "False": False, "None": None,
|
||||
"isinstance": isinstance, "type": type,
|
||||
}
|
||||
|
||||
|
||||
def _safe_eval_expression(expression: str, local_vars: dict) -> Any:
|
||||
"""
|
||||
Evaluate a computed field expression safely using AST validation.
|
||||
|
||||
Allows simple transforms (math, string methods, attribute access on data)
|
||||
while blocking dangerous operations (__import__, dunder access, etc.).
|
||||
|
||||
Args:
|
||||
expression: The Python expression string to evaluate.
|
||||
local_vars: The local variables (extracted item fields) available to the expression.
|
||||
|
||||
Returns:
|
||||
The result of evaluating the expression.
|
||||
|
||||
Raises:
|
||||
ValueError: If the expression contains disallowed constructs.
|
||||
"""
|
||||
try:
|
||||
tree = ast.parse(expression, mode="eval")
|
||||
except SyntaxError as e:
|
||||
raise ValueError(f"Invalid expression syntax: {e}")
|
||||
|
||||
for node in ast.walk(tree):
|
||||
# Block import statements
|
||||
if isinstance(node, (ast.Import, ast.ImportFrom)):
|
||||
raise ValueError("Import statements are not allowed in expressions")
|
||||
|
||||
# Block attribute access to dunder attributes (e.g., __class__, __globals__)
|
||||
if isinstance(node, ast.Attribute) and node.attr.startswith("_"):
|
||||
raise ValueError(
|
||||
f"Access to private/dunder attribute '{node.attr}' is not allowed"
|
||||
)
|
||||
|
||||
# Block calls to __import__ or any name starting with _
|
||||
if isinstance(node, ast.Call):
|
||||
func = node.func
|
||||
if isinstance(func, ast.Name) and func.id.startswith("_"):
|
||||
raise ValueError(
|
||||
f"Calling '{func.id}' is not allowed in expressions"
|
||||
)
|
||||
if isinstance(func, ast.Attribute) and func.attr.startswith("_"):
|
||||
raise ValueError(
|
||||
f"Calling '{func.attr}' is not allowed in expressions"
|
||||
)
|
||||
|
||||
safe_globals = {"__builtins__": _SAFE_EVAL_BUILTINS}
|
||||
return eval(compile(tree, "<expression>", "eval"), safe_globals, local_vars)
|
||||
|
||||
|
||||
class JsonElementExtractionStrategy(ExtractionStrategy):
|
||||
"""
|
||||
|
||||
@@ -142,12 +142,35 @@ class UserHookManager:
|
||||
'__builtins__': safe_builtins
|
||||
}
|
||||
|
||||
# Add commonly needed imports
|
||||
exec("import asyncio", namespace)
|
||||
exec("import json", namespace)
|
||||
exec("import re", namespace)
|
||||
exec("from typing import Dict, List, Optional", namespace)
|
||||
|
||||
# Inject commonly needed modules into hook namespace.
|
||||
# NOTE: We import here (in hook_manager's own scope) and inject
|
||||
# the objects directly. Using exec("import X", namespace) does NOT
|
||||
# work because namespace's __builtins__ lacks __import__.
|
||||
import asyncio as _asyncio_mod
|
||||
import json as _json_mod
|
||||
import re as _re_mod
|
||||
from typing import Dict as _Dict, List as _List, Optional as _Optional
|
||||
|
||||
# SECURITY: Sanitize asyncio to prevent RCE via
|
||||
# asyncio.subprocess.create_subprocess_shell() etc.
|
||||
# Hooks need asyncio for sleep/gather/etc but NOT subprocess.
|
||||
import types
|
||||
safe_asyncio = types.ModuleType("asyncio")
|
||||
for _attr in dir(_asyncio_mod):
|
||||
if _attr not in ("subprocess", "create_subprocess_exec",
|
||||
"create_subprocess_shell"):
|
||||
try:
|
||||
setattr(safe_asyncio, _attr, getattr(_asyncio_mod, _attr))
|
||||
except (AttributeError, TypeError):
|
||||
pass
|
||||
|
||||
namespace["asyncio"] = safe_asyncio
|
||||
namespace["json"] = _json_mod
|
||||
namespace["re"] = _re_mod
|
||||
namespace["Dict"] = _Dict
|
||||
namespace["List"] = _List
|
||||
namespace["Optional"] = _Optional
|
||||
|
||||
# Execute the code to define the function
|
||||
exec(hook_code, namespace)
|
||||
|
||||
|
||||
@@ -84,6 +84,15 @@ GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES)
|
||||
# Hooks are disabled by default for security (RCE risk). Set to "true" to enable.
|
||||
HOOKS_ENABLED = os.environ.get("CRAWL4AI_HOOKS_ENABLED", "false").lower() == "true"
|
||||
|
||||
# Warn loudly if API token is not set (all endpoints unauthenticated)
|
||||
_api_token = config.get("security", {}).get("api_token", "") or os.environ.get("CRAWL4AI_API_TOKEN", "")
|
||||
if not _api_token:
|
||||
import logging as _logging
|
||||
_logging.getLogger("crawl4ai.security").warning(
|
||||
"CRAWL4AI_API_TOKEN is not set. All API endpoints are unauthenticated. "
|
||||
"Set CRAWL4AI_API_TOKEN environment variable to enable authentication."
|
||||
)
|
||||
|
||||
# ── default browser config helper ─────────────────────────────
|
||||
def get_default_browser_config() -> BrowserConfig:
|
||||
"""Get default BrowserConfig from config.yml."""
|
||||
|
||||
@@ -376,48 +376,29 @@ class TestSafeEvalConfigAdversarial(unittest.TestCase):
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# PART 3: _safe_eval_expression function (still exists but unused from compute_field)
|
||||
# Verify it still has blocklist protections in case anyone calls it directly
|
||||
# PART 3: _safe_eval_expression DELETED
|
||||
# The function and _SAFE_EVAL_BUILTINS were removed from extraction_strategy.py.
|
||||
# Dead security-sensitive code is a liability.
|
||||
# ============================================================================
|
||||
|
||||
class TestSafeEvalExpressionFunction(unittest.TestCase):
|
||||
"""Test the _safe_eval_expression function directly.
|
||||
Even though _compute_field no longer calls it, the function still exists
|
||||
and should still block dangerous patterns if called."""
|
||||
class TestSafeEvalExpressionDeleted(unittest.TestCase):
|
||||
"""Verify _safe_eval_expression is gone from the codebase."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
from crawl4ai.extraction_strategy import _safe_eval_expression
|
||||
cls.safe_eval = staticmethod(_safe_eval_expression)
|
||||
def test_function_not_importable(self):
|
||||
"""_safe_eval_expression must not exist in extraction_strategy."""
|
||||
from crawl4ai import extraction_strategy
|
||||
self.assertFalse(
|
||||
hasattr(extraction_strategy, '_safe_eval_expression'),
|
||||
"_safe_eval_expression should be deleted - dead security code is a liability"
|
||||
)
|
||||
|
||||
# -- Must block --
|
||||
|
||||
def test_import_blocked(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.safe_eval("__import__('os').system('id')", {})
|
||||
|
||||
def test_dunder_attribute_blocked(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.safe_eval("''.__class__.__bases__", {})
|
||||
|
||||
def test_dunder_method_call_blocked(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.safe_eval("x.__getattribute__('y')", {"x": {}})
|
||||
|
||||
def test_private_attribute_blocked(self):
|
||||
with self.assertRaises(ValueError):
|
||||
self.safe_eval("x._private_method()", {"x": {}})
|
||||
|
||||
# -- Must work --
|
||||
|
||||
def test_simple_math(self):
|
||||
self.assertAlmostEqual(self.safe_eval("price * 1.1", {"price": 100}), 110.0)
|
||||
|
||||
def test_string_method(self):
|
||||
self.assertEqual(self.safe_eval("name.upper()", {"name": "hello"}), "HELLO")
|
||||
|
||||
def test_builtin_len(self):
|
||||
self.assertEqual(self.safe_eval("len(name)", {"name": "hello"}), 5)
|
||||
def test_safe_eval_builtins_not_importable(self):
|
||||
"""_SAFE_EVAL_BUILTINS must not exist in extraction_strategy."""
|
||||
from crawl4ai import extraction_strategy
|
||||
self.assertFalse(
|
||||
hasattr(extraction_strategy, '_SAFE_EVAL_BUILTINS'),
|
||||
"_SAFE_EVAL_BUILTINS should be deleted along with _safe_eval_expression"
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -467,7 +448,7 @@ class TestNoUnprotectedEval(unittest.TestCase):
|
||||
|
||||
def _scan_python_files(self, directory, exclude_dirs=None):
|
||||
"""Find all eval()/exec() calls in Python files."""
|
||||
exclude_dirs = exclude_dirs or {"__pycache__", ".git", "node_modules", "venv", ".venv"}
|
||||
exclude_dirs = exclude_dirs or {"__pycache__", ".git", "node_modules", "venv", ".venv", "build", "dist", ".eggs"}
|
||||
hits = []
|
||||
for root, dirs, files in os.walk(directory):
|
||||
dirs[:] = [d for d in dirs if d not in exclude_dirs]
|
||||
@@ -492,15 +473,13 @@ class TestNoUnprotectedEval(unittest.TestCase):
|
||||
"""Every eval/exec in the repo must be in the known-safe list."""
|
||||
repo_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
|
||||
|
||||
# Known, audited locations (file suffix, line range, call type)
|
||||
# Known, audited locations (file suffix, call type)
|
||||
known_safe = {
|
||||
# extraction_strategy.py _safe_eval_expression - blocklisted, no longer called from untrusted path
|
||||
("crawl4ai/extraction_strategy.py", "eval"),
|
||||
# server.py _safe_eval_config - hardened with allowlist
|
||||
("deploy/docker/server.py", "eval"),
|
||||
# hook_manager.py - restricted namespace, hooks gated behind env var
|
||||
("deploy/docker/hook_manager.py", "exec"),
|
||||
# test files are fine
|
||||
# NOTE: extraction_strategy.py eval was DELETED, not just disabled
|
||||
}
|
||||
|
||||
hits = self._scan_python_files(repo_root)
|
||||
@@ -526,6 +505,326 @@ class TestNoUnprotectedEval(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# PART 6: Hook manager sandbox escape tests
|
||||
# ============================================================================
|
||||
|
||||
class TestHookManagerSandboxEscapes(unittest.TestCase):
|
||||
"""Try every trick to escape the hook_manager exec() sandbox.
|
||||
Hooks are the most dangerous surface: exec() on user-supplied code."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
"""Build the hook sandbox exactly as hook_manager.py does."""
|
||||
import builtins
|
||||
import types
|
||||
|
||||
safe_builtins = {}
|
||||
allowed_builtins = [
|
||||
'print', 'len', 'str', 'int', 'float', 'bool',
|
||||
'list', 'dict', 'set', 'tuple', 'range', 'enumerate',
|
||||
'zip', 'map', 'filter', 'any', 'all', 'sum', 'min', 'max',
|
||||
'sorted', 'reversed', 'abs', 'round', 'isinstance', 'type',
|
||||
'hasattr', 'callable', 'iter', 'next',
|
||||
'__build_class__'
|
||||
]
|
||||
for name in allowed_builtins:
|
||||
if hasattr(builtins, name):
|
||||
safe_builtins[name] = getattr(builtins, name)
|
||||
|
||||
cls.safe_builtins = safe_builtins
|
||||
|
||||
def _make_namespace(self):
|
||||
"""Create a fresh hook namespace with sanitized imports (as hook_manager does).
|
||||
Mirrors the actual hook_manager.py injection approach: import in our scope,
|
||||
sanitize, then inject into namespace. exec("import X", ns) doesn't work
|
||||
because ns lacks __import__."""
|
||||
import asyncio as _asyncio_mod
|
||||
import json as _json_mod
|
||||
import re as _re_mod
|
||||
import types
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
namespace = {
|
||||
'__name__': 'test_hook',
|
||||
'__builtins__': dict(self.safe_builtins),
|
||||
}
|
||||
|
||||
# Sanitize asyncio: strip subprocess access
|
||||
safe_asyncio = types.ModuleType("asyncio")
|
||||
for attr in dir(_asyncio_mod):
|
||||
if attr not in ("subprocess", "create_subprocess_exec",
|
||||
"create_subprocess_shell"):
|
||||
try:
|
||||
setattr(safe_asyncio, attr, getattr(_asyncio_mod, attr))
|
||||
except (AttributeError, TypeError):
|
||||
pass
|
||||
|
||||
namespace["asyncio"] = safe_asyncio
|
||||
namespace["json"] = _json_mod
|
||||
namespace["re"] = _re_mod
|
||||
namespace["Dict"] = Dict
|
||||
namespace["List"] = List
|
||||
namespace["Optional"] = Optional
|
||||
|
||||
return namespace
|
||||
|
||||
def _exec_hook(self, code):
|
||||
"""Execute hook code in sandbox, return namespace."""
|
||||
ns = self._make_namespace()
|
||||
exec(code, ns)
|
||||
return ns
|
||||
|
||||
# -- The original RCE that was proven exploitable --
|
||||
|
||||
def test_asyncio_subprocess_blocked(self):
|
||||
"""asyncio.subprocess must not be accessible (was RCE vector)."""
|
||||
ns = self._make_namespace()
|
||||
self.assertFalse(
|
||||
hasattr(ns["asyncio"], "subprocess"),
|
||||
"asyncio.subprocess must be stripped from hook namespace"
|
||||
)
|
||||
|
||||
def test_asyncio_create_subprocess_shell_blocked(self):
|
||||
"""asyncio.create_subprocess_shell must not be accessible."""
|
||||
ns = self._make_namespace()
|
||||
self.assertFalse(
|
||||
hasattr(ns["asyncio"], "create_subprocess_shell"),
|
||||
"asyncio.create_subprocess_shell must be stripped"
|
||||
)
|
||||
|
||||
def test_asyncio_create_subprocess_exec_blocked(self):
|
||||
"""asyncio.create_subprocess_exec must not be accessible."""
|
||||
ns = self._make_namespace()
|
||||
self.assertFalse(
|
||||
hasattr(ns["asyncio"], "create_subprocess_exec"),
|
||||
"asyncio.create_subprocess_exec must be stripped"
|
||||
)
|
||||
|
||||
def test_asyncio_subprocess_rce_attempt(self):
|
||||
"""Actually try the RCE via asyncio.subprocess -- must fail."""
|
||||
code = '''
|
||||
async def evil(page, ctx):
|
||||
sp = asyncio.subprocess
|
||||
proc = await sp.create_subprocess_shell('id', stdout=sp.PIPE)
|
||||
out, _ = await proc.communicate()
|
||||
return out.decode()
|
||||
'''
|
||||
with self.assertRaises(AttributeError):
|
||||
ns = self._exec_hook(code)
|
||||
import asyncio
|
||||
asyncio.get_event_loop().run_until_complete(ns['evil'](None, None))
|
||||
|
||||
# -- asyncio useful functions still work --
|
||||
|
||||
def test_asyncio_sleep_still_works(self):
|
||||
"""asyncio.sleep must still be available for hooks."""
|
||||
ns = self._make_namespace()
|
||||
self.assertTrue(hasattr(ns["asyncio"], "sleep"))
|
||||
|
||||
def test_asyncio_gather_still_works(self):
|
||||
"""asyncio.gather must still be available."""
|
||||
ns = self._make_namespace()
|
||||
self.assertTrue(hasattr(ns["asyncio"], "gather"))
|
||||
|
||||
def test_asyncio_event_still_works(self):
|
||||
"""asyncio.Event must still be available."""
|
||||
ns = self._make_namespace()
|
||||
self.assertTrue(hasattr(ns["asyncio"], "Event"))
|
||||
|
||||
# -- Try importing os/subprocess directly --
|
||||
|
||||
def test_import_os_blocked(self):
|
||||
"""Direct 'import os' must fail (no __import__)."""
|
||||
with self.assertRaises(ImportError):
|
||||
self._exec_hook("import os")
|
||||
|
||||
def test_import_subprocess_blocked(self):
|
||||
with self.assertRaises(ImportError):
|
||||
self._exec_hook("import subprocess")
|
||||
|
||||
def test_import_sys_blocked(self):
|
||||
with self.assertRaises(ImportError):
|
||||
self._exec_hook("import sys")
|
||||
|
||||
# -- Try __import__ smuggling --
|
||||
|
||||
def test_dunder_import_not_available(self):
|
||||
"""__import__ must not be in builtins."""
|
||||
ns = self._make_namespace()
|
||||
self.assertNotIn('__import__', ns['__builtins__'])
|
||||
|
||||
def test_builtins_import_via_type(self):
|
||||
"""type().__bases__ subclass scanning can list classes but can't get __import__."""
|
||||
ns = self._exec_hook("""
|
||||
result = [c.__name__ for c in type.__bases__[0].__subclasses__()[:5]]
|
||||
""")
|
||||
# The subclass list is accessible, but without __import__ in builtins
|
||||
# there's no path to import os/subprocess for RCE
|
||||
self.assertNotIn('__import__', ns['__builtins__'])
|
||||
|
||||
# -- Try reaching os via module attributes --
|
||||
|
||||
def test_json_os_not_reachable(self):
|
||||
"""json module should not expose os."""
|
||||
ns = self._make_namespace()
|
||||
self.assertFalse(hasattr(ns.get("json"), "os"))
|
||||
|
||||
def test_re_os_not_reachable(self):
|
||||
"""re module should not expose os."""
|
||||
ns = self._make_namespace()
|
||||
self.assertFalse(hasattr(ns.get("re"), "os"))
|
||||
|
||||
def test_asyncio_os_not_reachable(self):
|
||||
"""asyncio should not expose os."""
|
||||
ns = self._make_namespace()
|
||||
self.assertFalse(hasattr(ns.get("asyncio"), "os"))
|
||||
|
||||
# -- Try module __loader__ / __spec__ traversal --
|
||||
|
||||
def test_module_loader_traversal(self):
|
||||
"""Try to reach importlib via asyncio.__loader__ -- should not give RCE."""
|
||||
ns = self._make_namespace()
|
||||
# Even if __loader__ exists, it shouldn't provide __import__
|
||||
asyncio_mod = ns.get("asyncio")
|
||||
if hasattr(asyncio_mod, "__loader__"):
|
||||
loader = asyncio_mod.__loader__
|
||||
# loader.load_module is deprecated but check it doesn't exist
|
||||
# The key is: without __import__ in builtins, the hook code
|
||||
# can't call loader methods that would import modules
|
||||
self.assertNotIn('__import__', ns['__builtins__'])
|
||||
|
||||
# -- Try getattr/setattr (should be removed) --
|
||||
|
||||
def test_getattr_not_available(self):
|
||||
"""getattr must not be in builtins (sandbox escape vector)."""
|
||||
ns = self._make_namespace()
|
||||
self.assertNotIn('getattr', ns['__builtins__'])
|
||||
|
||||
def test_setattr_not_available(self):
|
||||
"""setattr must not be in builtins."""
|
||||
ns = self._make_namespace()
|
||||
self.assertNotIn('setattr', ns['__builtins__'])
|
||||
|
||||
# -- Try frame walking from within hook --
|
||||
|
||||
def test_frame_walk_from_hook(self):
|
||||
"""Frame walking inside exec'd code to escape sandbox."""
|
||||
code = '''
|
||||
import sys
|
||||
'''
|
||||
with self.assertRaises(ImportError):
|
||||
self._exec_hook(code)
|
||||
|
||||
# -- Try generator gi_frame trick (the original vuln) from hook --
|
||||
|
||||
def test_gi_frame_from_hook(self):
|
||||
"""The original gi_frame.f_back exploit should not give __import__."""
|
||||
# Even if frame walking works, builtins in this frame should not have __import__
|
||||
ns = self._make_namespace()
|
||||
self.assertNotIn('__import__', ns['__builtins__'])
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# PART 7: End-to-end exploit payload test
|
||||
# ============================================================================
|
||||
|
||||
class TestEndToEndExploit(unittest.TestCase):
|
||||
"""Test the EXACT exploit from the vulnerability report against _compute_field.
|
||||
This is the ultimate test: the actual payload that achieved RCE."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
|
||||
schema = {"baseSelector": "div", "fields": [
|
||||
{"name": "x", "selector": "span", "type": "text"}
|
||||
]}
|
||||
cls.strategy = JsonCssExtractionStrategy(schema)
|
||||
|
||||
def test_exact_exploit_from_report(self):
|
||||
"""The exact payload from Song Binglin's vulnerability report."""
|
||||
EXPLOIT = (
|
||||
"(lambda r: ["
|
||||
"dict.update(r, {"
|
||||
"'g': ("
|
||||
"dict.update(r, {'f': r['g'].gi_frame.f_back}) or 0 "
|
||||
"for x in range(1)"
|
||||
")"
|
||||
"}),"
|
||||
"list(r['g']),"
|
||||
"(lambda w, f: w(w, f))("
|
||||
"lambda s, f: "
|
||||
"f.f_builtins['__import__']('os').popen(r['cmd']).read() "
|
||||
"if f and '__import__' in f.f_builtins "
|
||||
"else s(s, f.f_back) if f else 'frame_walk_failed',"
|
||||
"r['f']"
|
||||
")"
|
||||
"][2])({'cmd': 'id'})"
|
||||
)
|
||||
field = {
|
||||
"name": "rce_output",
|
||||
"type": "computed",
|
||||
"expression": EXPLOIT,
|
||||
"default": "BLOCKED",
|
||||
}
|
||||
result = self.strategy._compute_field({}, field)
|
||||
self.assertEqual(result, "BLOCKED",
|
||||
"The exact exploit payload from the vuln report must return default, never execute")
|
||||
|
||||
def test_simplified_gi_frame_exploit(self):
|
||||
"""Simplified version targeting gi_frame directly."""
|
||||
field = {
|
||||
"name": "test",
|
||||
"type": "computed",
|
||||
"expression": "(x for x in [1]).gi_frame.f_builtins['__import__']('os').popen('id').read()",
|
||||
"default": "BLOCKED",
|
||||
}
|
||||
result = self.strategy._compute_field({}, field)
|
||||
self.assertEqual(result, "BLOCKED")
|
||||
|
||||
def test_lambda_globals_exploit(self):
|
||||
"""Lambda-based __globals__ access."""
|
||||
field = {
|
||||
"name": "test",
|
||||
"type": "computed",
|
||||
"expression": "(lambda: 0).__code__.co_consts",
|
||||
"default": "BLOCKED",
|
||||
}
|
||||
result = self.strategy._compute_field({}, field)
|
||||
self.assertEqual(result, "BLOCKED")
|
||||
|
||||
def test_type_mro_exploit(self):
|
||||
"""type().mro() based subclass scanning."""
|
||||
field = {
|
||||
"name": "test",
|
||||
"type": "computed",
|
||||
"expression": "[c for c in ().__class__.__bases__[0].__subclasses__() if 'warning' in c.__name__][0]()._module.__builtins__['__import__']('os').popen('id').read()",
|
||||
"default": "BLOCKED",
|
||||
}
|
||||
result = self.strategy._compute_field({}, field)
|
||||
self.assertEqual(result, "BLOCKED")
|
||||
|
||||
def test_exploit_via_json_schema(self):
|
||||
"""Simulate how the exploit arrives: embedded in extraction schema."""
|
||||
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
|
||||
|
||||
malicious_schema = {
|
||||
"name": "pwned",
|
||||
"baseSelector": "body",
|
||||
"fields": [
|
||||
{
|
||||
"name": "rce_output",
|
||||
"type": "computed",
|
||||
"expression": "__import__('os').popen('id').read()",
|
||||
"default": None,
|
||||
}
|
||||
]
|
||||
}
|
||||
strategy = JsonCssExtractionStrategy(malicious_schema)
|
||||
result = strategy._compute_field({}, malicious_schema["fields"][0])
|
||||
self.assertIsNone(result, "Malicious schema expression must return default (None)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("=" * 70)
|
||||
print("Crawl4AI Adversarial Security Tests")
|
||||
|
||||
Reference in New Issue
Block a user