webclaw/crates/webclaw-fetch/src/client.rs
Valerio 499345046c fix: harden LLM providers, UTF-8 handling, and webhook/batch reliability
- webclaw-llm: add explicit request + connect timeouts to the reqwest
  client in every provider (anthropic, openai, ollama) with a shorter
  timeout on the ollama health check, so a stalled provider fails fast.
- webclaw-llm: fix a panic when truncating a provider error body that
  contains multibyte characters near the 500-char cut (char-safe take).
- webclaw-core: snap the endpoint-scan budget cut to a UTF-8 char
  boundary so oversized scripts with non-ASCII content no longer panic.
- webclaw-core: rewrite js_literal_to_json to copy raw bytes instead of
  `byte as char`, preserving multibyte UTF-8 in SvelteKit string values
  rather than producing Latin-1 mojibake.
- webclaw-cli: have fire_webhook return its JoinHandle and await it at
  the crawl/batch/batch-llm call sites, removing the fixed 500ms sleeps.
- webclaw-mcp: drop the up-front DNS pre-validation loop in batch that
  aborted the whole request on one bad URL; the fetch layer already
  applies the same SSRF guard per URL and reports per-URL errors.
- webclaw-fetch: include the port in the warmup homepage URL so hosts
  on a non-default port are warmed correctly.

Adds regression tests for the UTF-8 endpoint-scan and SvelteKit cases.
2026-06-09 21:10:15 +02:00

1100 lines
38 KiB
Rust

/// HTTP client with browser TLS fingerprint impersonation.
/// Uses wreq (BoringSSL) for browser-grade TLS + HTTP/2 fingerprinting.
/// Supports single and batch operations with proxy rotation.
/// Automatically detects PDF responses and extracts text via webclaw-pdf.
///
/// Two proxy modes:
/// - **Static**: single proxy (or none) baked into pre-built clients at construction.
/// - **Rotating**: pre-built pool of clients, each with a different proxy + fingerprint.
/// Same-host URLs are routed to the same client for HTTP/2 connection reuse.
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures_util::StreamExt;
use rand::seq::SliceRandom;
use tokio::sync::Semaphore;
use tracing::{debug, instrument, warn};
use webclaw_pdf::PdfMode;
use crate::browser::{self, BrowserProfile, BrowserVariant};
use crate::error::FetchError;
/// Configuration for building a [`FetchClient`].
#[derive(Debug, Clone)]
pub struct FetchConfig {
pub browser: BrowserProfile,
/// Single proxy URL. Used when `proxy_pool` is empty.
pub proxy: Option<String>,
/// Pool of proxy URLs to rotate through.
/// When non-empty, each proxy gets a pre-built client with a
/// random browser fingerprint. Same-host URLs reuse the same client
/// for HTTP/2 connection multiplexing.
pub proxy_pool: Vec<String>,
pub timeout: Duration,
pub follow_redirects: bool,
pub max_redirects: u32,
pub headers: HashMap<String, String>,
pub pdf_mode: PdfMode,
}
impl Default for FetchConfig {
fn default() -> Self {
Self {
browser: BrowserProfile::Chrome,
proxy: None,
proxy_pool: Vec::new(),
timeout: Duration::from_secs(12),
follow_redirects: true,
max_redirects: 10,
headers: HashMap::from([("Accept-Language".to_string(), "en-US,en;q=0.9".to_string())]),
pdf_mode: PdfMode::default(),
}
}
}
/// Result of a successful fetch.
#[derive(Debug, Clone)]
pub struct FetchResult {
pub html: String,
pub status: u16,
/// Final URL after any redirects.
pub url: String,
pub headers: http::HeaderMap,
pub elapsed: Duration,
}
/// Result for a single URL in a batch fetch operation.
#[derive(Debug)]
pub struct BatchResult {
pub url: String,
pub result: Result<FetchResult, FetchError>,
}
/// Result for a single URL in a batch fetch-and-extract operation.
#[derive(Debug)]
pub struct BatchExtractResult {
pub url: String,
pub result: Result<webclaw_core::ExtractionResult, FetchError>,
}
/// Buffered response that owns its body. Provides the same sync API
/// that webclaw-http::Response used to provide.
struct Response {
status: u16,
url: String,
headers: http::HeaderMap,
body: bytes::Bytes,
}
/// Maximum fetched body size. A single 50 MB HTML document is already
/// several orders of magnitude past any realistic page; larger responses
/// are either malicious (log bomb, zip-bomb decompressed) or streaming
/// bugs. Caps the blast radius of the HTML → markdown conversion
/// downstream (which could otherwise allocate multiple full-size Strings
/// per page in collapse_whitespace + strip_markdown).
const MAX_BODY_BYTES: u64 = 50 * 1024 * 1024;
/// Running decompression-bomb guard: reject as soon as the bytes already
/// buffered plus the next decompressed chunk would cross [`MAX_BODY_BYTES`].
/// Saturating arithmetic so a huge chunk length can't wrap the sum.
fn check_body_ceiling(buffered: usize, next_chunk: usize) -> Result<(), FetchError> {
let total = (buffered as u64).saturating_add(next_chunk as u64);
if total > MAX_BODY_BYTES {
return Err(FetchError::BodyDecode(format!(
"response body exceeds cap {MAX_BODY_BYTES} bytes (decompressed)"
)));
}
Ok(())
}
impl Response {
/// Buffer a wreq response into an owned Response.
///
/// Rejects bodies that advertise a Content-Length beyond
/// [`MAX_BODY_BYTES`] before we pay any allocation, then streams the
/// body chunk-by-chunk while enforcing a running ceiling. `chunk()`
/// yields *post-decompression* bytes (gzip/brotli/zstd/deflate are
/// negotiated), so a tiny compressed payload that inflates to
/// gigabytes is aborted as soon as the accumulated size crosses the
/// cap — it never gets fully buffered in memory.
async fn from_wreq(resp: wreq::Response) -> Result<Self, FetchError> {
if let Some(len) = resp.content_length()
&& len > MAX_BODY_BYTES
{
return Err(FetchError::BodyDecode(format!(
"response body {len} bytes exceeds cap {MAX_BODY_BYTES}"
)));
}
let status = resp.status().as_u16();
let url = resp.uri().to_string();
let headers = resp.headers().clone();
// wreq 6.0.0-rc.29 dropped `Response::chunk()`. Stream post-decompression
// bytes via `bytes_stream()` and keep enforcing the running ceiling so a
// compression bomb is aborted before it is fully buffered in memory.
let mut buf = bytes::BytesMut::new();
let mut stream = resp.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|e| FetchError::BodyDecode(e.to_string()))?;
check_body_ceiling(buf.len(), chunk.len())?;
buf.extend_from_slice(&chunk);
}
Ok(Self {
status,
url,
headers,
body: buf.freeze(),
})
}
fn status(&self) -> u16 {
self.status
}
fn url(&self) -> &str {
&self.url
}
fn headers(&self) -> &http::HeaderMap {
&self.headers
}
fn body(&self) -> &[u8] {
&self.body
}
fn text(&self) -> std::borrow::Cow<'_, str> {
String::from_utf8_lossy(&self.body)
}
fn into_text(self) -> String {
String::from_utf8_lossy(&self.body).into_owned()
}
}
/// Internal representation of the client pool strategy.
enum ClientPool {
/// Pre-built clients with a fixed proxy (or no proxy).
/// Fingerprint rotation still works via the pool when `random` is true.
Static {
clients: Vec<wreq::Client>,
random: bool,
},
/// Pre-built pool of clients, each with a different proxy + fingerprint.
/// Requests pick a client deterministically by host for HTTP/2 connection reuse.
Rotating { clients: Vec<wreq::Client> },
}
/// HTTP client with browser TLS + HTTP/2 fingerprinting via wreq.
///
/// Operates in two modes:
/// - **Static pool**: pre-built clients, optionally with fingerprint rotation.
/// Used when no `proxy_pool` is configured. Fast (no per-request construction).
/// - **Rotating pool**: pre-built clients, one per proxy in the pool.
/// Same-host URLs are routed to the same client for HTTP/2 multiplexing.
pub struct FetchClient {
pool: ClientPool,
pdf_mode: PdfMode,
/// Optional cloud-fallback client. Extractors that need to
/// escalate past bot protection call `client.cloud()` to get this
/// out. Stored as `Arc` so cloning a `FetchClient` (common in
/// axum state) doesn't clone the underlying reqwest pool.
cloud: Option<std::sync::Arc<crate::cloud::CloudClient>>,
}
impl FetchClient {
/// Build a new client from config.
pub fn new(config: FetchConfig) -> Result<Self, FetchError> {
let variants = collect_variants(&config.browser);
let pdf_mode = config.pdf_mode.clone();
let pool = if config.proxy_pool.is_empty() {
let clients = variants
.into_iter()
.map(|v| {
crate::tls::build_client(
v,
config.timeout,
&config.headers,
config.proxy.as_deref(),
config.follow_redirects,
config.max_redirects,
)
})
.collect::<Result<Vec<_>, _>>()?;
let random = matches!(config.browser, BrowserProfile::Random);
debug!(
count = clients.len(),
random, "fetch client ready (static pool)"
);
ClientPool::Static { clients, random }
} else {
let mut rng = rand::thread_rng();
let clients = config
.proxy_pool
.iter()
.map(|proxy| {
let v = *variants.choose(&mut rng).unwrap();
crate::tls::build_client(
v,
config.timeout,
&config.headers,
Some(proxy),
config.follow_redirects,
config.max_redirects,
)
})
.collect::<Result<Vec<_>, _>>()?;
debug!(
clients = clients.len(),
"fetch client ready (pre-built rotating pool)"
);
ClientPool::Rotating { clients }
};
Ok(Self {
pool,
pdf_mode,
cloud: None,
})
}
/// Attach a cloud-fallback client. Returns `self` so it composes in
/// a builder-ish way:
///
/// ```ignore
/// let client = FetchClient::new(config)?
/// .with_cloud(CloudClient::from_env()?);
/// ```
///
/// Extractors that can escalate past bot protection will call
/// `client.cloud()` internally. Sets the field regardless of
/// whether `cloud` is configured to bypass anything specific —
/// attachment is cheap (just wraps in `Arc`).
pub fn with_cloud(mut self, cloud: crate::cloud::CloudClient) -> Self {
self.cloud = Some(std::sync::Arc::new(cloud));
self
}
/// Optional cloud-fallback client, if one was attached via
/// [`Self::with_cloud`]. Extractors that handle antibot sites
/// pass this into `cloud::smart_fetch_html`.
pub fn cloud(&self) -> Option<&crate::cloud::CloudClient> {
self.cloud.as_deref()
}
/// Fetch a URL with per-site rescue paths: Reddit URLs redirect to the
/// `.json` API, and Akamai-style challenge responses trigger a homepage
/// cookie warmup and a retry. Returns the same `FetchResult` shape as
/// [`Self::fetch`] so every caller (CLI, MCP, OSS server, production
/// server) benefits without shape churn.
///
/// This is the method most callers want. Use plain [`Self::fetch`] only
/// when you need literal no-rescue behavior (e.g. inside the rescue
/// logic itself to avoid recursion).
pub async fn fetch_smart(&self, url: &str) -> Result<FetchResult, FetchError> {
// Reddit: fetch old.reddit.com for stable server-rendered HTML.
// The JSON API is blocked; old.reddit.com works without JS or auth.
let owned;
let url = if crate::reddit::is_reddit_url(url) {
owned = crate::reddit::to_old_reddit_url(url);
owned.as_str()
} else {
url
};
let resp = self.fetch(url).await?;
// Akamai / bazadebezolkohpepadr challenge: visit the homepage to
// collect warmup cookies (_abck, bm_sz, etc.), then retry.
if is_challenge_html(&resp.html)
&& let Some(homepage) = extract_homepage(url)
{
debug!("challenge detected, warming cookies via {homepage}");
let _ = self.fetch(&homepage).await;
if let Ok(retry) = self.fetch(url).await {
return Ok(retry);
}
}
Ok(resp)
}
/// Fetch a URL and return the raw HTML + response metadata.
///
/// Automatically retries on transient failures (network errors, 5xx, 429)
/// with exponential backoff: 0s, 1s (2 attempts total). No per-site
/// rescue logic; use [`Self::fetch_smart`] for that.
#[instrument(skip(self), fields(url = %url))]
pub async fn fetch(&self, url: &str) -> Result<FetchResult, FetchError> {
let delays = [Duration::ZERO, Duration::from_secs(1)];
let mut last_err = None;
for (attempt, delay) in delays.iter().enumerate() {
if attempt > 0 {
tokio::time::sleep(*delay).await;
}
match self.fetch_once(url).await {
Ok(result) => {
if is_retryable_status(result.status) && attempt < delays.len() - 1 {
warn!(
url,
status = result.status,
attempt = attempt + 1,
"retryable status, will retry"
);
last_err = Some(FetchError::Build(format!("HTTP {}", result.status)));
continue;
}
if attempt > 0 {
debug!(url, attempt = attempt + 1, "retry succeeded");
}
return Ok(result);
}
Err(e) => {
if !is_retryable_error(&e) || attempt == delays.len() - 1 {
return Err(e);
}
warn!(
url,
error = %e,
attempt = attempt + 1,
"transient error, will retry"
);
last_err = Some(e);
}
}
}
Err(last_err.unwrap_or_else(|| FetchError::Build("all retries exhausted".into())))
}
/// Single fetch attempt.
async fn fetch_once(&self, url: &str) -> Result<FetchResult, FetchError> {
self.fetch_once_with_headers(url, &[]).await
}
/// Single fetch attempt with optional per-request headers appended
/// after the profile defaults. Used by extractors that need to
/// satisfy site-specific headers (e.g. `x-ig-app-id` for Instagram's
/// internal API).
async fn fetch_once_with_headers(
&self,
url: &str,
extra: &[(&str, &str)],
) -> Result<FetchResult, FetchError> {
let parsed_url = crate::url_security::validate_public_http_url(url).await?;
let url = parsed_url.as_str();
let start = Instant::now();
let client = self.pick_client(url);
let mut req = client.get(url);
for (k, v) in extra {
req = req.header(*k, *v);
}
let resp = req.send().await?;
let response = Response::from_wreq(resp).await?;
response_to_result(response, start)
}
/// Fetch a URL with extra per-request headers appended after the
/// browser-profile defaults. Same retry semantics as `fetch`.
///
/// Use this when an upstream API requires a header the global
/// `FetchConfig.headers` shouldn't carry to other hosts (Instagram's
/// `x-ig-app-id`, GitHub's `Authorization` once we wire `GITHUB_TOKEN`,
/// Reddit's compliant UA when we add OAuth, etc.).
#[instrument(skip(self, extra), fields(url = %url, extra_count = extra.len()))]
pub async fn fetch_with_headers(
&self,
url: &str,
extra: &[(&str, &str)],
) -> Result<FetchResult, FetchError> {
let delays = [Duration::ZERO, Duration::from_secs(1)];
let mut last_err = None;
for (attempt, delay) in delays.iter().enumerate() {
if attempt > 0 {
tokio::time::sleep(*delay).await;
}
match self.fetch_once_with_headers(url, extra).await {
Ok(result) => {
if is_retryable_status(result.status) && attempt < delays.len() - 1 {
warn!(
url,
status = result.status,
attempt = attempt + 1,
"retryable status, will retry"
);
last_err = Some(FetchError::Build(format!("HTTP {}", result.status)));
continue;
}
if attempt > 0 {
debug!(url, attempt = attempt + 1, "retry succeeded");
}
return Ok(result);
}
Err(e) => {
if !is_retryable_error(&e) || attempt == delays.len() - 1 {
return Err(e);
}
warn!(
url,
error = %e,
attempt = attempt + 1,
"transient error, will retry"
);
last_err = Some(e);
}
}
}
Err(last_err.unwrap_or_else(|| FetchError::Build("all retries exhausted".into())))
}
/// Fetch a URL then extract structured content.
#[instrument(skip(self), fields(url = %url))]
pub async fn fetch_and_extract(
&self,
url: &str,
) -> Result<webclaw_core::ExtractionResult, FetchError> {
self.fetch_and_extract_with_options(url, &webclaw_core::ExtractionOptions::default())
.await
}
/// Fetch a URL then extract structured content with custom extraction options.
#[instrument(skip(self, options), fields(url = %url))]
pub async fn fetch_and_extract_with_options(
&self,
url: &str,
options: &webclaw_core::ExtractionOptions,
) -> Result<webclaw_core::ExtractionResult, FetchError> {
let parsed_url = crate::url_security::validate_public_http_url(url).await?;
let url = parsed_url.as_str();
// Reddit: rewrite to old.reddit.com for stable server-rendered HTML.
// webclaw-core's Reddit fast path then parses the thread structure.
let reddit_owned;
let url = if crate::reddit::is_reddit_url(url) {
reddit_owned = crate::reddit::to_old_reddit_url(url);
debug!("reddit: rewriting to {reddit_owned}");
reddit_owned.as_str()
} else {
url
};
let start = Instant::now();
let client = self.pick_client(url);
let resp = client.get(url).send().await?;
let mut response = Response::from_wreq(resp).await?;
// Cookie warmup: if we get a challenge page, visit the homepage first
// to collect Akamai cookies (_abck, bm_sz, etc.), then retry.
if is_challenge_response(&response)
&& let Some(homepage) = extract_homepage(url)
{
debug!("challenge detected, warming cookies via {homepage}");
let _ = self.fetch(&homepage).await;
let resp = client.get(url).send().await?;
response = Response::from_wreq(resp).await?;
debug!("retried after cookie warmup: status={}", response.status());
}
let status = response.status();
let final_url = response.url().to_string();
let headers = response.headers().clone();
let is_pdf = is_pdf_content_type(&headers);
if is_pdf {
debug!(status, "detected PDF response, using pdf extraction");
let bytes = response.body();
let elapsed = start.elapsed();
debug!(
status,
bytes = bytes.len(),
elapsed_ms = %elapsed.as_millis(),
"PDF fetch complete"
);
let pdf_result = webclaw_pdf::extract_pdf(bytes, self.pdf_mode.clone())?;
Ok(pdf_to_extraction_result(&pdf_result, &final_url))
} else if let Some(doc_type) =
crate::document::is_document_content_type(&headers, &final_url)
{
debug!(status, doc_type = ?doc_type, "detected document response, extracting");
let bytes = response.body();
let elapsed = start.elapsed();
debug!(
status,
bytes = bytes.len(),
elapsed_ms = %elapsed.as_millis(),
"document fetch complete"
);
let mut result = crate::document::extract_document(bytes, doc_type)?;
result.metadata.url = Some(final_url);
Ok(result)
} else {
let html = response.into_text();
let elapsed = start.elapsed();
debug!(status, elapsed_ms = %elapsed.as_millis(), "fetch complete");
// LinkedIn: extract from embedded <code> JSON blobs
if crate::linkedin::is_linkedin_post(&final_url) {
if let Some(result) = crate::linkedin::extract_linkedin_post(&html, &final_url) {
debug!("linkedin extraction succeeded");
return Ok(result);
}
debug!("linkedin extraction failed, falling back to standard");
}
let extraction = webclaw_core::extract_with_options(&html, Some(&final_url), options)?;
Ok(extraction)
}
}
/// Fetch multiple URLs concurrently with bounded parallelism.
pub async fn fetch_batch(
self: &Arc<Self>,
urls: &[&str],
concurrency: usize,
) -> Vec<BatchResult> {
let semaphore = Arc::new(Semaphore::new(concurrency));
let mut handles = Vec::with_capacity(urls.len());
for (idx, url) in urls.iter().enumerate() {
let permit = Arc::clone(&semaphore);
let client = Arc::clone(self);
let url = url.to_string();
handles.push(tokio::spawn(async move {
// Don't panic if the semaphore has been closed under us
// (adversarial runtime state or shutdown race). Surface a
// typed error instead so the caller sees one failed URL in
// the batch instead of a silently-dropped task.
let result = match permit.acquire().await {
Ok(_permit) => client.fetch(&url).await,
Err(_) => Err(FetchError::Build("semaphore closed before acquire".into())),
};
(idx, BatchResult { url, result })
}));
}
collect_ordered(handles, urls.len()).await
}
/// Fetch and extract multiple URLs concurrently with bounded parallelism.
pub async fn fetch_and_extract_batch(
self: &Arc<Self>,
urls: &[&str],
concurrency: usize,
) -> Vec<BatchExtractResult> {
self.fetch_and_extract_batch_with_options(
urls,
concurrency,
&webclaw_core::ExtractionOptions::default(),
)
.await
}
/// Fetch and extract multiple URLs concurrently with custom extraction options.
pub async fn fetch_and_extract_batch_with_options(
self: &Arc<Self>,
urls: &[&str],
concurrency: usize,
options: &webclaw_core::ExtractionOptions,
) -> Vec<BatchExtractResult> {
let semaphore = Arc::new(Semaphore::new(concurrency));
let mut handles = Vec::with_capacity(urls.len());
for (idx, url) in urls.iter().enumerate() {
let permit = Arc::clone(&semaphore);
let client = Arc::clone(self);
let url = url.to_string();
let opts = options.clone();
handles.push(tokio::spawn(async move {
let result = match permit.acquire().await {
Ok(_permit) => client.fetch_and_extract_with_options(&url, &opts).await,
Err(_) => Err(FetchError::Build("semaphore closed before acquire".into())),
};
(idx, BatchExtractResult { url, result })
}));
}
collect_ordered(handles, urls.len()).await
}
/// Returns the number of proxies in the rotation pool, or 0 if static mode.
pub fn proxy_pool_size(&self) -> usize {
match &self.pool {
ClientPool::Static { .. } => 0,
ClientPool::Rotating { clients } => clients.len(),
}
}
/// Pick a client from the pool for a given URL.
fn pick_client(&self, url: &str) -> &wreq::Client {
match &self.pool {
ClientPool::Static { clients, random } => {
if *random {
let host = extract_host(url);
pick_for_host(clients, &host)
} else {
&clients[0]
}
}
ClientPool::Rotating { clients } => pick_random(clients),
}
}
}
// ---------------------------------------------------------------------------
// Fetcher trait implementation
//
// Vertical extractors consume the [`crate::fetcher::Fetcher`] trait
// rather than `FetchClient` directly, which is what lets the production
// API server swap in a tls-sidecar-backed implementation without
// pulling wreq into its dependency graph. For everyone else (CLI, MCP,
// self-hosted OSS server) this impl means "pass the FetchClient you
// already have; nothing changes".
// ---------------------------------------------------------------------------
#[async_trait::async_trait]
impl crate::fetcher::Fetcher for FetchClient {
async fn fetch(&self, url: &str) -> Result<FetchResult, FetchError> {
FetchClient::fetch(self, url).await
}
async fn fetch_with_headers(
&self,
url: &str,
headers: &[(&str, &str)],
) -> Result<FetchResult, FetchError> {
FetchClient::fetch_with_headers(self, url, headers).await
}
fn cloud(&self) -> Option<&crate::cloud::CloudClient> {
FetchClient::cloud(self)
}
}
/// Collect the browser variants to use based on the browser profile.
fn collect_variants(profile: &BrowserProfile) -> Vec<BrowserVariant> {
match profile {
BrowserProfile::Random => browser::all_variants(),
BrowserProfile::Chrome => vec![browser::latest_chrome()],
BrowserProfile::Firefox => vec![browser::latest_firefox()],
BrowserProfile::SafariIos => vec![BrowserVariant::SafariIos26],
}
}
/// Convert a buffered Response into a FetchResult.
fn response_to_result(response: Response, start: Instant) -> Result<FetchResult, FetchError> {
let status = response.status();
let final_url = response.url().to_string();
let headers = response.headers().clone();
let html = response.into_text();
let elapsed = start.elapsed();
debug!(status, elapsed_ms = %elapsed.as_millis(), "fetch complete");
Ok(FetchResult {
html,
status,
url: final_url,
headers,
elapsed,
})
}
/// Extract the host from a URL, returning empty string on parse failure.
fn extract_host(url: &str) -> String {
url::Url::parse(url)
.ok()
.and_then(|u| u.host_str().map(String::from))
.unwrap_or_default()
}
/// Pick a client deterministically based on a host string.
/// Same host always gets the same client, enabling HTTP/2 connection reuse.
fn pick_for_host<'a>(clients: &'a [wreq::Client], host: &str) -> &'a wreq::Client {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
host.hash(&mut hasher);
let idx = (hasher.finish() as usize) % clients.len();
&clients[idx]
}
/// Pick a random client from the pool for per-request rotation.
fn pick_random(clients: &[wreq::Client]) -> &wreq::Client {
use rand::Rng;
let idx = rand::thread_rng().gen_range(0..clients.len());
&clients[idx]
}
/// Status codes worth retrying: server errors + rate limiting.
fn is_retryable_status(status: u16) -> bool {
status == 429
|| status == 502
|| status == 503
|| status == 504
|| status == 520
|| status == 521
|| status == 522
|| status == 523
|| status == 524
}
/// Errors worth retrying: network/connection failures (not client errors).
fn is_retryable_error(err: &FetchError) -> bool {
matches!(err, FetchError::Request(_) | FetchError::BodyDecode(_))
}
fn is_pdf_content_type(headers: &http::HeaderMap) -> bool {
headers
.get("content-type")
.and_then(|ct| ct.to_str().ok())
.map(|ct| {
let mime = ct.split(';').next().unwrap_or("").trim();
mime.eq_ignore_ascii_case("application/pdf")
})
.unwrap_or(false)
}
/// Detect if a response looks like a bot protection challenge page.
fn is_challenge_response(response: &Response) -> bool {
let body_len = response.body().len();
if body_len > 15_000 || body_len == 0 {
return false;
}
is_challenge_html(response.text().as_ref())
}
/// Same as `is_challenge_response`, operating on a body string directly
/// so callers holding a `FetchResult` can reuse the heuristic.
fn is_challenge_html(html: &str) -> bool {
let len = html.len();
if len > 15_000 || len == 0 {
return false;
}
let lower = html.to_lowercase();
if lower.contains("<title>challenge page</title>") {
return true;
}
if lower.contains("bazadebezolkohpepadr") && len < 5_000 {
return true;
}
false
}
/// Extract the homepage URL (scheme + host[:port]) from a full URL.
fn extract_homepage(url: &str) -> Option<String> {
url::Url::parse(url).ok().map(|u| {
let host = u.host_str().unwrap_or("");
// `port()` is `Some` only for a non-default port; include it so a
// host like example.com:8443 is warmed on the right port.
match u.port() {
Some(port) => format!("{}://{}:{}/", u.scheme(), host, port),
None => format!("{}://{}/", u.scheme(), host),
}
})
}
/// Convert a webclaw-pdf PdfResult into a webclaw-core ExtractionResult.
fn pdf_to_extraction_result(
pdf: &webclaw_pdf::PdfResult,
url: &str,
) -> webclaw_core::ExtractionResult {
let markdown = webclaw_pdf::to_markdown(pdf);
let word_count = markdown.split_whitespace().count();
webclaw_core::ExtractionResult {
metadata: webclaw_core::Metadata {
title: pdf.metadata.title.clone(),
description: pdf.metadata.subject.clone(),
author: pdf.metadata.author.clone(),
published_date: None,
language: None,
url: Some(url.to_string()),
site_name: None,
image: None,
favicon: None,
word_count,
},
content: webclaw_core::Content {
markdown,
plain_text: pdf.text.clone(),
links: Vec::new(),
images: Vec::new(),
code_blocks: Vec::new(),
raw_html: None,
},
domain_data: None,
structured_data: vec![],
}
}
/// Collect spawned tasks and reorder results to match input order.
async fn collect_ordered<T>(
handles: Vec<tokio::task::JoinHandle<(usize, T)>>,
len: usize,
) -> Vec<T> {
let mut slots: Vec<Option<T>> = (0..len).map(|_| None).collect();
for handle in handles {
match handle.await {
Ok((idx, result)) => {
slots[idx] = Some(result);
}
Err(e) => {
warn!(error = %e, "batch task panicked");
}
}
}
slots.into_iter().flatten().collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_batch_result_struct() {
let ok = BatchResult {
url: "https://example.com".to_string(),
result: Ok(FetchResult {
html: "<html></html>".to_string(),
status: 200,
url: "https://example.com".to_string(),
headers: http::HeaderMap::new(),
elapsed: Duration::from_millis(42),
}),
};
assert_eq!(ok.url, "https://example.com");
assert!(ok.result.is_ok());
assert_eq!(ok.result.unwrap().status, 200);
let err = BatchResult {
url: "https://bad.example".to_string(),
result: Err(FetchError::InvalidUrl("bad url".into())),
};
assert!(err.result.is_err());
}
#[test]
fn body_ceiling_allows_under_cap() {
assert!(check_body_ceiling(0, 1024).is_ok());
assert!(check_body_ceiling(MAX_BODY_BYTES as usize - 1, 1).is_ok());
}
#[test]
fn body_ceiling_rejects_at_and_over_cap() {
// Exactly at the cap is allowed; one byte over is rejected.
assert!(check_body_ceiling(MAX_BODY_BYTES as usize, 1).is_err());
// A small buffer plus a huge inflated chunk (decompression bomb)
// is caught on the very first oversized chunk.
let err = check_body_ceiling(16, 64 * 1024 * 1024).unwrap_err();
assert!(matches!(err, FetchError::BodyDecode(_)));
}
#[test]
fn body_ceiling_saturates_on_overflow() {
// usize::MAX chunk must not wrap the running sum to a small value.
assert!(check_body_ceiling(usize::MAX, usize::MAX).is_err());
}
#[test]
fn test_batch_extract_result_struct() {
let err = BatchExtractResult {
url: "https://example.com".to_string(),
result: Err(FetchError::BodyDecode("timeout".into())),
};
assert_eq!(err.url, "https://example.com");
assert!(err.result.is_err());
}
#[tokio::test]
async fn test_batch_preserves_order() {
let handles: Vec<tokio::task::JoinHandle<(usize, String)>> = vec![
tokio::spawn(async { (2, "c".to_string()) }),
tokio::spawn(async { (0, "a".to_string()) }),
tokio::spawn(async { (1, "b".to_string()) }),
];
let results = collect_ordered(handles, 3).await;
assert_eq!(results, vec!["a", "b", "c"]);
}
#[tokio::test]
async fn test_collect_ordered_handles_gaps() {
let handles: Vec<tokio::task::JoinHandle<(usize, String)>> = vec![
tokio::spawn(async { (0, "first".to_string()) }),
tokio::spawn(async { (2, "third".to_string()) }),
];
let results = collect_ordered(handles, 3).await;
assert_eq!(results.len(), 2);
assert_eq!(results[0], "first");
assert_eq!(results[1], "third");
}
#[test]
fn test_is_pdf_content_type() {
let mut headers = http::HeaderMap::new();
headers.insert("content-type", "application/pdf".parse().unwrap());
assert!(is_pdf_content_type(&headers));
headers.insert(
"content-type",
"application/pdf; charset=utf-8".parse().unwrap(),
);
assert!(is_pdf_content_type(&headers));
headers.insert("content-type", "Application/PDF".parse().unwrap());
assert!(is_pdf_content_type(&headers));
headers.insert("content-type", "text/html".parse().unwrap());
assert!(!is_pdf_content_type(&headers));
let empty = http::HeaderMap::new();
assert!(!is_pdf_content_type(&empty));
}
#[test]
fn test_pdf_to_extraction_result() {
let pdf = webclaw_pdf::PdfResult {
text: "Hello from PDF.".into(),
page_count: 2,
metadata: webclaw_pdf::PdfMetadata {
title: Some("My Doc".into()),
author: Some("Author".into()),
subject: Some("Testing".into()),
creator: None,
},
};
let result = pdf_to_extraction_result(&pdf, "https://example.com/doc.pdf");
assert_eq!(result.metadata.title.as_deref(), Some("My Doc"));
assert_eq!(result.metadata.author.as_deref(), Some("Author"));
assert_eq!(result.metadata.description.as_deref(), Some("Testing"));
assert_eq!(
result.metadata.url.as_deref(),
Some("https://example.com/doc.pdf")
);
assert!(result.content.markdown.contains("# My Doc"));
assert!(result.content.markdown.contains("Hello from PDF."));
assert_eq!(result.content.plain_text, "Hello from PDF.");
assert!(result.content.links.is_empty());
assert!(result.domain_data.is_none());
assert!(result.metadata.word_count > 0);
}
#[test]
fn test_static_pool_no_proxy() {
let config = FetchConfig::default();
let client = FetchClient::new(config).unwrap();
assert_eq!(client.proxy_pool_size(), 0);
}
#[test]
fn test_rotating_pool_prebuilds_clients() {
let config = FetchConfig {
proxy_pool: vec![
"http://proxy1:8080".into(),
"http://proxy2:8080".into(),
"http://proxy3:8080".into(),
],
..Default::default()
};
let client = FetchClient::new(config).unwrap();
assert_eq!(client.proxy_pool_size(), 3);
}
#[test]
fn test_pick_for_host_deterministic() {
let config = FetchConfig {
browser: BrowserProfile::Random,
..Default::default()
};
let client = FetchClient::new(config).unwrap();
let clients = match &client.pool {
ClientPool::Static { clients, .. } => clients,
ClientPool::Rotating { clients } => clients,
};
let a1 = pick_for_host(clients, "example.com") as *const _;
let a2 = pick_for_host(clients, "example.com") as *const _;
let a3 = pick_for_host(clients, "example.com") as *const _;
assert_eq!(a1, a2);
assert_eq!(a2, a3);
}
#[test]
fn test_pick_for_host_distributes() {
let config = FetchConfig {
proxy_pool: (0..10).map(|i| format!("http://proxy{i}:8080")).collect(),
..Default::default()
};
let client = FetchClient::new(config).unwrap();
let clients = match &client.pool {
ClientPool::Static { clients, .. } | ClientPool::Rotating { clients } => clients,
};
let hosts = [
"example.com",
"google.com",
"github.com",
"rust-lang.org",
"crates.io",
];
let indices: Vec<usize> = hosts
.iter()
.map(|h| {
let ptr = pick_for_host(clients, h) as *const _;
clients.iter().position(|c| std::ptr::eq(c, ptr)).unwrap()
})
.collect();
let unique: std::collections::HashSet<_> = indices.iter().collect();
assert!(
unique.len() >= 2,
"expected host distribution across clients, got indices: {indices:?}"
);
}
#[test]
fn test_extract_host() {
assert_eq!(extract_host("https://example.com/path"), "example.com");
assert_eq!(
extract_host("https://sub.example.com:8080/foo"),
"sub.example.com"
);
assert_eq!(extract_host("not-a-url"), "");
}
#[test]
fn test_default_config_has_empty_proxy_pool() {
let config = FetchConfig::default();
assert!(config.proxy_pool.is_empty());
assert!(config.proxy.is_none());
}
}