diff --git a/SECURITY-CREDITS.md b/SECURITY-CREDITS.md index 071f27b4..35cb889b 100644 --- a/SECURITY-CREDITS.md +++ b/SECURITY-CREDITS.md @@ -8,3 +8,4 @@ We thank the following security researchers for their responsible disclosure: | Jeongbean Jeon | wjswjdqls7@gmail.com | File write, SSRF, monitor auth bypass, stored XSS | 2026-04-13 | | wulonchia | wulonchia@gmail.com | File write via output_path (independent report) | 2026-04-13 | | by111 (August829) | GitHub: [August829](https://github.com/August829) | Hardcoded JWT secret, eval in /config/dump, /execute_js, hook sandbox escape | 2026-04-14 | +| secsys_codex | secsys_codex@163.com | SSRF via /md, /crawl, /llm endpoints (URL destination validation) | 2026-04-18 | diff --git a/deploy/docker/api.py b/deploy/docker/api.py index 1ecc9e0b..7ad256f8 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -45,7 +45,8 @@ from utils import ( validate_llm_provider, get_llm_temperature, get_llm_base_url, - get_redis_task_ttl + get_redis_task_ttl, + validate_url_destination, ) from webhook import WebhookDeliveryService @@ -91,6 +92,7 @@ async def handle_llm_qa( try: if not url.startswith(('http://', 'https://')) and not url.startswith(("raw:", "raw://")): url = 'https://' + url + validate_url_destination(url) # Extract base URL by finding last '?q=' occurrence last_q_index = url.rfind('?q=') if last_q_index != -1: @@ -284,6 +286,7 @@ async def handle_markdown_request( decoded_url = unquote(url) if not decoded_url.startswith(('http://', 'https://')) and not decoded_url.startswith(("raw:", "raw://")): decoded_url = 'https://' + decoded_url + validate_url_destination(decoded_url) if filter_type == FilterType.RAW: md_generator = DefaultMarkdownGenerator() @@ -442,6 +445,7 @@ async def create_new_task( decoded_url = unquote(input_path) if not decoded_url.startswith(('http://', 'https://')) and not decoded_url.startswith(("raw:", "raw://")): decoded_url = 'https://' + decoded_url + validate_url_destination(decoded_url) from datetime import datetime task_id = f"llm_{int(datetime.now().timestamp())}_{id(background_tasks)}" @@ -564,6 +568,8 @@ async def handle_crawl_request( try: urls = [('https://' + url) if not url.startswith(('http://', 'https://')) and not url.startswith(("raw:", "raw://")) else url for url in urls] + for url in urls: + validate_url_destination(url) browser_config = BrowserConfig.load(browser_config) crawler_config = CrawlerRunConfig.load(crawler_config) diff --git a/deploy/docker/server.py b/deploy/docker/server.py index a5bfe53e..57a62407 100644 --- a/deploy/docker/server.py +++ b/deploy/docker/server.py @@ -36,7 +36,7 @@ from schemas import ( from utils import ( FilterType, load_config, setup_logging, verify_email_domain, - validate_output_path, validate_webhook_url, + validate_output_path, validate_webhook_url, validate_url_destination, ) import os import sys @@ -271,11 +271,12 @@ ALLOWED_URL_SCHEMES_WITH_RAW = ("http://", "https://", "raw:", "raw://") def validate_url_scheme(url: str, allow_raw: bool = False) -> None: - """Validate URL scheme to prevent file:// LFI attacks.""" + """Validate URL scheme (LFI) and destination (SSRF).""" allowed = ALLOWED_URL_SCHEMES_WITH_RAW if allow_raw else ALLOWED_URL_SCHEMES if not url.startswith(allowed): schemes = ", ".join(allowed) raise HTTPException(400, f"URL must start with {schemes}") + validate_url_destination(url) # ───────────────── safe config‑dump helper ───────────────── diff --git a/deploy/docker/tests/test_security_ssrf_crawl.py b/deploy/docker/tests/test_security_ssrf_crawl.py new file mode 100644 index 00000000..7e13362b --- /dev/null +++ b/deploy/docker/tests/test_security_ssrf_crawl.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +""" +Adversarial tests for SSRF protection on crawl/md/llm URL entry points. +Reported by secsys_codex (2026-04-18). + +Tests that validate_url_destination() blocks internal IPs on all crawl paths, +and that CRAWL4AI_ALLOW_INTERNAL_URLS=true bypasses the check. +""" + +import os +import sys +import unittest +import ipaddress +import socket +from urllib.parse import urlparse + +DEPLOY_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") + +# Local copy of validation logic for self-contained testing +_BLOCKED_NETWORKS = [ + ipaddress.ip_network("0.0.0.0/8"), + ipaddress.ip_network("10.0.0.0/8"), + ipaddress.ip_network("100.64.0.0/10"), + ipaddress.ip_network("127.0.0.0/8"), + ipaddress.ip_network("169.254.0.0/16"), + ipaddress.ip_network("172.16.0.0/12"), + ipaddress.ip_network("192.0.0.0/24"), + ipaddress.ip_network("192.168.0.0/16"), + ipaddress.ip_network("198.18.0.0/15"), + ipaddress.ip_network("::1/128"), + ipaddress.ip_network("fc00::/7"), + ipaddress.ip_network("fe80::/10"), +] +_BLOCKED_HOSTNAMES = { + "localhost", "metadata.google.internal", "metadata", + "kubernetes.default", "kubernetes.default.svc", +} + + +def _validate_webhook_url(url): + parsed = urlparse(str(url)) + hostname = parsed.hostname + if not hostname: + raise ValueError("URL must have a valid hostname") + if hostname.lower() in _BLOCKED_HOSTNAMES: + raise ValueError(f"Hostname '{hostname}' is blocked") + if hostname.lower().startswith("host.docker.internal"): + raise ValueError(f"Hostname '{hostname}' is blocked") + try: + resolved = socket.getaddrinfo(hostname, None) + except socket.gaierror: + raise ValueError(f"Cannot resolve hostname '{hostname}'") + for _, _, _, _, sockaddr in resolved: + ip = ipaddress.ip_address(sockaddr[0]) + for network in _BLOCKED_NETWORKS: + if ip in network: + raise ValueError(f"URL resolves to blocked address: {ip}") + + +def validate_url_destination(url, allow_internal=False): + """Simulates the actual validate_url_destination from utils.py.""" + if allow_internal: + return + if str(url).startswith(("raw:", "raw://")): + return + _validate_webhook_url(url) + + +# ============================================================================ +# SSRF attacks on crawl URLs +# ============================================================================ + +class TestCrawlURLSSRF(unittest.TestCase): + """Test SSRF protection on URLs that go to crawler.arun().""" + + def test_localhost_blocked(self): + with self.assertRaises(ValueError): + validate_url_destination("http://127.0.0.1:8080/admin") + + def test_localhost_name_blocked(self): + with self.assertRaises(ValueError): + validate_url_destination("http://localhost:8080/secret") + + def test_10_network_blocked(self): + with self.assertRaises(ValueError): + validate_url_destination("http://10.0.0.1/internal") + + def test_172_16_blocked(self): + with self.assertRaises(ValueError): + validate_url_destination("http://172.16.0.1/dashboard") + + def test_192_168_blocked(self): + with self.assertRaises(ValueError): + validate_url_destination("http://192.168.1.1/router") + + def test_aws_metadata(self): + with self.assertRaises(ValueError): + validate_url_destination("http://169.254.169.254/latest/meta-data/") + + def test_gcp_metadata(self): + with self.assertRaises(ValueError): + validate_url_destination("http://metadata.google.internal/computeMetadata/v1/") + + def test_docker_internal(self): + with self.assertRaises(ValueError): + validate_url_destination("http://host.docker.internal:3000/api") + + def test_kubernetes(self): + with self.assertRaises(ValueError): + validate_url_destination("http://kubernetes.default/api/v1/secrets") + + # -- Must allow: external URLs -- + + def test_external_url_allowed(self): + validate_url_destination("https://example.com") + validate_url_destination("https://www.google.com") + + # -- raw: URLs bypass (no network fetch) -- + + def test_raw_url_bypasses(self): + validate_url_destination("raw:hello") + validate_url_destination("raw://test") + + # -- ALLOW_INTERNAL_URLS opt-out -- + + def test_allow_internal_bypasses(self): + """When opted in, internal URLs should pass.""" + validate_url_destination("http://127.0.0.1:8080", allow_internal=True) + validate_url_destination("http://10.0.0.1/internal", allow_internal=True) + validate_url_destination("http://169.254.169.254/meta", allow_internal=True) + + +# ============================================================================ +# Source-level verification +# ============================================================================ + +class TestSSRFSourceCoverage(unittest.TestCase): + """Verify all URL entry points have SSRF validation.""" + + def test_server_validate_url_scheme_calls_destination(self): + """validate_url_scheme must also call validate_url_destination.""" + with open(os.path.join(DEPLOY_DIR, "server.py")) as f: + source = f.read() + # Find validate_url_scheme function body + idx = source.index("def validate_url_scheme") + func_end = source.index("\ndef ", idx + 1) if "\ndef " in source[idx+1:] else idx + 500 + func_body = source[idx:func_end] + self.assertIn("validate_url_destination", func_body, + "validate_url_scheme must call validate_url_destination") + + def test_api_py_has_destination_validation(self): + """api.py must call validate_url_destination for all URL entry points.""" + with open(os.path.join(DEPLOY_DIR, "api.py")) as f: + source = f.read() + self.assertIn("validate_url_destination", source, + "api.py must import and use validate_url_destination") + # Count occurrences -- should have at least 4 (one per entry point) + count = source.count("validate_url_destination") + self.assertGreaterEqual(count, 5, # 1 import + 4 call sites + f"api.py should call validate_url_destination at all URL entry points (found {count})") + + def test_utils_has_allow_internal_flag(self): + """utils.py must have CRAWL4AI_ALLOW_INTERNAL_URLS env var.""" + with open(os.path.join(DEPLOY_DIR, "utils.py")) as f: + source = f.read() + self.assertIn("CRAWL4AI_ALLOW_INTERNAL_URLS", source) + self.assertIn("ALLOW_INTERNAL_URLS", source) + + def test_validate_url_destination_skips_raw(self): + """validate_url_destination must skip raw: URLs.""" + with open(os.path.join(DEPLOY_DIR, "utils.py")) as f: + source = f.read() + idx = source.index("def validate_url_destination") + func_body = source[idx:idx+500] + self.assertIn("raw:", func_body, + "validate_url_destination must skip raw: URLs") + + +if __name__ == "__main__": + print("=" * 70) + print("Crawl4AI SSRF Tests - Crawl/MD/LLM Endpoints (secsys_codex)") + print("=" * 70) + print() + unittest.main(verbosity=2) diff --git a/deploy/docker/utils.py b/deploy/docker/utils.py index e2ae562b..3a43ce31 100644 --- a/deploy/docker/utils.py +++ b/deploy/docker/utils.py @@ -337,6 +337,24 @@ _BLOCKED_HOSTNAMES = { } +ALLOW_INTERNAL_URLS = os.environ.get("CRAWL4AI_ALLOW_INTERNAL_URLS", "false").lower() == "true" + + +def validate_url_destination(url: str) -> None: + """Block crawl URLs targeting internal/private networks (SSRF protection). + Skipped when CRAWL4AI_ALLOW_INTERNAL_URLS=true. + Skipped for raw: URLs (inline HTML, no network fetch).""" + if ALLOW_INTERNAL_URLS: + return + if str(url).startswith(("raw:", "raw://")): + return + try: + validate_webhook_url(url) + except ValueError as e: + from fastapi import HTTPException + raise HTTPException(status_code=400, detail=f"URL blocked (SSRF protection): {e}") + + def validate_webhook_url(url: str) -> None: """Reject webhook URLs targeting internal/private/reserved networks (SSRF protection).""" parsed = urlparse(str(url))