mirror of
https://github.com/unclecode/crawl4ai.git
synced 2026-06-10 15:58:15 +00:00
Backward-compatible fixes for the Docker server - features keep working, only the unsafe behavior is closed. (The secure-by-default redesign is the later major.) - SSRF: replace the explicit blocklist with the one rule (reject any resolved IP where not ip.is_global) evaluated on embedded IPv4 transition forms too, closing the gaps - IPv6 unspecified ::, NAT64 64:ff9b::/96, 6to4 2002::/16, v4-mapped. Error messages are now opaque (no resolved-IP leak). - output_path arbitrary write: harden validate_output_path with realpath containment (defeats a symlinked path component) and write via O_NOFOLLOW (write_output_file). output_path stays supported. - LLM base_url key exfil: ignore a request-supplied base_url in /md, /llm, /llm/job; the endpoint is always server-derived. Field still accepted (no 4xx) for compatibility. - env:SECRET_KEY exfil gadget: LLMConfig refuses env: resolution of protected names (SECRET/PASSWORD/PRIVATE substrings, CRAWL4AI*/AWS_SECRET* prefixes, SECRET_KEY/REDIS_PASSWORD/TOKEN). Normal provider keys (OPENAI_API_KEY, ...) unaffected. - CRLF log injection: CRLFSafeFilter strips CR/LF/control from log records. - Webhook header injection: sanitize_webhook_headers (name pattern, no control chars, deny hop-by-hop/sensitive) at send time + a WebhookConfig validator for early 422. Bump 0.8.7 -> 0.8.8 (__version__ + Dockerfile C4AI_VER). 30 new behavioral tests; existing 111 security tests + 112 library config tests still pass. NOT included (breaking -> deferred to the major): auth-by-default, trust boundary, declarative hooks, output_path removal, base_url/provider removal, loopback bind, redis password, TLS-verify-on, CORS, bounded queue. The exec-hook RCE and unauth-by-default criticals have no non-breaking fix and are closed only in the major (hooks are already off by default).
201 lines
7.5 KiB
Python
201 lines
7.5 KiB
Python
"""
|
|
Webhook delivery service for Crawl4AI.
|
|
|
|
This module provides webhook notification functionality with exponential backoff retry logic.
|
|
"""
|
|
import asyncio
|
|
import httpx
|
|
import logging
|
|
import re
|
|
from typing import Dict, Optional
|
|
from datetime import datetime, timezone
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Webhook request-header policy: user-controlled outbound headers could inject
|
|
# hop-by-hop / smuggling headers or CRLF. Allow only well-formed names, reject
|
|
# control chars in values, and deny sensitive/hop-by-hop names.
|
|
_WEBHOOK_HEADER_NAME = re.compile(r"^[A-Za-z0-9-]{1,64}$")
|
|
_WEBHOOK_DENY_HEADERS = {
|
|
"host", "content-length", "transfer-encoding", "connection",
|
|
"content-type", "proxy-authorization", "authorization", "cookie",
|
|
"expect", "upgrade", "te", "trailer",
|
|
}
|
|
_MAX_WEBHOOK_HEADERS = 20
|
|
_MAX_WEBHOOK_HEADER_VALUE = 2048
|
|
|
|
|
|
def _has_forbidden_control_char(value: str) -> bool:
    """Return True if *value* contains a C0 control character other than TAB, or DEL."""
    # RFC 9110 field values permit SP and HTAB but no other control characters.
    return any((ord(ch) < 0x20 and ch != "\t") or ord(ch) == 0x7F for ch in value)


def sanitize_webhook_headers(headers: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Validate user-supplied webhook headers; raise ValueError on any bad one.

    Args:
        headers: Request-supplied header mapping, or None.

    Returns:
        A new dict with the same names and every value converted with ``str()``;
        an empty dict when *headers* is None or empty.

    Raises:
        ValueError: if there are more than ``_MAX_WEBHOOK_HEADERS`` entries; if a
            name is not a string matching ``_WEBHOOK_HEADER_NAME`` in full; if a
            name (case-insensitively) is in ``_WEBHOOK_DENY_HEADERS``; or if a
            value is longer than ``_MAX_WEBHOOK_HEADER_VALUE`` or contains a
            control character other than horizontal tab (CR/LF/NUL included).
    """
    if not headers:
        return {}
    if len(headers) > _MAX_WEBHOOK_HEADERS:
        raise ValueError("too many webhook headers")
    clean: Dict[str, str] = {}
    for name, value in headers.items():
        # fullmatch rather than match: a ``$``-anchored pattern still matches
        # just before a trailing "\n", which would let "X-Foo\n" through.
        if not isinstance(name, str) or not _WEBHOOK_HEADER_NAME.fullmatch(name):
            raise ValueError(f"invalid webhook header name: {name!r}")
        if name.lower() in _WEBHOOK_DENY_HEADERS:
            raise ValueError(f"webhook header not allowed: {name}")
        sval = str(value)
        if len(sval) > _MAX_WEBHOOK_HEADER_VALUE or _has_forbidden_control_char(sval):
            raise ValueError(f"invalid value for webhook header {name}")
        clean[name] = sval
    return clean
|
|
|
|
|
|
class WebhookDeliveryService:
    """Handles webhook delivery with exponential backoff retry logic.

    Settings come from the ``webhooks`` section of the application config:
    ``retry.max_attempts``, ``retry.initial_delay_ms``, ``retry.max_delay_ms``,
    ``retry.timeout_ms``, ``headers`` (default outbound headers), ``default_url``,
    ``data_in_payload`` and ``enabled``.
    """

    def __init__(self, config: Dict):
        """
        Initialize the webhook delivery service.

        Args:
            config: Application configuration dictionary containing webhook settings.
                A missing or null ``webhooks`` / ``retry`` section falls back to
                the built-in defaults.
        """
        # ``or {}`` also covers a section that is present but null (e.g. an
        # empty YAML key), for which ``.get(key, {})`` would return None.
        self.config = config.get("webhooks") or {}
        retry = self.config.get("retry") or {}
        self.max_attempts = retry.get("max_attempts", 5)
        # Configured in milliseconds; asyncio.sleep and httpx take seconds.
        self.initial_delay = retry.get("initial_delay_ms", 1000) / 1000
        self.max_delay = retry.get("max_delay_ms", 32000) / 1000
        self.timeout = retry.get("timeout_ms", 30000) / 1000

    @staticmethod
    def _merge_headers(*layers: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Merge header dicts left to right, comparing names case-insensitively.

        HTTP header names are case-insensitive, so a later layer's ``x-key``
        replaces an earlier ``X-Key`` instead of producing two headers. The
        later layer's spelling of the name is kept; ``None`` layers are skipped.
        """
        merged: Dict[str, str] = {}
        spelling: Dict[str, str] = {}  # lower-cased name -> key currently in ``merged``
        for layer in layers:
            for name, value in (layer or {}).items():
                key = name.lower()
                if key in spelling:
                    del merged[spelling[key]]
                merged[name] = value
                spelling[key] = name
        return merged

    def _retry_delay(self, attempt: int) -> float:
        """Return the backoff delay in seconds after the 0-based *attempt* failed.

        Doubles from ``initial_delay`` on each attempt, capped at ``max_delay``.
        """
        return min(self.initial_delay * (2 ** attempt), self.max_delay)

    async def send_webhook(
        self,
        webhook_url: str,
        payload: Dict,
        headers: Optional[Dict[str, str]] = None
    ) -> bool:
        """
        Send webhook with exponential backoff retry logic.

        Args:
            webhook_url: The URL to send the webhook to
            payload: The JSON payload to send
            headers: Optional custom (request-supplied) headers. They are
                validated with ``sanitize_webhook_headers``; if any entry is
                unsafe, all custom headers are dropped and the webhook is still sent.

        Returns:
            bool: True on a 2xx response. False on any other status below 500
            (not retried), or once every attempt has failed with a 5xx status
            or an exception.

        Raises:
            Whatever ``utils.validate_webhook_url`` raises for a rejected URL
            (presumably ValueError - TODO confirm against utils).
        """
        # SECURITY: Validate webhook URL at send time (defense-in-depth against SSRF)
        from utils import validate_webhook_url
        validate_webhook_url(webhook_url)

        try:
            safe_custom = sanitize_webhook_headers(headers)
        except ValueError as e:
            logger.warning(f"Dropping unsafe webhook headers: {e}")
            safe_custom = {}
        # Layering: configured defaults < request headers < forced JSON content
        # type. Merging case-insensitively stops e.g. a configured "content-type"
        # from being sent alongside "Content-Type".
        merged_headers = self._merge_headers(
            self.config.get("headers"),
            safe_custom,
            {"Content-Type": "application/json"},
        )

        async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=False) as client:
            for attempt in range(self.max_attempts):
                try:
                    logger.info(
                        f"Sending webhook (attempt {attempt + 1}/{self.max_attempts}) to {webhook_url}"
                    )

                    response = await client.post(
                        webhook_url,
                        json=payload,
                        headers=merged_headers
                    )

                    # Below 500 the outcome is final. 2xx is success. 3xx is a
                    # rejection because redirects are not followed (presumably so
                    # a redirect cannot bypass the URL check above). 4xx is a
                    # client error and is not retried.
                    if response.status_code < 500:
                        if 200 <= response.status_code < 300:
                            logger.info(f"Webhook delivered successfully to {webhook_url}")
                            return True
                        logger.warning(
                            f"Webhook rejected with status {response.status_code}: {response.text[:200]}"
                        )
                        return False  # Client error - don't retry

                    # Server error - retry with backoff
                    logger.warning(
                        f"Webhook failed with status {response.status_code}, will retry"
                    )

                except httpx.TimeoutException as exc:
                    logger.error(f"Webhook timeout (attempt {attempt + 1}): {exc}")
                except httpx.RequestError as exc:
                    logger.error(f"Webhook request error (attempt {attempt + 1}): {exc}")
                except Exception as exc:
                    logger.error(f"Webhook delivery error (attempt {attempt + 1}): {exc}")

                # No sleep after the final attempt.
                if attempt < self.max_attempts - 1:
                    delay = self._retry_delay(attempt)
                    logger.info(f"Retrying in {delay}s...")
                    await asyncio.sleep(delay)

        logger.error(
            f"Webhook delivery failed after {self.max_attempts} attempts to {webhook_url}"
        )
        return False

    async def notify_job_completion(
        self,
        task_id: str,
        task_type: str,
        status: str,
        urls: list,
        webhook_config: Optional[Dict],
        result: Optional[Dict] = None,
        error: Optional[str] = None
    ):
        """
        Notify webhook of job completion.

        The per-job ``webhook_config`` takes precedence over the service
        configuration: its ``webhook_url`` overrides ``default_url``, and its
        ``webhook_data_in_payload`` overrides ``data_in_payload``. Nothing is
        sent when no URL is available or webhooks are disabled.

        Args:
            task_id: The task identifier
            task_type: Type of task (e.g., "crawl", "llm_extraction")
            status: Task status ("completed" or "failed")
            urls: List of URLs that were crawled
            webhook_config: Webhook configuration from the job request
            result: Optional crawl result data (included only when
                data-in-payload is enabled and the result is truthy)
            error: Optional error message if failed
        """
        # Determine webhook URL
        webhook_url = None
        data_in_payload = self.config.get("data_in_payload", False)
        custom_headers = None

        if webhook_config:
            webhook_url = webhook_config.get("webhook_url")
            data_in_payload = webhook_config.get("webhook_data_in_payload", data_in_payload)
            custom_headers = webhook_config.get("webhook_headers")

        if not webhook_url:
            webhook_url = self.config.get("default_url")

        if not webhook_url:
            logger.debug("No webhook URL configured, skipping notification")
            return

        # Check if webhooks are enabled
        if not self.config.get("enabled", True):
            logger.debug("Webhooks are disabled, skipping notification")
            return

        # Build payload
        payload = {
            "task_id": task_id,
            "task_type": task_type,
            "status": status,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "urls": urls
        }

        if error:
            payload["error"] = error

        if data_in_payload and result:
            payload["data"] = result

        # Awaited, not fire-and-forget: this returns only after delivery
        # succeeds, is rejected, or exhausts its retries.
        await self.send_webhook(webhook_url, payload, custom_headers)
|