diff --git a/CHANGELOG.md b/CHANGELOG.md
index 951d8d7..3e10de8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,18 @@
 All notable changes to webclaw are documented here.
 Format follows [Keep a Changelog](https://keepachangelog.com/).
 
+## [0.1.3] — 2026-03-25
+
+### Added
+- Crawl streaming: real-time progress on stderr as pages complete (`[2/50] OK https://... (234ms, 1523 words)`)
+- Crawl resume/cancel: `--crawl-state <path>` saves progress on Ctrl+C and resumes from where it left off
+- MCP server proxy support via `WEBCLAW_PROXY` and `WEBCLAW_PROXY_FILE` env vars
+
+### Changed
+- Crawl results now expose visited set and remaining frontier for accurate state persistence
+
+---
+
 ## [0.1.2] — 2026-03-25
 
 ### Changed
diff --git a/Cargo.lock b/Cargo.lock
index 18d1e1b..a363195 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2854,7 +2854,7 @@ dependencies = [
 
 [[package]]
 name = "webclaw-cli"
-version = "0.1.2"
+version = "0.1.3"
 dependencies = [
  "clap",
  "dotenvy",
@@ -2874,7 +2874,7 @@ dependencies = [
 
 [[package]]
 name = "webclaw-core"
-version = "0.1.2"
+version = "0.1.3"
 dependencies = [
  "ego-tree",
  "once_cell",
@@ -2891,7 +2891,7 @@ dependencies = [
 
 [[package]]
 name = "webclaw-fetch"
-version = "0.1.2"
+version = "0.1.3"
 dependencies = [
  "primp",
  "quick-xml",
@@ -2909,7 +2909,7 @@ dependencies = [
 
 [[package]]
 name = "webclaw-llm"
-version = "0.1.2"
+version = "0.1.3"
 dependencies = [
  "async-trait",
  "reqwest",
@@ -2922,7 +2922,7 @@ dependencies = [
 
 [[package]]
 name = "webclaw-mcp"
-version = "0.1.2"
+version = "0.1.3"
 dependencies = [
  "dotenvy",
  "reqwest",
@@ -2942,7 +2942,7 @@ dependencies = [
 
 [[package]]
 name = "webclaw-pdf"
-version = "0.1.2"
+version = "0.1.3"
 dependencies = [
  "pdf-extract",
  "thiserror",
diff --git a/Cargo.toml b/Cargo.toml
index 8467697..6a8ec19 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,7 +3,7 @@ resolver = "2"
 members = ["crates/*"]
 
 [workspace.package]
-version = "0.1.2"
+version = "0.1.3"
 edition = "2024"
 license = "MIT"
 repository = "https://github.com/0xMassi/webclaw"
diff --git a/crates/webclaw-cli/src/main.rs b/crates/webclaw-cli/src/main.rs
index a4d7da4..475e58b 100644
--- a/crates/webclaw-cli/src/main.rs
+++ b/crates/webclaw-cli/src/main.rs
@@ -4,8 +4,10 @@ mod cloud;
 
 use std::io::{self, Read as _};
+use std::path::PathBuf;
 use std::process;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 use clap::{Parser, ValueEnum};
 use tracing_subscriber::EnvFilter;
@@ -14,7 +16,7 @@ use webclaw_core::{
 };
 use webclaw_fetch::{
     BatchExtractResult, BrowserProfile, CrawlConfig, CrawlResult, Crawler, FetchClient,
-    FetchConfig, FetchResult, SitemapEntry,
+    FetchConfig, FetchResult, PageResult, SitemapEntry,
 };
 use webclaw_llm::LlmProvider;
 use webclaw_pdf::PdfMode;
@@ -198,6 +200,10 @@ struct Cli {
     #[arg(long)]
     exclude_paths: Option<String>,
 
+    /// Path to save/resume crawl state. On Ctrl+C: saves progress. On start: resumes if file exists.
+    #[arg(long)]
+    crawl_state: Option<PathBuf>,
+
     /// Seed crawl frontier from sitemap discovery (robots.txt + /sitemap.xml)
     #[arg(long)]
     sitemap: bool,
@@ -903,6 +909,23 @@ fn print_map_output(entries: &[SitemapEntry], format: &OutputFormat) {
     }
 }
 
+/// Format a streaming progress line for a completed page.
+fn format_progress(page: &PageResult, index: usize, max_pages: usize) -> String {
+    let status = if page.error.is_some() { "ERR" } else { "OK " };
+    let timing = format!("{}ms", page.elapsed.as_millis());
+    let detail = if let Some(ref extraction) = page.extraction {
+        format!(", {} words", extraction.metadata.word_count)
+    } else if let Some(ref err) = page.error {
+        format!(" ({err})")
+    } else {
+        String::new()
+    };
+    format!(
+        "[{index}/{max_pages}] {status} {} ({timing}{detail})",
+        page.url
+    )
+}
+
 async fn run_crawl(cli: &Cli) -> Result<(), String> {
     let url = cli
         .urls
@@ -926,6 +949,23 @@ async fn run_crawl(cli: &Cli) -> Result<(), String> {
         .map(|s| s.split(',').map(|p| p.trim().to_string()).collect())
         .unwrap_or_default();
 
+    // Set up streaming progress channel
+    let (progress_tx, mut progress_rx) = tokio::sync::broadcast::channel::<PageResult>(100);
+
+    // Set up cancel flag for Ctrl+C handling
+    let cancel_flag = Arc::new(AtomicBool::new(false));
+
+    // Register Ctrl+C handler when --crawl-state is set
+    let state_path = cli.crawl_state.clone();
+    if state_path.is_some() {
+        let flag = Arc::clone(&cancel_flag);
+        tokio::spawn(async move {
+            tokio::signal::ctrl_c().await.ok();
+            flag.store(true, Ordering::Relaxed);
+            eprintln!("\nCtrl+C received, saving crawl state...");
+        });
+    }
+
     let config = CrawlConfig {
         fetch: build_fetch_config(cli),
         max_depth: cli.depth,
@@ -936,11 +976,67 @@ async fn run_crawl(cli: &Cli) -> Result<(), String> {
         use_sitemap: cli.sitemap,
         include_patterns,
         exclude_patterns,
-        progress_tx: None,
+        progress_tx: Some(progress_tx),
+        cancel_flag: Some(Arc::clone(&cancel_flag)),
     };
 
+    // Load resume state if --crawl-state file exists
+    let resume_state = state_path
+        .as_ref()
+        .and_then(|p| Crawler::load_state(p))
+        .inspect(|s| {
+            eprintln!(
+                "Resuming crawl: {} pages already visited, {} URLs in frontier",
+                s.visited.len(),
+                s.frontier.len(),
+            );
+        });
+
+    let max_pages = cli.max_pages;
+    let completed_offset = resume_state.as_ref().map_or(0, |s| s.completed_pages);
+
+    // Spawn background task to print streaming progress to stderr
+    let progress_handle = tokio::spawn(async move {
+        let mut count = completed_offset;
+        while let Ok(page) = progress_rx.recv().await {
+            count += 1;
+            eprintln!("{}", format_progress(&page, count, max_pages));
+        }
+    });
+
     let crawler = Crawler::new(url, config).map_err(|e| format!("crawler error: {e}"))?;
-    let result = crawler.crawl(url).await;
+    let result = crawler.crawl(url, resume_state).await;
+
+    // Drop the crawler (and its progress_tx clone) so the progress task finishes
+    drop(crawler);
+    let _ = progress_handle.await;
+
+    // If cancelled via Ctrl+C and --crawl-state is set, save state for resume
+    let was_cancelled = cancel_flag.load(Ordering::Relaxed);
+    if was_cancelled {
+        if let Some(ref path) = state_path {
+            Crawler::save_state(
+                path,
+                url,
+                &result.visited,
+                &result.remaining_frontier,
+                completed_offset + result.pages.len(),
+                cli.max_pages,
+                cli.depth,
+            )?;
+            eprintln!(
+                "Crawl state saved to {} ({} pages completed). Resume with --crawl-state {}",
+                path.display(),
+                completed_offset + result.pages.len(),
+                path.display(),
+            );
+        }
+    } else if let Some(ref path) = state_path {
+        // Crawl completed normally — clean up state file
+        if path.exists() {
+            let _ = std::fs::remove_file(path);
+        }
+    }
 
     // Log per-page errors and extraction warnings to stderr
     for page in &result.pages {
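Reviewer note: the hooks wired up above are plain `webclaw_fetch` API, so the same progress stream and cancellation flag can be exercised outside the CLI. The following is a minimal sketch, not code from this PR; the URL, channel capacity, and output lines are placeholders, and all other `CrawlConfig` fields are left at their defaults.

```rust
// Sketch only: placeholder URL and limits, assuming a tokio runtime is available.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use webclaw_fetch::{CrawlConfig, Crawler, PageResult};

async fn crawl_with_hooks() -> Result<(), String> {
    let (progress_tx, mut progress_rx) = tokio::sync::broadcast::channel::<PageResult>(100);
    let cancel_flag = Arc::new(AtomicBool::new(false));

    let config = CrawlConfig {
        progress_tx: Some(progress_tx),
        cancel_flag: Some(Arc::clone(&cancel_flag)),
        ..CrawlConfig::default()
    };

    // Consume per-page results as they complete, like the CLI's stderr stream.
    let progress = tokio::spawn(async move {
        while let Ok(page) = progress_rx.recv().await {
            eprintln!("done: {}", page.url);
        }
    });

    let crawler = Crawler::new("https://example.com", config).map_err(|e| e.to_string())?;
    let result = crawler.crawl("https://example.com", None).await;

    // The receiver loop ends once the crawler (and its sender clone) is dropped.
    drop(crawler);
    let _ = progress.await;

    // New in 0.1.3: visited set and remaining frontier are exposed for persistence.
    println!(
        "visited {}, frontier left {}, cancelled {}",
        result.visited.len(),
        result.remaining_frontier.len(),
        cancel_flag.load(Ordering::Relaxed)
    );
    Ok(())
}
```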
diff --git a/crates/webclaw-fetch/src/crawler.rs b/crates/webclaw-fetch/src/crawler.rs
index 3e38ae5..3ef3f86 100644
--- a/crates/webclaw-fetch/src/crawler.rs
+++ b/crates/webclaw-fetch/src/crawler.rs
@@ -7,7 +7,9 @@
 /// When `use_sitemap` is enabled, the crawler first discovers URLs from the
 /// site's sitemaps and seeds the BFS frontier before crawling.
 use std::collections::HashSet;
+use std::path::Path;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::{Duration, Instant};
 
 use serde::{Deserialize, Serialize};
@@ -45,6 +47,9 @@ pub struct CrawlConfig {
     /// Optional channel sender for streaming per-page results as they complete.
     /// When set, each `PageResult` is sent on this channel immediately after extraction.
     pub progress_tx: Option<tokio::sync::broadcast::Sender<PageResult>>,
+    /// When set to `true`, the crawler breaks out of the main loop early.
+    /// Callers (e.g. a Ctrl+C handler) can flip this to request graceful cancellation.
+    pub cancel_flag: Option<Arc<AtomicBool>>,
 }
 
 impl Default for CrawlConfig {
@@ -60,6 +65,7 @@ impl Default for CrawlConfig {
             include_patterns: Vec::new(),
             exclude_patterns: Vec::new(),
             progress_tx: None,
+            cancel_flag: None,
         }
     }
 }
@@ -72,6 +78,12 @@ pub struct CrawlResult {
     pub ok: usize,
     pub errors: usize,
     pub elapsed_secs: f64,
+    /// URLs visited during this crawl (for resume state).
+    #[serde(skip)]
+    pub visited: HashSet<String>,
+    /// Remaining frontier when crawl was cancelled (for resume state).
+    #[serde(skip)]
+    pub remaining_frontier: Vec<(String, usize)>,
 }
 
 /// Outcome of extracting a single page during the crawl.
@@ -85,6 +97,17 @@ pub struct PageResult {
     pub elapsed: Duration,
 }
 
+/// Serializable crawl state for resume after Ctrl+C cancellation.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CrawlState {
+    pub seed_url: String,
+    pub visited: Vec<String>,
+    pub frontier: Vec<(String, usize)>,
+    pub completed_pages: usize,
+    pub max_pages: usize,
+    pub max_depth: usize,
+}
+
 /// Recursive crawler that wraps a shared [`FetchClient`].
 pub struct Crawler {
     client: Arc<FetchClient>,
@@ -108,6 +131,43 @@ impl Crawler {
         })
     }
 
+    /// Save current crawl state to a JSON file for later resume.
+    pub fn save_state(
+        path: &Path,
+        seed_url: &str,
+        visited: &HashSet<String>,
+        frontier: &[(String, usize)],
+        completed_pages: usize,
+        max_pages: usize,
+        max_depth: usize,
+    ) -> Result<(), String> {
+        let state = CrawlState {
+            seed_url: seed_url.to_string(),
+            visited: visited.iter().cloned().collect(),
+            frontier: frontier.to_vec(),
+            completed_pages,
+            max_pages,
+            max_depth,
+        };
+        let json =
+            serde_json::to_string_pretty(&state).map_err(|e| format!("serialize state: {e}"))?;
+        std::fs::write(path, json).map_err(|e| format!("write state to {}: {e}", path.display()))
+    }
+
+    /// Load crawl state from a JSON file. Returns `None` if file doesn't exist.
+    pub fn load_state(path: &Path) -> Option<CrawlState> {
+        let content = std::fs::read_to_string(path).ok()?;
+        serde_json::from_str(&content).ok()
+    }
+
+    /// Returns true if the cancel flag has been set.
+    fn is_cancelled(&self) -> bool {
+        self.config
+            .cancel_flag
+            .as_ref()
+            .is_some_and(|f| f.load(Ordering::Relaxed))
+    }
+
     /// Crawl starting from `start_url`, returning results for every page visited.
     ///
     /// Uses breadth-first traversal: all pages at depth N are fetched (concurrently,
@@ -115,7 +175,10 @@ impl Crawler {
     ///
     /// When `config.use_sitemap` is true, sitemap URLs are discovered first and
     /// added to the initial frontier at depth 0 alongside the seed URL.
-    pub async fn crawl(&self, start_url: &str) -> CrawlResult {
+    ///
+    /// If `resume_state` is provided, the crawl resumes from the saved state
+    /// (pre-populated visited set and frontier) instead of starting fresh.
+    pub async fn crawl(&self, start_url: &str, resume_state: Option<CrawlState>) -> CrawlResult {
         let start = Instant::now();
 
         let seed = match Url::parse(start_url) {
@@ -133,46 +196,66 @@ impl Crawler {
                     ok: 0,
                     errors: 1,
                     elapsed_secs: 0.0,
+                    visited: HashSet::new(),
+                    remaining_frontier: Vec::new(),
                 };
             }
         };
 
         let semaphore = Arc::new(Semaphore::new(self.config.concurrency));
-        let mut visited: HashSet<String> = HashSet::new();
+        let mut visited: HashSet<String>;
         let mut pages: Vec<PageResult> = Vec::new();
+        let mut frontier: Vec<(String, usize)>;
 
-        // BFS frontier: vec of (normalized_url, depth) for the current level
-        let mut frontier: Vec<(String, usize)> = vec![(normalize(&seed), 0)];
+        // Resume from saved state or start fresh
+        if let Some(state) = resume_state {
+            visited = state.visited.into_iter().collect();
+            frontier = state.frontier;
+            info!(
+                visited = visited.len(),
+                frontier = frontier.len(),
+                "resuming crawl from saved state"
+            );
+        } else {
+            visited = HashSet::new();
+            frontier = vec![(normalize(&seed), 0)];
 
-        // Seed frontier from sitemap if enabled
-        if self.config.use_sitemap {
-            let base_url = format!("{}://{}", seed.scheme(), seed.host_str().unwrap_or(""));
-            match sitemap::discover(&self.client, &base_url).await {
-                Ok(entries) => {
-                    let before = frontier.len();
-                    for entry in entries {
-                        if self.qualify_link(&entry.url, &visited).is_some() {
-                            let parsed = match Url::parse(&entry.url) {
-                                Ok(u) => u,
-                                Err(_) => continue,
-                            };
-                            let norm = normalize(&parsed);
-                            frontier.push((norm, 0));
+            // Seed frontier from sitemap if enabled
+            if self.config.use_sitemap {
+                let base_url = format!("{}://{}", seed.scheme(), seed.host_str().unwrap_or(""));
+                match sitemap::discover(&self.client, &base_url).await {
+                    Ok(entries) => {
+                        let before = frontier.len();
+                        for entry in entries {
+                            if self.qualify_link(&entry.url, &visited).is_some() {
+                                let parsed = match Url::parse(&entry.url) {
+                                    Ok(u) => u,
+                                    Err(_) => continue,
+                                };
+                                let norm = normalize(&parsed);
+                                frontier.push((norm, 0));
+                            }
                         }
+                        let added = frontier.len() - before;
+                        info!(
+                            sitemap_urls = added,
+                            "seeded frontier from sitemap discovery"
+                        );
+                    }
+                    Err(e) => {
+                        warn!(error = %e, "sitemap discovery failed, continuing with seed URL only");
                     }
-                    let added = frontier.len() - before;
-                    info!(
-                        sitemap_urls = added,
-                        "seeded frontier from sitemap discovery"
-                    );
-                }
-                Err(e) => {
-                    warn!(error = %e, "sitemap discovery failed, continuing with seed URL only");
-                }
                 }
             }
         }
 
         while !frontier.is_empty() && pages.len() < self.config.max_pages {
+            // Check cancel flag before processing each batch
+            if self.is_cancelled() {
+                info!("crawl cancelled by user");
+                break;
+            }
+
             // Dedup this level's frontier against the visited set and page cap
             let batch: Vec<(String, usize)> = frontier
                 .drain(..)
@@ -265,6 +348,12 @@ impl Crawler {
                 if pages.len() >= self.config.max_pages {
                     break;
                 }
+
+                // Check cancel flag between page results
+                if self.is_cancelled() {
+                    info!("crawl cancelled by user (mid-batch)");
+                    break;
+                }
             }
 
             frontier = next_frontier;
@@ -286,6 +375,8 @@ impl Crawler {
             ok: ok_count,
             errors: err_count,
             elapsed_secs: total_elapsed.as_secs_f64(),
+            remaining_frontier: frontier,
+            visited,
             pages,
         }
     }
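As context for the `save_state`/`load_state` pair above: the state file is plain JSON (pretty-printed via serde_json), and a missing file simply yields `None` on resume. A minimal round-trip sketch follows; the file name, URLs, and counts are invented for illustration.

```rust
// Sketch only: file name, URLs, and counts below are placeholders.
use std::collections::HashSet;
use std::path::Path;

use webclaw_fetch::Crawler;

fn state_roundtrip() -> Result<(), String> {
    let visited: HashSet<String> = [
        "https://example.com/".to_string(),
        "https://example.com/docs".to_string(),
    ]
    .into_iter()
    .collect();
    let frontier = vec![("https://example.com/blog".to_string(), 1)];
    let path = Path::new("crawl-state.json");

    // What the CLI's Ctrl+C handler persists (2 pages done, caps of 50 pages / depth 3).
    Crawler::save_state(path, "https://example.com/", &visited, &frontier, 2, 50, 3)?;

    // What a later run reads back before seeding the crawl.
    if let Some(state) = Crawler::load_state(path) {
        assert_eq!(state.completed_pages, 2);
        assert_eq!(state.frontier, frontier);
    }
    Ok(())
}
```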
diff --git a/crates/webclaw-fetch/src/lib.rs b/crates/webclaw-fetch/src/lib.rs
index f8a472d..c5cd40b 100644
--- a/crates/webclaw-fetch/src/lib.rs
+++ b/crates/webclaw-fetch/src/lib.rs
@@ -13,7 +13,7 @@ pub mod sitemap;
 
 pub use browser::BrowserProfile;
 pub use client::{BatchExtractResult, BatchResult, FetchClient, FetchConfig, FetchResult};
-pub use crawler::{CrawlConfig, CrawlResult, Crawler, PageResult};
+pub use crawler::{CrawlConfig, CrawlResult, CrawlState, Crawler, PageResult};
 pub use error::FetchError;
 pub use proxy::{parse_proxy_file, parse_proxy_line};
 pub use sitemap::SitemapEntry;
diff --git a/crates/webclaw-mcp/src/server.rs b/crates/webclaw-mcp/src/server.rs
index 41dd0a9..1290f44 100644
--- a/crates/webclaw-mcp/src/server.rs
+++ b/crates/webclaw-mcp/src/server.rs
@@ -62,12 +62,20 @@ impl WebclawMcp {
     pub async fn new() -> Self {
         let mut config = webclaw_fetch::FetchConfig::default();
 
-        // Auto-load proxies.txt if present
-        if std::path::Path::new("proxies.txt").exists()
-            && let Ok(pool) = webclaw_fetch::parse_proxy_file("proxies.txt")
+        // Load proxy config from env vars or local file
+        if let Ok(proxy) = std::env::var("WEBCLAW_PROXY") {
+            info!("using single proxy from WEBCLAW_PROXY");
+            config.proxy = Some(proxy);
+        }
+
+        let proxy_file = std::env::var("WEBCLAW_PROXY_FILE")
+            .ok()
+            .unwrap_or_else(|| "proxies.txt".to_string());
+        if std::path::Path::new(&proxy_file).exists()
+            && let Ok(pool) = webclaw_fetch::parse_proxy_file(&proxy_file)
             && !pool.is_empty()
         {
-            info!(count = pool.len(), "loaded proxy pool from proxies.txt");
+            info!(count = pool.len(), file = %proxy_file, "loaded proxy pool");
            config.proxy_pool = pool;
         }
 
@@ -210,7 +218,7 @@ impl WebclawMcp {
         let crawler = webclaw_fetch::Crawler::new(&params.url, config)
             .map_err(|e| format!("Crawler init failed: {e}"))?;
 
-        let result = crawler.crawl(&params.url).await;
+        let result = crawler.crawl(&params.url, None).await;
 
         let mut output = format!(
             "Crawled {} pages ({} ok, {} errors) in {:.1}s\n\n",