Files
crawl4ai/deploy/docker/webhook.py
unclecode aa81e8fe7d security: non-breaking hardening patch (0.8.8)
Backward-compatible fixes for the Docker server - features keep working, only
the unsafe behavior is closed. (The secure-by-default redesign is the later
major.)

- SSRF: replace the explicit blocklist with the one rule (reject any resolved
  IP where not ip.is_global) evaluated on embedded IPv4 transition forms too,
  closing the gaps - IPv6 unspecified ::, NAT64 64:ff9b::/96, 6to4 2002::/16,
  v4-mapped. Error messages are now opaque (no resolved-IP leak).
- output_path arbitrary write: harden validate_output_path with realpath
  containment (defeats a symlinked path component) and write via O_NOFOLLOW
  (write_output_file). output_path stays supported.
- LLM base_url key exfil: ignore a request-supplied base_url in /md, /llm,
  /llm/job; the endpoint is always server-derived. Field still accepted (no
  4xx) for compatibility.
- env:SECRET_KEY exfil gadget: LLMConfig refuses env: resolution of protected
  names (SECRET/PASSWORD/PRIVATE substrings, CRAWL4AI*/AWS_SECRET* prefixes,
  SECRET_KEY/REDIS_PASSWORD/TOKEN). Normal provider keys (OPENAI_API_KEY, ...)
  unaffected.
- CRLF log injection: CRLFSafeFilter strips CR/LF/control from log records.
- Webhook header injection: sanitize_webhook_headers (name pattern, no control
  chars, deny hop-by-hop/sensitive) at send time + a WebhookConfig validator
  for early 422.

Bump 0.8.7 -> 0.8.8 (__version__ + Dockerfile C4AI_VER). 30 new behavioral
tests; existing 111 security tests + 112 library config tests still pass.

NOT included (breaking -> deferred to the major): auth-by-default, trust
boundary, declarative hooks, output_path removal, base_url/provider removal,
loopback bind, redis password, TLS-verify-on, CORS, bounded queue. The
exec-hook RCE and unauth-by-default criticals have no non-breaking fix and are
closed only in the major (hooks are already off by default).
2026-06-02 12:39:04 +00:00

201 lines
7.5 KiB
Python

"""
Webhook delivery service for Crawl4AI.
This module provides webhook notification functionality with exponential backoff retry logic.
"""
import asyncio
import httpx
import logging
import re
from typing import Dict, Optional
from datetime import datetime, timezone
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Webhook request-header policy: user-controlled outbound headers could inject
# hop-by-hop / smuggling headers or CRLF. Allow only well-formed names, reject
# control chars in values, and deny sensitive/hop-by-hop names.
#
# The name pattern is anchored with \Z rather than $: in Python, $ also matches
# just before a trailing newline, which would let a name such as "Host\n"
# through both the name check and the deny list.
_WEBHOOK_HEADER_NAME = re.compile(r"^[A-Za-z0-9-]{1,64}\Z")
# Characters that must never appear in a header value: every C0 control except
# horizontal tab (HTTP permits HTAB inside field values), plus DEL.
_WEBHOOK_VALUE_FORBIDDEN = re.compile(r"[\x00-\x08\x0a-\x1f\x7f]")
_WEBHOOK_DENY_HEADERS = {
    "host", "content-length", "transfer-encoding", "connection",
    "content-type", "proxy-authorization", "authorization", "cookie",
    "expect", "upgrade", "te", "trailer",
}
_MAX_WEBHOOK_HEADERS = 20
_MAX_WEBHOOK_HEADER_VALUE = 2048


def sanitize_webhook_headers(headers: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Validate user-supplied webhook headers; raise ValueError on any bad one.

    Args:
        headers: Mapping of header name to value as supplied by the caller,
            or None / empty when no custom headers were given.

    Returns:
        A new dict holding the accepted headers, with every value converted
        to ``str``. Empty when ``headers`` is None or empty.

    Raises:
        ValueError: If there are more than ``_MAX_WEBHOOK_HEADERS`` entries,
            a name is not a ``str`` or does not match the allowed pattern,
            a name is on the deny list (compared case-insensitively), or a
            value is longer than ``_MAX_WEBHOOK_HEADER_VALUE`` characters or
            contains a control character.
    """
    if not headers:
        return {}
    if len(headers) > _MAX_WEBHOOK_HEADERS:
        raise ValueError("too many webhook headers")

    clean: Dict[str, str] = {}
    for name, value in headers.items():
        # fullmatch: the whole name must be valid, including its last character.
        if not isinstance(name, str) or not _WEBHOOK_HEADER_NAME.fullmatch(name):
            raise ValueError(f"invalid webhook header name: {name!r}")
        if name.lower() in _WEBHOOK_DENY_HEADERS:
            raise ValueError(f"webhook header not allowed: {name}")

        sval = str(value)
        if len(sval) > _MAX_WEBHOOK_HEADER_VALUE or _WEBHOOK_VALUE_FORBIDDEN.search(sval):
            raise ValueError(f"invalid value for webhook header {name}")
        clean[name] = sval
    return clean
class WebhookDeliveryService:
    """Handles webhook delivery with exponential backoff retry logic."""

    def __init__(self, config: Dict):
        """
        Initialize the webhook delivery service.

        Args:
            config: Application configuration dictionary containing webhook
                settings under the "webhooks" key. Retry tuning is read from
                its "retry" sub-section (max_attempts, initial_delay_ms,
                max_delay_ms, timeout_ms); the millisecond values are stored
                as seconds.
        """
        # A section that is present but empty in a config file typically loads
        # as None rather than {}; treat a null section the same as a missing one.
        section = config.get("webhooks")
        self.config = {} if section is None else section
        retry = self.config.get("retry") or {}
        self.max_attempts = retry.get("max_attempts", 5)
        self.initial_delay = retry.get("initial_delay_ms", 1000) / 1000
        self.max_delay = retry.get("max_delay_ms", 32000) / 1000
        self.timeout = retry.get("timeout_ms", 30000) / 1000

    async def send_webhook(
        self,
        webhook_url: str,
        payload: Dict,
        headers: Optional[Dict[str, str]] = None
    ) -> bool:
        """
        Send webhook with exponential backoff retry logic.

        A 2xx response counts as delivered. Any other status below 500 is
        treated as a final rejection and is not retried. 5xx responses and
        exceptions raised while sending are retried up to ``max_attempts``
        times.

        Args:
            webhook_url: The URL to send the webhook to
            payload: The JSON payload to send
            headers: Optional custom headers. If they fail
                ``sanitize_webhook_headers`` they are all dropped (with a
                warning) and the webhook is still sent.

        Returns:
            bool: True if delivered successfully, False otherwise

        Raises:
            Whatever ``validate_webhook_url`` raises for a rejected URL; it is
            not caught here. NOTE(review): the exception type is defined in
            ``utils`` and is not visible from this module.
        """
        # SECURITY: Validate webhook URL at send time (defense-in-depth against SSRF)
        from utils import validate_webhook_url
        validate_webhook_url(webhook_url)

        try:
            safe_custom = sanitize_webhook_headers(headers)
        except ValueError as e:
            logger.warning(f"Dropping unsafe webhook headers: {e}")
            safe_custom = {}

        # Custom headers override configured defaults of the same name.
        default_headers = self.config.get("headers") or {}
        merged_headers = {**default_headers, **safe_custom}
        # Header names are case-insensitive: remove any spelling of
        # Content-Type first so exactly one is sent with the JSON body.
        merged_headers = {
            name: value
            for name, value in merged_headers.items()
            if str(name).lower() != "content-type"
        }
        merged_headers["Content-Type"] = "application/json"

        # Redirects are not followed (presumably so a redirect cannot steer
        # the request to a target that skipped URL validation).
        async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=False) as client:
            for attempt in range(self.max_attempts):
                try:
                    logger.info(
                        f"Sending webhook (attempt {attempt + 1}/{self.max_attempts}) to {webhook_url}"
                    )
                    response = await client.post(
                        webhook_url,
                        json=payload,
                        headers=merged_headers
                    )

                    if 200 <= response.status_code < 300:
                        logger.info(f"Webhook delivered successfully to {webhook_url}")
                        return True

                    # Any other non-5xx status is final - don't retry
                    if response.status_code < 500:
                        logger.warning(
                            f"Webhook rejected with status {response.status_code}: {response.text[:200]}"
                        )
                        return False

                    # Server error - retry with backoff
                    logger.warning(
                        f"Webhook failed with status {response.status_code}, will retry"
                    )
                except httpx.TimeoutException as exc:
                    logger.error(f"Webhook timeout (attempt {attempt + 1}): {exc}")
                except httpx.RequestError as exc:
                    logger.error(f"Webhook request error (attempt {attempt + 1}): {exc}")
                except Exception as exc:
                    # Best-effort delivery: any other failure is logged and retried.
                    logger.error(f"Webhook delivery error (attempt {attempt + 1}): {exc}")

                # Exponential backoff: the delay doubles each attempt, capped
                # at max_delay. No sleep after the final attempt.
                if attempt < self.max_attempts - 1:
                    delay = min(self.initial_delay * (2 ** attempt), self.max_delay)
                    logger.info(f"Retrying in {delay}s...")
                    await asyncio.sleep(delay)

        logger.error(
            f"Webhook delivery failed after {self.max_attempts} attempts to {webhook_url}"
        )
        return False

    async def notify_job_completion(
        self,
        task_id: str,
        task_type: str,
        status: str,
        urls: list,
        webhook_config: Optional[Dict],
        result: Optional[Dict] = None,
        error: Optional[str] = None
    ) -> None:
        """
        Notify webhook of job completion.

        The per-job ``webhook_config`` takes precedence over the service
        configuration for the URL and the data-in-payload flag. Nothing is
        sent when no URL can be determined or when webhooks are disabled.

        Args:
            task_id: The task identifier
            task_type: Type of task (e.g., "crawl", "llm_extraction")
            status: Task status ("completed" or "failed")
            urls: List of URLs that were crawled
            webhook_config: Webhook configuration from the job request
            result: Optional crawl result data
            error: Optional error message if failed
        """
        # Determine webhook URL, payload mode and custom headers
        webhook_url = None
        data_in_payload = self.config.get("data_in_payload", False)
        custom_headers = None
        if webhook_config:
            webhook_url = webhook_config.get("webhook_url")
            data_in_payload = webhook_config.get("webhook_data_in_payload", data_in_payload)
            custom_headers = webhook_config.get("webhook_headers")

        # Fall back to the service-wide default URL
        if not webhook_url:
            webhook_url = self.config.get("default_url")
        if not webhook_url:
            logger.debug("No webhook URL configured, skipping notification")
            return

        # Check if webhooks are enabled
        if not self.config.get("enabled", True):
            logger.debug("Webhooks are disabled, skipping notification")
            return

        # Build payload
        payload = {
            "task_id": task_id,
            "task_type": task_type,
            "status": status,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "urls": urls
        }
        if error:
            payload["error"] = error
        if data_in_payload and result:
            payload["data"] = result

        # This awaits delivery, including every retry and backoff sleep; the
        # boolean outcome is deliberately ignored. Callers that must not block
        # should schedule this coroutine as a background task.
        await self.send_webhook(webhook_url, payload, custom_headers)