#!/usr/bin/env python3
"""
Adversarial security tests for the 4 vulns reported 2026-04-13.
Self-contained: copies validation logic to avoid import issues with dns.resolver etc.

Vuln 1: Arbitrary File Write via /screenshot + /pdf output_path
Vuln 2: Monitor Auth Bypass (structural source check)
Vuln 3: Stored XSS in Monitor Dashboard (server-side + client-side)
Vuln 4: SSRF via Webhook URL
"""

import os
import sys
import unittest
import ipaddress
import socket
from urllib.parse import urlparse

# Star-imports of this module expose only the validator copies; the TestCase
# classes are left out so a test runner does not re-collect them from the
# importing module.
__all__ = ["ALLOWED_OUTPUT_DIR", "validate_output_path", "validate_webhook_url"]

# ============================================================================
# Local copies of security utilities (to avoid dns.resolver import in utils.py)
# ============================================================================
# NOTE(review): these are copies, so any change made here (e.g. the
# IPv4-mapped handling in validate_webhook_url) must be mirrored in the
# production validators or the tests will no longer describe what ships.

ALLOWED_OUTPUT_DIR = os.environ.get("CRAWL4AI_OUTPUT_DIR", "/tmp/crawl4ai-outputs")


def validate_output_path(user_path):
    """Map a user-supplied path to an absolute path inside ALLOWED_OUTPUT_DIR.

    Leading separators are stripped after normalisation, so an absolute input
    such as ``/etc/passwd`` is re-rooted under the allowed directory instead of
    being rejected.

    Args:
        user_path: Path string supplied by the caller.

    Returns:
        The absolute path, guaranteed to be strictly below ALLOWED_OUTPUT_DIR.

    Raises:
        ValueError: If the path resolves to the allowed directory itself or to
            anything outside it (e.g. via ``..`` components).
    """
    safe_path = os.path.normpath(user_path).lstrip(os.sep)
    abs_path = os.path.abspath(os.path.join(ALLOWED_OUTPUT_DIR, safe_path))
    # The trailing separator stops sibling directories that merely share the
    # prefix (".../crawl4ai-outputs-evil") from passing the check.
    # NOTE: abspath() is purely lexical; symlinks are not resolved here.
    abs_allowed = os.path.abspath(ALLOWED_OUTPUT_DIR) + os.sep
    if not abs_path.startswith(abs_allowed):
        raise ValueError(f"output_path must resolve within {ALLOWED_OUTPUT_DIR}")
    return abs_path


# Address ranges a webhook must never resolve to.
_BLOCKED_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("100.64.0.0/10"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.0.0.0/24"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("198.18.0.0/15"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
]

# Hostnames rejected by name, before any DNS lookup is attempted.
_BLOCKED_HOSTNAMES = {
    "localhost",
    "metadata.google.internal",
    "metadata",
    "kubernetes.default",
    "kubernetes.default.svc",
}


def _parse_resolved_ip(raw_address):
    """Parse an address string from getaddrinfo() into an ipaddress object.

    An IPv6 zone suffix (``%eth0``) is dropped before parsing, and an
    IPv4-mapped IPv6 address (``::ffff:a.b.c.d``) is unwrapped to the IPv4
    address it carries so that the IPv4 block rules apply to it.
    """
    ip = ipaddress.ip_address(raw_address.split("%", 1)[0])
    mapped = getattr(ip, "ipv4_mapped", None)  # attribute exists on IPv6 only
    return ip if mapped is None else mapped


def validate_webhook_url(url):
    """Reject webhook URLs that point at internal or otherwise blocked hosts.

    The hostname is checked against a name blocklist first, then resolved, and
    every resolved address is checked against _BLOCKED_NETWORKS.

    Args:
        url: The webhook URL; it is converted with ``str()`` before parsing.

    Returns:
        None. The function returns normally when the URL is acceptable.

    Raises:
        ValueError: If the URL has no hostname, the hostname is blocklisted or
            cannot be resolved, or any resolved address is in a blocked range.
    """
    parsed = urlparse(str(url))
    hostname = parsed.hostname
    if not hostname:
        raise ValueError("Webhook URL must have a valid hostname")

    hostname_lower = hostname.lower()
    if hostname_lower in _BLOCKED_HOSTNAMES:
        raise ValueError(f"Webhook URL hostname '{hostname}' is blocked")
    if hostname_lower.startswith("host.docker.internal"):
        raise ValueError(f"Webhook URL hostname '{hostname}' is blocked")

    try:
        resolved = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve webhook hostname '{hostname}'") from exc

    for *_, sockaddr in resolved:
        # Without unwrapping, "::ffff:127.0.0.1" is an IPv6 address and never
        # matches the IPv4 networks, which would let it through.
        ip = _parse_resolved_ip(sockaddr[0])
        if any(ip in network for network in _BLOCKED_NETWORKS):
            raise ValueError(f"Webhook URL resolves to blocked address: {ip}")


DEPLOY_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")


# ============================================================================
# VULN 1: Arbitrary File Write - Path Traversal
# ============================================================================

class TestPathTraversalBlocked(unittest.TestCase):
    """Test validate_output_path blocks all traversal attempts."""

    def _assert_jailed(self, path):
        """Assert that *path* lies strictly inside the allowed output dir.

        Compares against the absolute directory plus a trailing separator; a
        bare prefix test would also accept ".../crawl4ai-outputs-evil/x" and
        would fail spuriously when CRAWL4AI_OUTPUT_DIR is a relative path.
        """
        root = os.path.abspath(ALLOWED_OUTPUT_DIR) + os.sep
        self.assertTrue(
            path.startswith(root),
            f"{path!r} is not inside {root!r}",
        )

    def test_absolute_path_gets_jailed(self):
        """Absolute paths get stripped and jailed inside allowed dir."""
        result = validate_output_path("/app/server.py")
        self._assert_jailed(result)
        self.assertNotEqual(result, "/app/server.py")

    def test_absolute_etc_passwd_gets_jailed(self):
        result = validate_output_path("/etc/passwd")
        self._assert_jailed(result)
        self.assertNotEqual(result, "/etc/passwd")

    def test_relative_traversal_simple(self):
        with self.assertRaises(ValueError):
            validate_output_path("../../etc/passwd")

    def test_relative_traversal_deep(self):
        with self.assertRaises(ValueError):
            validate_output_path("foo/../../bar/../../../app/evil.py")

    def test_absolute_path_home_gets_jailed(self):
        result = validate_output_path("/home/appuser/.bashrc")
        self._assert_jailed(result)

    def test_absolute_path_tmp_outside_gets_jailed(self):
        result = validate_output_path("/tmp/other-dir/evil.py")
        self._assert_jailed(result)

    def test_simple_filename(self):
        result = validate_output_path("test.png")
        self._assert_jailed(result)
        self.assertTrue(result.endswith("test.png"))

    def test_subdirectory(self):
        result = validate_output_path("subdir/deep/test.png")
        self._assert_jailed(result)

    def test_filename_with_dots(self):
        result = validate_output_path("my.screenshot.2024.png")
        self.assertTrue(result.endswith("my.screenshot.2024.png"))


class TestPydanticPathValidator(unittest.TestCase):
    """Verify schemas.py has traversal rejection on output_path."""

    def test_schemas_has_validator(self):
        with open(os.path.join(DEPLOY_DIR, "schemas.py"), encoding="utf-8") as f:
            source = f.read()
        self.assertIn("reject_traversal", source,
                      "schemas.py must have reject_traversal validator on output_path")
        self.assertIn('".."', source,
                      "Validator must check for '..' traversal")


# ============================================================================
# VULN 2: Monitor Auth Bypass (structural check)
# ============================================================================

class TestMonitorAuthStructural(unittest.TestCase):
    """Source-text checks that the monitor endpoints are wired to auth."""

    def test_monitor_router_has_auth(self):
        with open(os.path.join(DEPLOY_DIR, "server.py"), encoding="utf-8") as f:
            source = f.read()
        # Find the line with monitor_router; only the first match is checked.
        for line in source.splitlines():
            if "monitor_router" in line and "include_router" in line:
                self.assertIn("dependencies=", line,
                              "Monitor router must have dependencies=[Depends(token_dep)]")
                return
        self.fail("Could not find monitor_router include_router line")

    def test_websocket_has_token_check(self):
        with open(os.path.join(DEPLOY_DIR, "monitor_routes.py"), encoding="utf-8") as f:
            source = f.read()
        self.assertIn("CRAWL4AI_API_TOKEN", source,
                      "WebSocket endpoint must check CRAWL4AI_API_TOKEN")
        self.assertIn("websocket.close", source,
                      "WebSocket must close connection on auth failure")


# ============================================================================
# VULN 3: Stored XSS (server-side + client-side)
#
============================================================================ class TestXSSPrevention(unittest.TestCase): def test_html_escape_blocks_script_tags(self): import html payload = '' escaped = html.escape(payload) self.assertNotIn("