fix(security): SSRF protection on all crawl/md/llm URL entry points

Reported by secsys_codex (2026-04-18): /md, /crawl, /llm endpoints
pass user URLs to crawler.arun() with no private IP validation.

- Add validate_url_destination() to utils.py with opt-out via
  CRAWL4AI_ALLOW_INTERNAL_URLS=true env var for users who need
  to crawl internal services.
- Integrate into validate_url_scheme() (covers all server.py endpoints).
- Add validation at all 4 URL entry points in api.py (handle_llm_qa,
  handle_markdown_request, create_new_task, handle_crawl_request).
- raw: URLs bypass check (inline HTML, no network fetch).
- 16 adversarial + source coverage tests added.
- secsys_codex added to SECURITY-CREDITS.md.

DO NOT PUSH until release day.
This commit is contained in:
unclecode
2026-04-20 09:42:43 +00:00
parent c9914691db
commit f77c0a856f
5 changed files with 213 additions and 3 deletions

View File

@@ -8,3 +8,4 @@ We thank the following security researchers for their responsible disclosure:
| Jeongbean Jeon | wjswjdqls7@gmail.com | File write, SSRF, monitor auth bypass, stored XSS | 2026-04-13 |
| wulonchia | wulonchia@gmail.com | File write via output_path (independent report) | 2026-04-13 |
| by111 (August829) | GitHub: [August829](https://github.com/August829) | Hardcoded JWT secret, eval in /config/dump, /execute_js, hook sandbox escape | 2026-04-14 |
| secsys_codex | secsys_codex@163.com | SSRF via /md, /crawl, /llm endpoints (URL destination validation) | 2026-04-18 |

View File

@@ -45,7 +45,8 @@ from utils import (
validate_llm_provider,
get_llm_temperature,
get_llm_base_url,
get_redis_task_ttl
get_redis_task_ttl,
validate_url_destination,
)
from webhook import WebhookDeliveryService
@@ -91,6 +92,7 @@ async def handle_llm_qa(
try:
if not url.startswith(('http://', 'https://')) and not url.startswith(("raw:", "raw://")):
url = 'https://' + url
validate_url_destination(url)
# Extract base URL by finding last '?q=' occurrence
last_q_index = url.rfind('?q=')
if last_q_index != -1:
@@ -284,6 +286,7 @@ async def handle_markdown_request(
decoded_url = unquote(url)
if not decoded_url.startswith(('http://', 'https://')) and not decoded_url.startswith(("raw:", "raw://")):
decoded_url = 'https://' + decoded_url
validate_url_destination(decoded_url)
if filter_type == FilterType.RAW:
md_generator = DefaultMarkdownGenerator()
@@ -442,6 +445,7 @@ async def create_new_task(
decoded_url = unquote(input_path)
if not decoded_url.startswith(('http://', 'https://')) and not decoded_url.startswith(("raw:", "raw://")):
decoded_url = 'https://' + decoded_url
validate_url_destination(decoded_url)
from datetime import datetime
task_id = f"llm_{int(datetime.now().timestamp())}_{id(background_tasks)}"
@@ -564,6 +568,8 @@ async def handle_crawl_request(
try:
urls = [('https://' + url) if not url.startswith(('http://', 'https://')) and not url.startswith(("raw:", "raw://")) else url for url in urls]
for url in urls:
validate_url_destination(url)
browser_config = BrowserConfig.load(browser_config)
crawler_config = CrawlerRunConfig.load(crawler_config)

View File

@@ -36,7 +36,7 @@ from schemas import (
from utils import (
FilterType, load_config, setup_logging, verify_email_domain,
validate_output_path, validate_webhook_url,
validate_output_path, validate_webhook_url, validate_url_destination,
)
import os
import sys
@@ -271,11 +271,12 @@ ALLOWED_URL_SCHEMES_WITH_RAW = ("http://", "https://", "raw:", "raw://")
def validate_url_scheme(url: str, allow_raw: bool = False) -> None:
    """Validate URL scheme (LFI) and destination (SSRF).

    Args:
        url: Caller-supplied URL to check.
        allow_raw: When True the URL is checked against
            ALLOWED_URL_SCHEMES_WITH_RAW (which additionally admits
            ``raw:`` / ``raw://``); otherwise against ALLOWED_URL_SCHEMES.

    Raises:
        HTTPException: 400 when the URL does not start with one of the
            allowed prefixes. Anything raised by validate_url_destination()
            propagates unchanged.
    """
    allowed = ALLOWED_URL_SCHEMES_WITH_RAW if allow_raw else ALLOWED_URL_SCHEMES
    if not url.startswith(allowed):
        schemes = ", ".join(allowed)
        raise HTTPException(400, f"URL must start with {schemes}")
    # Scheme is checked first, so only URLs with an allowed prefix ever
    # reach the destination (SSRF) check.
    validate_url_destination(url)
# ───────────────── safe configdump helper ─────────────────

View File

@@ -0,0 +1,184 @@
#!/usr/bin/env python3
"""
Adversarial tests for SSRF protection on crawl/md/llm URL entry points.
Reported by secsys_codex (2026-04-18).
Tests that validate_url_destination() blocks internal IPs on all crawl paths,
and that CRAWL4AI_ALLOW_INTERNAL_URLS=true bypasses the check.
"""
import os
import sys
import unittest
import ipaddress
import socket
from urllib.parse import urlparse
DEPLOY_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
# Local copy of validation logic for self-contained testing
_BLOCKED_NETWORKS = [
ipaddress.ip_network("0.0.0.0/8"),
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("100.64.0.0/10"),
ipaddress.ip_network("127.0.0.0/8"),
ipaddress.ip_network("169.254.0.0/16"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.0.0.0/24"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("198.18.0.0/15"),
ipaddress.ip_network("::1/128"),
ipaddress.ip_network("fc00::/7"),
ipaddress.ip_network("fe80::/10"),
]
_BLOCKED_HOSTNAMES = {
"localhost", "metadata.google.internal", "metadata",
"kubernetes.default", "kubernetes.default.svc",
}
def _validate_webhook_url(url):
parsed = urlparse(str(url))
hostname = parsed.hostname
if not hostname:
raise ValueError("URL must have a valid hostname")
if hostname.lower() in _BLOCKED_HOSTNAMES:
raise ValueError(f"Hostname '{hostname}' is blocked")
if hostname.lower().startswith("host.docker.internal"):
raise ValueError(f"Hostname '{hostname}' is blocked")
try:
resolved = socket.getaddrinfo(hostname, None)
except socket.gaierror:
raise ValueError(f"Cannot resolve hostname '{hostname}'")
for _, _, _, _, sockaddr in resolved:
ip = ipaddress.ip_address(sockaddr[0])
for network in _BLOCKED_NETWORKS:
if ip in network:
raise ValueError(f"URL resolves to blocked address: {ip}")
def validate_url_destination(url, allow_internal=False):
    """Test-local stand-in for utils.validate_url_destination.

    Does nothing when *allow_internal* is true or when the URL is an inline
    ``raw:`` document; otherwise delegates to _validate_webhook_url(), whose
    ValueError propagates to the caller.
    """
    # Short-circuit keeps the original order: the opt-out is consulted
    # before the URL is even stringified.
    skip_check = allow_internal or str(url).startswith(("raw:", "raw://"))
    if not skip_check:
        _validate_webhook_url(url)
# ============================================================================
# SSRF attacks on crawl URLs
# ============================================================================
class TestCrawlURLSSRF(unittest.TestCase):
    """Test SSRF protection on URLs that go to crawler.arun()."""

    def test_localhost_blocked(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://127.0.0.1:8080/admin")

    def test_localhost_name_blocked(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://localhost:8080/secret")

    def test_10_network_blocked(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://10.0.0.1/internal")

    def test_172_16_blocked(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://172.16.0.1/dashboard")

    def test_192_168_blocked(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://192.168.1.1/router")

    def test_aws_metadata(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://169.254.169.254/latest/meta-data/")

    def test_gcp_metadata(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://metadata.google.internal/computeMetadata/v1/")

    def test_docker_internal(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://host.docker.internal:3000/api")

    def test_kubernetes(self):
        with self.assertRaises(ValueError):
            validate_url_destination("http://kubernetes.default/api/v1/secrets")

    # -- Must allow: external URLs --
    def test_external_url_allowed(self):
        # This is the only test that needs real DNS. The validator reports an
        # unresolvable host as ValueError, so without this guard an offline
        # run would fail as if a public URL had been blocked.
        try:
            socket.getaddrinfo("example.com", None)
        except socket.gaierror:
            self.skipTest("DNS resolution unavailable; cannot verify external URLs")
        validate_url_destination("https://example.com")
        validate_url_destination("https://www.google.com")

    # -- raw: URLs bypass (no network fetch) --
    def test_raw_url_bypasses(self):
        validate_url_destination("raw:<html><body>hello</body></html>")
        validate_url_destination("raw://<html>test</html>")

    # -- ALLOW_INTERNAL_URLS opt-out --
    def test_allow_internal_bypasses(self):
        """When opted in, internal URLs should pass."""
        validate_url_destination("http://127.0.0.1:8080", allow_internal=True)
        validate_url_destination("http://10.0.0.1/internal", allow_internal=True)
        validate_url_destination("http://169.254.169.254/meta", allow_internal=True)
# ============================================================================
# Source-level verification
# ============================================================================
class TestSSRFSourceCoverage(unittest.TestCase):
    """Verify all URL entry points have SSRF validation.

    These are text-level checks against the deployed sources (read from
    DEPLOY_DIR); they do not import or execute the modules.
    """

    @staticmethod
    def _read_deploy_file(filename):
        """Return the text of *filename* located in DEPLOY_DIR."""
        with open(os.path.join(DEPLOY_DIR, filename), encoding="utf-8") as f:
            return f.read()

    @staticmethod
    def _function_source(source, func_name, fallback_len=500):
        """Return the text from ``def func_name`` up to the next top-level def.

        Falls back to a fixed-size window when no later ``def`` exists.
        Raises ValueError (from str.index) if the function is absent.
        """
        start = source.index(f"def {func_name}")
        end = source.find("\ndef ", start + 1)
        if end == -1:
            end = start + fallback_len
        return source[start:end]

    def test_server_validate_url_scheme_calls_destination(self):
        """validate_url_scheme must also call validate_url_destination."""
        source = self._read_deploy_file("server.py")
        func_body = self._function_source(source, "validate_url_scheme")
        self.assertIn("validate_url_destination", func_body,
                      "validate_url_scheme must call validate_url_destination")

    def test_api_py_has_destination_validation(self):
        """api.py must call validate_url_destination for all URL entry points."""
        source = self._read_deploy_file("api.py")
        self.assertIn("validate_url_destination", source,
                      "api.py must import and use validate_url_destination")
        # At least 5 mentions overall: 1 import + 4 call sites.
        count = source.count("validate_url_destination")
        self.assertGreaterEqual(count, 5,
                                f"api.py should call validate_url_destination at all URL entry points (found {count})")
        # Mentions in comments or imports must not satisfy the requirement:
        # count actual call expressions, one per entry point.
        call_count = source.count("validate_url_destination(")
        self.assertGreaterEqual(call_count, 4,
                                f"api.py should have 4 validate_url_destination(...) call sites (found {call_count})")

    def test_utils_has_allow_internal_flag(self):
        """utils.py must have CRAWL4AI_ALLOW_INTERNAL_URLS env var."""
        source = self._read_deploy_file("utils.py")
        # The env var name contains "ALLOW_INTERNAL_URLS", so a separate
        # assertion for the shorter name would be implied by this one.
        self.assertIn("CRAWL4AI_ALLOW_INTERNAL_URLS", source)

    def test_validate_url_destination_skips_raw(self):
        """validate_url_destination must skip raw: URLs."""
        source = self._read_deploy_file("utils.py")
        func_body = self._function_source(source, "validate_url_destination")
        # Require a quoted literal so that prose in the docstring mentioning
        # raw: URLs cannot satisfy the check on its own.
        has_raw_literal = '"raw:"' in func_body or "'raw:'" in func_body
        self.assertTrue(has_raw_literal,
                        "validate_url_destination must skip raw: URLs")
if __name__ == "__main__":
    # Banner: rule, title, rule, then a blank line before unittest's output.
    rule = "=" * 70
    title = "Crawl4AI SSRF Tests - Crawl/MD/LLM Endpoints (secsys_codex)"
    for banner_line in (rule, title, rule, ""):
        print(banner_line)
    unittest.main(verbosity=2)

View File

@@ -337,6 +337,24 @@ _BLOCKED_HOSTNAMES = {
}
# Opt-out switch, evaluated once at import time. Only the value "true"
# (any letter case) disables the destination check; anything else keeps it on.
ALLOW_INTERNAL_URLS = os.environ.get("CRAWL4AI_ALLOW_INTERNAL_URLS", "false").lower() == "true"


def validate_url_destination(url: str) -> None:
    """Block crawl URLs targeting internal/private networks (SSRF protection).

    Skipped when CRAWL4AI_ALLOW_INTERNAL_URLS=true.
    Skipped for raw: URLs (inline HTML, no network fetch).

    Raises:
        HTTPException: 400 when validate_webhook_url() rejects the URL with
            a ValueError; the original error is attached as ``__cause__``.
    """
    if ALLOW_INTERNAL_URLS:
        return
    if str(url).startswith(("raw:", "raw://")):
        return
    try:
        validate_webhook_url(url)
    except ValueError as e:
        from fastapi import HTTPException
        # NOTE(review): the validator's message is echoed to the client; if it
        # includes the resolved internal address, consider a generic detail.
        raise HTTPException(status_code=400, detail=f"URL blocked (SSRF protection): {e}") from e
def validate_webhook_url(url: str) -> None:
"""Reject webhook URLs targeting internal/private/reserved networks (SSRF protection)."""
parsed = urlparse(str(url))