#!/usr/bin/env python3
"""
Adversarial security tests for the 4 vulns reported 2026-04-13.
Self-contained: copies validation logic to avoid import issues with dns.resolver etc.
Vuln 1: Arbitrary File Write via /screenshot + /pdf output_path
Vuln 2: Monitor Auth Bypass (structural source check)
Vuln 3: Stored XSS in Monitor Dashboard (server-side + client-side)
Vuln 4: SSRF via Webhook URL
"""
import os
import sys
import unittest
import ipaddress
import socket
from urllib.parse import urlparse
# ============================================================================
# Local copies of security utilities (to avoid dns.resolver import in utils.py)
# ============================================================================
# Directory every screenshot/PDF output must land in; overridable via the
# CRAWL4AI_OUTPUT_DIR environment variable.
ALLOWED_OUTPUT_DIR = os.environ.get("CRAWL4AI_OUTPUT_DIR", "/tmp/crawl4ai-outputs")


def validate_output_path(user_path):
    """Confine *user_path* to ALLOWED_OUTPUT_DIR and return the absolute result.

    The path is normalized and stripped of leading separators, so an absolute
    input is re-rooted under the allowed directory instead of being honored.

    Args:
        user_path: Caller-supplied output path (relative or absolute).

    Returns:
        The absolute path located inside ALLOWED_OUTPUT_DIR.

    Raises:
        ValueError: If the joined path does not fall strictly inside
            ALLOWED_OUTPUT_DIR (e.g. ``..`` traversal, or a path that resolves
            to the directory itself).
    """
    relative = os.path.normpath(user_path).lstrip(os.sep)
    candidate = os.path.abspath(os.path.join(ALLOWED_OUTPUT_DIR, relative))
    # The trailing separator keeps a sibling like "<dir>-evil" from matching.
    jail_prefix = os.path.abspath(ALLOWED_OUTPUT_DIR) + os.sep
    if candidate.startswith(jail_prefix):
        return candidate
    raise ValueError(f"output_path must resolve within {ALLOWED_OUTPUT_DIR}")
# Address ranges a webhook must never resolve to: "this network", private,
# shared (CGNAT), loopback, link-local, protocol-assignment and benchmarking
# space for IPv4, plus their IPv6 counterparts.
_BLOCKED_NETWORKS = [
    ipaddress.ip_network("0.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("100.64.0.0/10"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.0.0.0/24"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("198.18.0.0/15"),
    # IPv6 unspecified address, the counterpart of the 0.0.0.0/8 entry above.
    ipaddress.ip_network("::/128"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
]

# Hostnames rejected by name before any DNS lookup is attempted.
_BLOCKED_HOSTNAMES = {
    "localhost", "metadata.google.internal", "metadata",
    "kubernetes.default", "kubernetes.default.svc",
}


def validate_webhook_url(url):
    """Reject webhook URLs that point at internal or otherwise blocked hosts.

    The hostname is checked against a name blocklist, then resolved with
    ``socket.getaddrinfo``; every returned address is tested against
    ``_BLOCKED_NETWORKS``. Only the addresses returned at call time are
    inspected.

    Args:
        url: The webhook URL; it is converted with ``str()`` before parsing.

    Returns:
        None. The function returns normally when the URL is acceptable.

    Raises:
        ValueError: If the URL has no hostname, the hostname is blocklisted,
            the hostname cannot be resolved, or any resolved address falls
            inside a blocked network.
    """
    parsed = urlparse(str(url))
    hostname = parsed.hostname
    if not hostname:
        raise ValueError("Webhook URL must have a valid hostname")

    hostname_lower = hostname.lower()
    if (
        hostname_lower in _BLOCKED_HOSTNAMES
        or hostname_lower.startswith("host.docker.internal")
    ):
        raise ValueError(f"Webhook URL hostname '{hostname}' is blocked")

    try:
        resolved = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        raise ValueError(f"Cannot resolve webhook hostname '{hostname}'") from exc

    for *_, sockaddr in resolved:
        # Link-local IPv6 results may carry a "%scope" suffix; drop it so the
        # address parses on every supported Python version.
        ip = ipaddress.ip_address(sockaddr[0].split("%", 1)[0])
        # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) is never "in" an IPv4
        # network, so unwrap it first; otherwise http://[::ffff:127.0.0.1]/
        # would slip past every IPv4 rule above.
        if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
            ip = ip.ipv4_mapped
        if any(ip in network for network in _BLOCKED_NETWORKS):
            raise ValueError(f"Webhook URL resolves to blocked address: {ip}")
# Parent of the directory holding this test file; the structural tests read
# schemas.py, server.py and monitor_routes.py from here.
_TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
DEPLOY_DIR = os.path.join(_TESTS_DIR, "..")
# ============================================================================
# VULN 1: Arbitrary File Write - Path Traversal
# ============================================================================
class TestPathTraversalBlocked(unittest.TestCase):
    """Test validate_output_path blocks all traversal attempts."""

    def _assert_jailed(self, result):
        """Assert *result* lies strictly inside the allowed output directory.

        The comparison uses the absolute allowed directory plus a trailing
        separator. A bare ``startswith(ALLOWED_OUTPUT_DIR)`` would accept a
        sibling such as ``<dir>-evil`` and would fail spuriously when
        CRAWL4AI_OUTPUT_DIR is relative or not normalized.
        """
        jail = os.path.abspath(ALLOWED_OUTPUT_DIR) + os.sep
        self.assertTrue(
            result.startswith(jail),
            f"{result!r} is not inside {jail!r}",
        )

    def test_absolute_path_gets_jailed(self):
        """Absolute paths get stripped and jailed inside allowed dir."""
        result = validate_output_path("/app/server.py")
        self._assert_jailed(result)
        self.assertNotEqual(result, "/app/server.py")

    def test_absolute_etc_passwd_gets_jailed(self):
        """/etc/passwd is re-rooted under the allowed dir, not honored."""
        result = validate_output_path("/etc/passwd")
        self._assert_jailed(result)
        self.assertNotEqual(result, "/etc/passwd")

    def test_relative_traversal_simple(self):
        """A plain ../.. escape is rejected."""
        with self.assertRaises(ValueError):
            validate_output_path("../../etc/passwd")

    def test_relative_traversal_deep(self):
        """Traversal hidden behind intermediate segments is rejected."""
        with self.assertRaises(ValueError):
            validate_output_path("foo/../../bar/../../../app/evil.py")

    def test_absolute_path_home_gets_jailed(self):
        """An absolute path into a home directory is jailed."""
        result = validate_output_path("/home/appuser/.bashrc")
        self._assert_jailed(result)

    def test_absolute_path_tmp_outside_gets_jailed(self):
        """An absolute /tmp path outside the allowed dir is jailed."""
        result = validate_output_path("/tmp/other-dir/evil.py")
        self._assert_jailed(result)

    def test_simple_filename(self):
        """A bare filename lands inside the allowed dir under its own name."""
        result = validate_output_path("test.png")
        self._assert_jailed(result)
        self.assertTrue(result.endswith("test.png"))

    def test_subdirectory(self):
        """Nested relative paths are accepted inside the allowed dir."""
        result = validate_output_path("subdir/deep/test.png")
        self._assert_jailed(result)

    def test_filename_with_dots(self):
        """Dots inside a filename are not mistaken for traversal."""
        result = validate_output_path("my.screenshot.2024.png")
        self.assertTrue(result.endswith("my.screenshot.2024.png"))
class TestPydanticPathValidator(unittest.TestCase):
    """Structural check that schemas.py guards output_path against traversal."""

    def test_schemas_has_validator(self):
        """schemas.py must contain the validator name and the '..' literal."""
        schemas_file = os.path.join(DEPLOY_DIR, "schemas.py")
        with open(schemas_file) as handle:
            schemas_text = handle.read()

        # Checked in order; the first missing marker fails the test.
        required_markers = (
            ("reject_traversal",
             "schemas.py must have reject_traversal validator on output_path"),
            ('".."',
             "Validator must check for '..' traversal"),
        )
        for marker, failure_message in required_markers:
            self.assertIn(marker, schemas_text, failure_message)
# ============================================================================
# VULN 2: Monitor Auth Bypass (structural check)
# ============================================================================
class TestMonitorAuthStructural(unittest.TestCase):
    """Source-level checks that the monitor endpoints are behind auth."""

    def test_monitor_router_has_auth(self):
        """The include_router line for monitor_router must pass dependencies=."""
        server_file = os.path.join(DEPLOY_DIR, "server.py")
        with open(server_file) as handle:
            server_text = handle.read()

        # Only the first line mentioning both names is inspected.
        registration = next(
            (
                text_line
                for text_line in server_text.splitlines()
                if "include_router" in text_line and "monitor_router" in text_line
            ),
            None,
        )
        if registration is None:
            self.fail("Could not find monitor_router include_router line")
        self.assertIn(
            "dependencies=",
            registration,
            "Monitor router must have dependencies=[Depends(token_dep)]",
        )

    def test_websocket_has_token_check(self):
        """monitor_routes.py must reference the API token and websocket.close."""
        routes_file = os.path.join(DEPLOY_DIR, "monitor_routes.py")
        with open(routes_file) as handle:
            routes_text = handle.read()

        required_markers = (
            ("CRAWL4AI_API_TOKEN",
             "WebSocket endpoint must check CRAWL4AI_API_TOKEN"),
            ("websocket.close",
             "WebSocket must close connection on auth failure"),
        )
        for marker, failure_message in required_markers:
            self.assertIn(marker, routes_text, failure_message)
# ============================================================================
# VULN 3: Stored XSS (server-side + client-side)
# ============================================================================
class TestXSSPrevention(unittest.TestCase):
def test_html_escape_blocks_script_tags(self):
import html
payload = ''
escaped = html.escape(payload)
self.assertNotIn("