Files
crawl4ai/deploy/docker/server.py
unclecode aa81e8fe7d security: non-breaking hardening patch (0.8.8)
Backward-compatible fixes for the Docker server - features keep working, only
the unsafe behavior is closed. (The secure-by-default redesign is the later
major.)

- SSRF: replace the explicit blocklist with the one rule (reject any resolved
  IP where not ip.is_global) evaluated on embedded IPv4 transition forms too,
  closing the gaps - IPv6 unspecified ::, NAT64 64:ff9b::/96, 6to4 2002::/16,
  v4-mapped. Error messages are now opaque (no resolved-IP leak).
- output_path arbitrary write: harden validate_output_path with realpath
  containment (defeats a symlinked path component) and write via O_NOFOLLOW
  (write_output_file). output_path stays supported.
- LLM base_url key exfil: ignore a request-supplied base_url in /md, /llm,
  /llm/job; the endpoint is always server-derived. Field still accepted (no
  4xx) for compatibility.
- env:SECRET_KEY exfil gadget: LLMConfig refuses env: resolution of protected
  names (SECRET/PASSWORD/PRIVATE substrings, CRAWL4AI*/AWS_SECRET* prefixes,
  SECRET_KEY/REDIS_PASSWORD/TOKEN). Normal provider keys (OPENAI_API_KEY, ...)
  unaffected.
- CRLF log injection: CRLFSafeFilter strips CR/LF/control from log records.
- Webhook header injection: sanitize_webhook_headers (name pattern, no control
  chars, deny hop-by-hop/sensitive) at send time + a WebhookConfig validator
  for early 422.

Bump 0.8.7 -> 0.8.8 (__version__ + Dockerfile C4AI_VER). 30 new behavioral
tests; existing 111 security tests + 112 library config tests still pass.

NOT included (breaking -> deferred to the major): auth-by-default, trust
boundary, declarative hooks, output_path removal, base_url/provider removal,
loopback bind, redis password, TLS-verify-on, CORS, bounded queue. The
exec-hook RCE and unauth-by-default criticals have no non-breaking fix and are
closed only in the major (hooks are already off by default).
2026-06-02 12:39:04 +00:00

890 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ───────────────────────── server.py ─────────────────────────
"""
Crawl4AI FastAPI entrypoint
• Browser pool + global page cap
• Ratelimiting, security, metrics
• /crawl, /crawl/stream, /md, /llm endpoints
"""
# ── stdlib & 3rdparty imports ───────────────────────────────
from crawler_pool import get_crawler, release_crawler, close_all, janitor
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from crawl4ai.__version__ import __version__
from auth import create_access_token, get_token_dependency, TokenRequest
from pydantic import BaseModel
from typing import Optional, List, Dict
from fastapi import Request, Depends
from fastapi.responses import FileResponse
import base64
import re
import logging
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from api import (
handle_markdown_request, handle_llm_qa,
handle_stream_crawl_request, handle_crawl_request,
stream_results
)
from schemas import (
CrawlRequestWithHooks,
MarkdownRequest,
RawCode,
HTMLRequest,
ScreenshotRequest,
PDFRequest,
JSEndpointRequest,
)
from utils import (
FilterType, load_config, setup_logging, verify_email_domain,
validate_output_path, write_output_file, validate_webhook_url, validate_url_destination,
)
import os
import sys
import time
import asyncio
from typing import List
from contextlib import asynccontextmanager
import pathlib
from fastapi import (
FastAPI, HTTPException, Request, Path, Query, Depends
)
from rank_bm25 import BM25Okapi
from fastapi.responses import (
StreamingResponse, RedirectResponse, PlainTextResponse, JSONResponse
)
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.staticfiles import StaticFiles
from job import init_job_router
from mcp_bridge import attach_mcp, mcp_resource, mcp_template, mcp_tool
import ast
import crawl4ai as _c4
from pydantic import BaseModel, Field
from slowapi import Limiter
from slowapi.util import get_remote_address
from prometheus_fastapi_instrumentator import Instrumentator
from redis import asyncio as aioredis
# ── internal imports (after sys.path append) ─────────────────
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
# ────────────────── configuration / logging ──────────────────
config = load_config()
setup_logging(config)
# Version is imported from crawl4ai package to ensure it stays in sync
# ── global page semaphore (hard cap) ─────────────────────────
MAX_PAGES = config["crawler"]["pool"].get("max_pages", 30)
GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES)
# ── security feature flags ───────────────────────────────────
# Hooks are disabled by default for security (RCE risk). Set to "true" to enable.
HOOKS_ENABLED = os.environ.get("CRAWL4AI_HOOKS_ENABLED", "false").lower() == "true"
# /execute_js disabled by default (arbitrary JS + SSRF risk). Set to "true" to enable.
EXECUTE_JS_ENABLED = os.environ.get("CRAWL4AI_EXECUTE_JS_ENABLED", "false").lower() == "true"
# Warn loudly if API token is not set (all endpoints unauthenticated)
_api_token = config.get("security", {}).get("api_token", "") or os.environ.get("CRAWL4AI_API_TOKEN", "")
if not _api_token:
import logging as _logging
_logging.getLogger("crawl4ai.security").warning(
"CRAWL4AI_API_TOKEN is not set. All API endpoints are unauthenticated. "
"Set CRAWL4AI_API_TOKEN environment variable to enable authentication."
)
# ── default browser config helper ─────────────────────────────
def get_default_browser_config() -> BrowserConfig:
"""Get default BrowserConfig from config.yml."""
return BrowserConfig(
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
)
# import logging
# page_log = logging.getLogger("page_cap")
# orig_arun = AsyncWebCrawler.arun
# async def capped_arun(self, *a, **kw):
# await GLOBAL_SEM.acquire() # ← take slot
# try:
# in_flight = MAX_PAGES - GLOBAL_SEM._value # used permits
# page_log.info("🕸️ pages_in_flight=%s / %s", in_flight, MAX_PAGES)
# return await orig_arun(self, *a, **kw)
# finally:
# GLOBAL_SEM.release() # ← free slot
orig_arun = AsyncWebCrawler.arun
async def capped_arun(self, *a, **kw):
async with GLOBAL_SEM:
return await orig_arun(self, *a, **kw)
AsyncWebCrawler.arun = capped_arun
# ───────────────────── FastAPI lifespan ──────────────────────
@asynccontextmanager
async def lifespan(_: FastAPI):
from crawler_pool import init_permanent
from monitor import MonitorStats
import monitor as monitor_module
# Initialize monitor
monitor_module.monitor_stats = MonitorStats(redis)
await monitor_module.monitor_stats.load_from_redis()
monitor_module.monitor_stats.start_persistence_worker()
# Initialize browser pool
await init_permanent(BrowserConfig(
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
))
# Start background tasks
app.state.janitor = asyncio.create_task(janitor())
app.state.timeline_updater = asyncio.create_task(_timeline_updater())
yield
# Cleanup
app.state.janitor.cancel()
app.state.timeline_updater.cancel()
# Monitor cleanup (persist stats and stop workers)
from monitor import get_monitor
try:
await get_monitor().cleanup()
except Exception as e:
logger.error(f"Monitor cleanup failed: {e}")
await close_all()
async def _timeline_updater():
"""Update timeline data every 5 seconds."""
from monitor import get_monitor
while True:
await asyncio.sleep(5)
try:
await asyncio.wait_for(get_monitor().update_timeline(), timeout=4.0)
except asyncio.TimeoutError:
logger.warning("Timeline update timeout after 4s")
except Exception as e:
logger.warning(f"Timeline update error: {e}")
# ───────────────────── FastAPI instance ──────────────────────
app = FastAPI(
title=config["app"]["title"],
version=config["app"]["version"],
lifespan=lifespan,
)
# ── static playground ──────────────────────────────────────
STATIC_DIR = pathlib.Path(__file__).parent / "static" / "playground"
if not STATIC_DIR.exists():
raise RuntimeError(f"Playground assets not found at {STATIC_DIR}")
app.mount(
"/playground",
StaticFiles(directory=STATIC_DIR, html=True),
name="play",
)
# ── static monitor dashboard ────────────────────────────────
MONITOR_DIR = pathlib.Path(__file__).parent / "static" / "monitor"
if not MONITOR_DIR.exists():
raise RuntimeError(f"Monitor assets not found at {MONITOR_DIR}")
app.mount(
"/dashboard",
StaticFiles(directory=MONITOR_DIR, html=True),
name="monitor_ui",
)
# ── static assets (logo, etc) ────────────────────────────────
ASSETS_DIR = pathlib.Path(__file__).parent / "static" / "assets"
if ASSETS_DIR.exists():
app.mount(
"/static/assets",
StaticFiles(directory=ASSETS_DIR),
name="assets",
)
@app.get("/")
async def root():
return RedirectResponse("/playground")
# ─────────────────── infra / middleware ─────────────────────
def _build_redis_url(config: dict) -> str:
"""Build Redis URL from config fields and environment variables."""
rc = config.get("redis", {})
host = os.environ.get("REDIS_HOST", rc.get("host", "localhost"))
port = os.environ.get("REDIS_PORT", rc.get("port", 6379))
password = os.environ.get("REDIS_PASSWORD", rc.get("password", ""))
db = rc.get("db", 0)
scheme = "rediss" if rc.get("ssl", False) else "redis"
auth = f":{password}@" if password else ""
return f"{scheme}://{auth}{host}:{port}/{db}"
redis = aioredis.from_url(_build_redis_url(config))
limiter = Limiter(
key_func=get_remote_address,
default_limits=[config["rate_limiting"]["default_limit"]],
storage_uri=config["rate_limiting"]["storage_uri"],
)
def _setup_security(app_: FastAPI):
sec = config["security"]
if not sec["enabled"]:
return
if sec.get("https_redirect"):
app_.add_middleware(HTTPSRedirectMiddleware)
if sec.get("trusted_hosts", []) != ["*"]:
app_.add_middleware(
TrustedHostMiddleware, allowed_hosts=sec["trusted_hosts"]
)
_setup_security(app)
if config["observability"]["prometheus"]["enabled"]:
Instrumentator().instrument(app).expose(app)
token_dep = get_token_dependency(config)
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
resp = await call_next(request)
if config["security"]["enabled"]:
resp.headers.update(config["security"]["headers"])
return resp
# ───────────────── URL validation helper ─────────────────
ALLOWED_URL_SCHEMES = ("http://", "https://")
ALLOWED_URL_SCHEMES_WITH_RAW = ("http://", "https://", "raw:", "raw://")
def validate_url_scheme(url: str, allow_raw: bool = False) -> None:
"""Validate URL scheme (LFI) and destination (SSRF)."""
allowed = ALLOWED_URL_SCHEMES_WITH_RAW if allow_raw else ALLOWED_URL_SCHEMES
if not url.startswith(allowed):
schemes = ", ".join(allowed)
raise HTTPException(400, f"URL must start with {schemes}")
validate_url_destination(url)
# ───────────────── safe configdump helper ─────────────────
ALLOWED_TYPES = {
"CrawlerRunConfig": CrawlerRunConfig,
"BrowserConfig": BrowserConfig,
}
def _config_from_json(data: dict) -> dict:
"""Create CrawlerRunConfig or BrowserConfig from JSON {type, params}.
No eval() -- uses Pydantic constructors directly."""
config_type = data.get("type")
params = data.get("params", {})
if config_type == "CrawlerRunConfig":
obj = CrawlerRunConfig(**params)
elif config_type == "BrowserConfig":
obj = BrowserConfig(**params)
else:
raise ValueError("type must be 'CrawlerRunConfig' or 'BrowserConfig'")
return obj.dump()
# ── job router ──────────────────────────────────────────────
app.include_router(init_job_router(redis, config, token_dep))
# ── monitor router ──────────────────────────────────────────
from monitor_routes import router as monitor_router
app.include_router(monitor_router, dependencies=[Depends(token_dep)])
logger = logging.getLogger(__name__)
# ──────────────────────── Endpoints ──────────────────────────
@app.post("/token")
async def get_token(req: TokenRequest):
expected_token = config.get("security", {}).get("api_token", "")
if expected_token and req.api_token != expected_token:
raise HTTPException(401, "Invalid or missing api_token")
if not verify_email_domain(req.email):
raise HTTPException(400, "Invalid email domain")
token = create_access_token({"sub": req.email})
return {"email": req.email, "access_token": token, "token_type": "bearer"}
@app.post("/config/dump")
async def config_dump(
data: dict,
_td: Dict = Depends(token_dep),
):
try:
return JSONResponse(_config_from_json(data))
except (TypeError, ValueError) as e:
raise HTTPException(400, str(e))
@app.post("/md")
@limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("md")
async def get_markdown(
request: Request,
body: MarkdownRequest,
_td: Dict = Depends(token_dep),
):
"""
Convert a web page into Markdown format.
Supports multiple extraction modes:
- fit (default): Readability-based extraction for clean content
- raw: Direct DOM to Markdown conversion
- bm25: BM25 relevance ranking with optional query
- llm: LLM-based summarization with optional query
Use this tool when you need clean, readable text from web pages.
"""
if not body.url.startswith(("http://", "https://")) and not body.url.startswith(("raw:", "raw://")):
raise HTTPException(
400, "Invalid URL format. Must start with http://, https://, or for raw HTML (raw:, raw://)")
markdown = await handle_markdown_request(
body.url, body.f, body.q, body.c, config, body.provider,
body.temperature, body.base_url
)
return JSONResponse({
"url": body.url,
"filter": body.f,
"query": body.q,
"cache": body.c,
"markdown": markdown,
"success": True
})
@app.post("/html")
@limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("html")
async def generate_html(
request: Request,
body: HTMLRequest,
_td: Dict = Depends(token_dep),
):
"""
Crawls the URL, preprocesses the raw HTML for schema extraction, and returns the processed HTML.
Use when you need sanitized HTML structures for building schemas or further processing.
"""
validate_url_scheme(body.url, allow_raw=True)
cfg = CrawlerRunConfig()
crawler = None
try:
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
raw_html = results[0].html
from crawl4ai.utils import preprocess_html_for_schema
processed_html = preprocess_html_for_schema(raw_html)
return JSONResponse({"html": processed_html, "url": body.url, "success": True})
except Exception as e:
raise HTTPException(500, detail=str(e))
finally:
if crawler:
await release_crawler(crawler)
# Screenshot endpoint
@app.post("/screenshot")
@limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("screenshot")
async def generate_screenshot(
request: Request,
body: ScreenshotRequest,
_td: Dict = Depends(token_dep),
):
"""
Capture a full-page PNG screenshot of the specified URL, waiting an optional delay before capture,
Use when you need an image snapshot of the rendered page. Its recommened to provide an output path to save the screenshot.
Then in result instead of the screenshot you will get a path to the saved file.
"""
validate_url_scheme(body.url)
crawler = None
try:
cfg = CrawlerRunConfig(screenshot=True, screenshot_wait_for=body.screenshot_wait_for, wait_for_images=body.wait_for_images)
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
screenshot_data = results[0].screenshot
if body.output_path:
abs_path = validate_output_path(body.output_path)
write_output_file(abs_path, base64.b64decode(screenshot_data))
return {"success": True, "path": abs_path}
return {"success": True, "screenshot": screenshot_data}
except Exception as e:
raise HTTPException(500, detail=str(e))
finally:
if crawler:
await release_crawler(crawler)
# PDF endpoint
@app.post("/pdf")
@limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("pdf")
async def generate_pdf(
request: Request,
body: PDFRequest,
_td: Dict = Depends(token_dep),
):
"""
Generate a PDF document of the specified URL,
Use when you need a printable or archivable snapshot of the page. It is recommended to provide an output path to save the PDF.
Then in result instead of the PDF you will get a path to the saved file.
"""
validate_url_scheme(body.url)
crawler = None
try:
cfg = CrawlerRunConfig(pdf=True)
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
pdf_data = results[0].pdf
if body.output_path:
abs_path = validate_output_path(body.output_path)
write_output_file(abs_path, pdf_data)
return {"success": True, "path": abs_path}
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
except Exception as e:
raise HTTPException(500, detail=str(e))
finally:
if crawler:
await release_crawler(crawler)
@app.post("/execute_js")
@limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("execute_js")
async def execute_js(
request: Request,
body: JSEndpointRequest,
_td: Dict = Depends(token_dep),
):
"""
Execute a sequence of JavaScript snippets on the specified URL.
Return the full CrawlResult JSON (first result).
Use this when you need to interact with dynamic pages using JS.
REMEMBER: Scripts accept a list of separated JS snippets to execute and execute them in order.
IMPORTANT: Each script should be an expression that returns a value. It can be an IIFE or an async function. You can think of it as such.
Your script will replace '{script}' and execute in the browser context. So provide either an IIFE or a sync/async function that returns a value.
Return Format:
- The return result is an instance of CrawlResult, so you have access to markdown, links, and other stuff. If this is enough, you don't need to call again for other endpoints.
```python
class CrawlResult(BaseModel):
url: str
html: str
success: bool
cleaned_html: Optional[str] = None
media: Dict[str, List[Dict]] = {}
links: Dict[str, List[Dict]] = {}
downloaded_files: Optional[List[str]] = None
js_execution_result: Optional[Dict[str, Any]] = None
screenshot: Optional[str] = None
pdf: Optional[bytes] = None
mhtml: Optional[str] = None
_markdown: Optional[MarkdownGenerationResult] = PrivateAttr(default=None)
extracted_content: Optional[str] = None
metadata: Optional[dict] = None
error_message: Optional[str] = None
session_id: Optional[str] = None
response_headers: Optional[dict] = None
status_code: Optional[int] = None
ssl_certificate: Optional[SSLCertificate] = None
dispatch_result: Optional[DispatchResult] = None
redirected_url: Optional[str] = None
network_requests: Optional[List[Dict[str, Any]]] = None
console_messages: Optional[List[Dict[str, Any]]] = None
class MarkdownGenerationResult(BaseModel):
raw_markdown: str
markdown_with_citations: str
references_markdown: str
fit_markdown: Optional[str] = None
fit_html: Optional[str] = None
```
"""
if not EXECUTE_JS_ENABLED:
raise HTTPException(403, "execute_js endpoint is disabled. Set CRAWL4AI_EXECUTE_JS_ENABLED=true to enable.")
validate_url_scheme(body.url)
# Block SSRF: reject internal/private IPs
try:
validate_webhook_url(body.url) # reuse SSRF blocklist
except ValueError as e:
raise HTTPException(400, str(e))
crawler = None
try:
cfg = CrawlerRunConfig(js_code=body.scripts)
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
data = results[0].model_dump()
return JSONResponse(data)
except Exception as e:
raise HTTPException(500, detail=str(e))
finally:
if crawler:
await release_crawler(crawler)
@app.get("/llm/{url:path}")
async def llm_endpoint(
request: Request,
url: str = Path(...),
q: str = Query(...),
provider: Optional[str] = Query(None, description="LLM provider override, e.g. 'openai/gpt-4o-mini'"),
temperature: Optional[float] = Query(None, description="LLM temperature override"),
base_url: Optional[str] = Query(None, description="LLM API base URL override"),
_td: Dict = Depends(token_dep),
):
if not q:
raise HTTPException(400, "Query parameter 'q' is required")
if not url.startswith(("http://", "https://")) and not url.startswith(("raw:", "raw://")):
url = "https://" + url
answer = await handle_llm_qa(url, q, config, provider=provider, temperature=temperature, base_url=base_url)
return JSONResponse({"answer": answer})
@app.get("/schema")
async def get_schema():
from crawl4ai import BrowserConfig, CrawlerRunConfig
return {"browser": BrowserConfig().dump(),
"crawler": CrawlerRunConfig().dump()}
@app.get("/hooks/info")
async def get_hooks_info():
"""Get information about available hook points and their signatures"""
from hook_manager import UserHookManager
hook_info = {}
for hook_point, params in UserHookManager.HOOK_SIGNATURES.items():
hook_info[hook_point] = {
"parameters": params,
"description": get_hook_description(hook_point),
"example": get_hook_example(hook_point)
}
return JSONResponse({
"available_hooks": hook_info,
"timeout_limits": {
"min": 1,
"max": 120,
"default": 30
}
})
def get_hook_description(hook_point: str) -> str:
"""Get description for each hook point"""
descriptions = {
"on_browser_created": "Called after browser instance is created",
"on_page_context_created": "Called after page and context are created - ideal for authentication",
"before_goto": "Called before navigating to the target URL",
"after_goto": "Called after navigation is complete",
"on_user_agent_updated": "Called when user agent is updated",
"on_execution_started": "Called when custom JavaScript execution begins",
"before_retrieve_html": "Called before retrieving the final HTML - ideal for scrolling",
"before_return_html": "Called just before returning the HTML content"
}
return descriptions.get(hook_point, "")
def get_hook_example(hook_point: str) -> str:
"""Get example code for each hook point"""
examples = {
"on_page_context_created": """async def hook(page, context, **kwargs):
# Add authentication cookie
await context.add_cookies([{
'name': 'session',
'value': 'my-session-id',
'domain': '.example.com'
}])
return page""",
"before_retrieve_html": """async def hook(page, context, **kwargs):
# Scroll to load lazy content
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
await page.wait_for_timeout(2000)
return page""",
"before_goto": """async def hook(page, context, url, **kwargs):
# Set custom headers
await page.set_extra_http_headers({
'X-Custom-Header': 'value'
})
return page"""
}
return examples.get(hook_point, "# Implement your hook logic here\nreturn page")
@app.get(config["observability"]["health_check"]["endpoint"])
async def health():
return {"status": "ok", "timestamp": time.time(), "version": __version__}
@app.get(config["observability"]["prometheus"]["endpoint"])
async def metrics():
return RedirectResponse(config["observability"]["prometheus"]["endpoint"])
@app.post("/crawl")
@limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("crawl")
async def crawl(
request: Request,
crawl_request: CrawlRequestWithHooks,
_td: Dict = Depends(token_dep),
):
"""
Crawl a list of URLs and return the results as JSON.
For streaming responses, use /crawl/stream endpoint.
Supports optional user-provided hook functions for customization.
"""
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
if crawl_request.hooks and not HOOKS_ENABLED:
raise HTTPException(403, "Hooks are disabled. Set CRAWL4AI_HOOKS_ENABLED=true to enable.")
# Check whether it is a redirection for a streaming request
crawler_config = CrawlerRunConfig.load(crawl_request.crawler_config)
if crawler_config.stream:
return await stream_process(crawl_request=crawl_request)
# Prepare hooks config if provided
hooks_config = None
if crawl_request.hooks:
hooks_config = {
'code': crawl_request.hooks.code,
'timeout': crawl_request.hooks.timeout
}
results = await handle_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
hooks_config=hooks_config,
crawler_configs=crawl_request.crawler_configs,
)
# check if all of the results are not successful
if all(not result["success"] for result in results["results"]):
raise HTTPException(500, f"Crawl request failed: {results['results'][0]['error_message']}")
return JSONResponse(results)
@app.post("/crawl/stream")
@limiter.limit(config["rate_limiting"]["default_limit"])
async def crawl_stream(
request: Request,
crawl_request: CrawlRequestWithHooks,
_td: Dict = Depends(token_dep),
):
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
if crawl_request.hooks and not HOOKS_ENABLED:
raise HTTPException(403, "Hooks are disabled. Set CRAWL4AI_HOOKS_ENABLED=true to enable.")
return await stream_process(crawl_request=crawl_request)
async def stream_process(crawl_request: CrawlRequestWithHooks):
# Prepare hooks config if provided# Prepare hooks config if provided
hooks_config = None
if crawl_request.hooks:
hooks_config = {
'code': crawl_request.hooks.code,
'timeout': crawl_request.hooks.timeout
}
crawler, gen, hooks_info = await handle_stream_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
hooks_config=hooks_config
)
# Add hooks info to response headers if available
headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Stream-Status": "active",
}
if hooks_info:
import json
headers["X-Hooks-Status"] = json.dumps(hooks_info['status']['status'])
return StreamingResponse(
stream_results(crawler, gen),
media_type="application/x-ndjson",
headers=headers,
)
def chunk_code_functions(code_md: str) -> List[str]:
"""Extract each function/class from markdown code blocks per file."""
pattern = re.compile(
# match "## File: <path>" then a ```py fence, then capture until the closing ```
r'##\s*File:\s*(?P<path>.+?)\s*?\r?\n' # file header
r'```py\s*?\r?\n' # opening fence
r'(?P<code>.*?)(?=\r?\n```)', # code block
re.DOTALL
)
chunks: List[str] = []
for m in pattern.finditer(code_md):
file_path = m.group("path").strip()
code_blk = m.group("code")
tree = ast.parse(code_blk)
lines = code_blk.splitlines()
for node in tree.body:
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
start = node.lineno - 1
end = getattr(node, "end_lineno", start + 1)
snippet = "\n".join(lines[start:end])
chunks.append(f"# File: {file_path}\n{snippet}")
return chunks
def chunk_doc_sections(doc: str) -> List[str]:
lines = doc.splitlines(keepends=True)
sections = []
current: List[str] = []
for line in lines:
if re.match(r"^#{1,6}\s", line):
if current:
sections.append("".join(current))
current = [line]
else:
current.append(line)
if current:
sections.append("".join(current))
return sections
@app.get("/ask")
@limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("ask")
async def get_context(
request: Request,
_td: Dict = Depends(token_dep),
context_type: str = Query("all", regex="^(code|doc|all)$"),
query: Optional[str] = Query(
None, description="search query to filter chunks"),
score_ratio: float = Query(
0.5, ge=0.0, le=1.0, description="min score as fraction of max_score"),
max_results: int = Query(
20, ge=1, description="absolute cap on returned chunks"),
):
"""
This end point is design for any questions about Crawl4ai library. It returns a plain text markdown with extensive information about Crawl4ai.
You can use this as a context for any AI assistant. Use this endpoint for AI assistants to retrieve library context for decision making or code generation tasks.
Alway is BEST practice you provide a query to filter the context. Otherwise the lenght of the response will be very long.
Parameters:
- context_type: Specify "code" for code context, "doc" for documentation context, or "all" for both.
- query: RECOMMENDED search query to filter paragraphs using BM25. You can leave this empty to get all the context.
- score_ratio: Minimum score as a fraction of the maximum score for filtering results.
- max_results: Maximum number of results to return. Default is 20.
Returns:
- JSON response with the requested context.
- If "code" is specified, returns the code context.
- If "doc" is specified, returns the documentation context.
- If "all" is specified, returns both code and documentation contexts.
"""
# load contexts
base = os.path.dirname(__file__)
code_path = os.path.join(base, "c4ai-code-context.md")
doc_path = os.path.join(base, "c4ai-doc-context.md")
if not os.path.exists(code_path) or not os.path.exists(doc_path):
raise HTTPException(404, "Context files not found")
with open(code_path, "r") as f:
code_content = f.read()
with open(doc_path, "r") as f:
doc_content = f.read()
# if no query, just return raw contexts
if not query:
if context_type == "code":
return JSONResponse({"code_context": code_content})
if context_type == "doc":
return JSONResponse({"doc_context": doc_content})
return JSONResponse({
"code_context": code_content,
"doc_context": doc_content,
})
tokens = query.split()
results: Dict[str, List[Dict[str, float]]] = {}
# code BM25 over functions/classes
if context_type in ("code", "all"):
code_chunks = chunk_code_functions(code_content)
bm25 = BM25Okapi([c.split() for c in code_chunks])
scores = bm25.get_scores(tokens)
max_sc = float(scores.max()) if scores.size > 0 else 0.0
cutoff = max_sc * score_ratio
picked = [(c, s) for c, s in zip(code_chunks, scores) if s >= cutoff]
picked = sorted(picked, key=lambda x: x[1], reverse=True)[:max_results]
results["code_results"] = [{"text": c, "score": s} for c, s in picked]
# doc BM25 over markdown sections
if context_type in ("doc", "all"):
sections = chunk_doc_sections(doc_content)
bm25d = BM25Okapi([sec.split() for sec in sections])
scores_d = bm25d.get_scores(tokens)
max_sd = float(scores_d.max()) if scores_d.size > 0 else 0.0
cutoff_d = max_sd * score_ratio
idxs = [i for i, s in enumerate(scores_d) if s >= cutoff_d]
neighbors = set(i for idx in idxs for i in (idx-1, idx, idx+1))
valid = [i for i in sorted(neighbors) if 0 <= i < len(sections)]
valid = valid[:max_results]
results["doc_results"] = [
{"text": sections[i], "score": scores_d[i]} for i in valid
]
return JSONResponse(results)
# attach MCP layer (adds /mcp/ws, /mcp/sse, /mcp/schema)
print(f"MCP server running on {config['app']['host']}:{config['app']['port']}")
attach_mcp(
app,
base_url=f"http://{config['app']['host']}:{config['app']['port']}"
)
# ────────────────────────── cli ──────────────────────────────
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"server:app",
host=config["app"]["host"],
port=config["app"]["port"],
reload=config["app"]["reload"],
timeout_keep_alive=config["app"]["timeout_keep_alive"],
)
# ─────────────────────────────────────────────────────────────