feat: v0.2.0 — DOCX/XLSX/CSV extraction, HTML format, multi-URL watch, batch LLM

Document extraction:
- DOCX: auto-detected, outputs markdown with headings (via zip + quick-xml)
- XLSX/XLS: markdown tables with multi-sheet support (via calamine)
- CSV: quoted field handling, markdown table output
- All auto-detected by Content-Type header or URL extension

New features:
- -f html output format (sanitized HTML)
- Multi-URL watch: --urls-file + --watch monitors all URLs in parallel
- Batch + LLM: --extract-prompt/--extract-json now work with multiple URLs
- Mixed batch: HTML pages + DOCX + XLSX + CSV in one command

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Valerio 2026-03-26 15:28:23 +01:00
parent 0e4128782a
commit ea14848772
8 changed files with 1520 additions and 41 deletions

View file

@ -95,7 +95,7 @@ struct Cli {
#[arg(long)]
urls_file: Option<String>,
/// Output format (markdown, json, text, llm)
/// Output format (markdown, json, text, llm, html)
#[arg(short, long, default_value = "markdown")]
format: OutputFormat,
@ -277,6 +277,7 @@ enum OutputFormat {
Json,
Text,
Llm,
Html,
}
#[derive(Clone, ValueEnum)]
@ -394,7 +395,7 @@ fn build_extraction_options(cli: &Cli) -> ExtractionOptions {
.map(|s| s.split(',').map(|s| s.trim().to_string()).collect())
.unwrap_or_default(),
only_main_content: cli.only_main_content,
include_raw_html: cli.raw_html,
include_raw_html: cli.raw_html || matches!(cli.format, OutputFormat::Html),
}
}
@ -417,6 +418,7 @@ fn url_to_filename(raw_url: &str, format: &OutputFormat) -> String {
OutputFormat::Markdown | OutputFormat::Llm => "md",
OutputFormat::Json => "json",
OutputFormat::Text => "txt",
OutputFormat::Html => "html",
};
let parsed = url::Url::parse(raw_url);
@ -470,6 +472,15 @@ fn write_to_file(dir: &Path, filename: &str, content: &str) -> Result<(), String
Ok(())
}
/// Get raw HTML from an extraction result, falling back to markdown if unavailable.
fn raw_html_or_markdown(result: &ExtractionResult) -> &str {
result
.content
.raw_html
.as_deref()
.unwrap_or(&result.content.markdown)
}
/// Format an `ExtractionResult` into a string for the given output format.
fn format_output(result: &ExtractionResult, format: &OutputFormat, show_metadata: bool) -> String {
match format {
@ -484,6 +495,7 @@ fn format_output(result: &ExtractionResult, format: &OutputFormat, show_metadata
OutputFormat::Json => serde_json::to_string_pretty(result).expect("serialization failed"),
OutputFormat::Text => result.content.plain_text.clone(),
OutputFormat::Llm => to_llm_text(result, result.metadata.url.as_deref()),
OutputFormat::Html => raw_html_or_markdown(result).to_string(),
}
}
@ -586,6 +598,7 @@ async fn fetch_and_extract(cli: &Cli) -> Result<FetchOutput, String> {
OutputFormat::Json => "json",
OutputFormat::Text => "text",
OutputFormat::Llm => "llm",
OutputFormat::Html => "html",
};
let resp = c
.scrape(
@ -618,6 +631,7 @@ async fn fetch_and_extract(cli: &Cli) -> Result<FetchOutput, String> {
OutputFormat::Json => "json",
OutputFormat::Text => "text",
OutputFormat::Llm => "llm",
OutputFormat::Html => "html",
};
match c
.scrape(
@ -793,6 +807,9 @@ fn print_output(result: &ExtractionResult, format: &OutputFormat, show_metadata:
OutputFormat::Llm => {
println!("{}", to_llm_text(result, result.metadata.url.as_deref()));
}
OutputFormat::Html => {
println!("{}", raw_html_or_markdown(result));
}
}
}
@ -845,6 +862,17 @@ fn print_cloud_output(resp: &serde_json::Value, format: &OutputFormat) {
print_cloud_output(resp, &OutputFormat::Markdown);
}
}
OutputFormat::Html => {
if let Some(html) = resp
.get("content")
.and_then(|c| c.get("raw_html"))
.and_then(|h| h.as_str())
{
println!("{html}");
} else {
print_cloud_output(resp, &OutputFormat::Markdown);
}
}
}
}
@ -937,6 +965,17 @@ fn print_crawl_output(result: &CrawlResult, format: &OutputFormat, show_metadata
println!();
}
}
OutputFormat::Html => {
for page in &result.pages {
let Some(ref extraction) = page.extraction else {
continue;
};
println!("---");
println!("<!-- Page: {} -->\n", page.url);
println!("{}", raw_html_or_markdown(extraction));
println!();
}
}
}
}
@ -1009,6 +1048,21 @@ fn print_batch_output(results: &[BatchExtractResult], format: &OutputFormat, sho
}
}
}
OutputFormat::Html => {
for r in results {
match &r.result {
Ok(extraction) => {
println!("---");
println!("<!-- {} -->\n", r.url);
println!("{}", raw_html_or_markdown(extraction));
println!();
}
Err(e) => {
eprintln!("error: {} -- {}", r.url, e);
}
}
}
}
}
}
@ -1393,24 +1447,15 @@ fn fire_webhook(url: &str, payload: &serde_json::Value) {
});
}
async fn run_watch(cli: &Cli) -> Result<(), String> {
let raw_url = cli.urls.first().ok_or("--watch requires a URL argument")?;
let url = normalize_url(raw_url);
async fn run_watch(cli: &Cli, urls: &[String]) -> Result<(), String> {
if urls.is_empty() {
return Err("--watch requires at least one URL".into());
}
let client =
FetchClient::new(build_fetch_config(cli)).map_err(|e| format!("client error: {e}"))?;
let options = build_extraction_options(cli);
// Initial snapshot
let mut previous = client
.fetch_and_extract_with_options(&url, &options)
.await
.map_err(|e| format!("initial fetch failed: {e}"))?;
eprintln!(
"[watch] Initial snapshot: {url} ({} words)",
previous.metadata.word_count
let client = Arc::new(
FetchClient::new(build_fetch_config(cli)).map_err(|e| format!("client error: {e}"))?,
);
let options = build_extraction_options(cli);
// Ctrl+C handler
let cancelled = Arc::new(AtomicBool::new(false));
@ -1420,6 +1465,33 @@ async fn run_watch(cli: &Cli) -> Result<(), String> {
flag.store(true, Ordering::Relaxed);
});
// Single-URL mode: preserve original behavior exactly
if urls.len() == 1 {
return run_watch_single(cli, &client, &options, &urls[0], &cancelled).await;
}
// Multi-URL mode: batch fetch, diff each, report aggregate
run_watch_multi(cli, &client, &options, urls, &cancelled).await
}
/// Original single-URL watch loop -- backward compatible.
async fn run_watch_single(
cli: &Cli,
client: &Arc<FetchClient>,
options: &ExtractionOptions,
url: &str,
cancelled: &Arc<AtomicBool>,
) -> Result<(), String> {
let mut previous = client
.fetch_and_extract_with_options(url, options)
.await
.map_err(|e| format!("initial fetch failed: {e}"))?;
eprintln!(
"[watch] Initial snapshot: {url} ({} words)",
previous.metadata.word_count
);
loop {
tokio::time::sleep(std::time::Duration::from_secs(cli.watch_interval)).await;
@ -1428,7 +1500,7 @@ async fn run_watch(cli: &Cli) -> Result<(), String> {
break;
}
let current = match client.fetch_and_extract_with_options(&url, &options).await {
let current = match client.fetch_and_extract_with_options(url, options).await {
Ok(result) => result,
Err(e) => {
eprintln!("[watch] Fetch error ({}): {e}", timestamp());
@ -1454,7 +1526,6 @@ async fn run_watch(cli: &Cli) -> Result<(), String> {
.spawn()
{
Ok(mut child) => {
// Pipe diff JSON to stdin, then detach
if let Some(mut stdin) = child.stdin.take() {
use tokio::io::AsyncWriteExt;
let _ = stdin.write_all(diff_json.as_bytes()).await;
@ -1464,7 +1535,6 @@ async fn run_watch(cli: &Cli) -> Result<(), String> {
}
}
// Fire webhook on change
if let Some(ref webhook_url) = cli.webhook {
fire_webhook(
webhook_url,
@ -1487,6 +1557,162 @@ async fn run_watch(cli: &Cli) -> Result<(), String> {
Ok(())
}
/// Multi-URL watch loop -- batch fetch all URLs, diff each, report aggregate.
///
/// Takes an initial snapshot of every URL, then polls all of them every
/// `cli.watch_interval` seconds. Changed pages are reported individually;
/// `--on-change` and `--webhook` fire at most once per check with an
/// aggregate JSON payload covering all changes in that check.
///
/// Returns `Ok(())` when cancelled via Ctrl+C (the `cancelled` flag);
/// per-URL fetch errors are logged and counted but never abort the loop.
async fn run_watch_multi(
    cli: &Cli,
    client: &Arc<FetchClient>,
    options: &ExtractionOptions,
    urls: &[String],
    cancelled: &Arc<AtomicBool>,
) -> Result<(), String> {
    let url_refs: Vec<&str> = urls.iter().map(|u| u.as_str()).collect();
    // Initial pass: fetch all URLs in parallel
    let initial_results = client
        .fetch_and_extract_batch_with_options(&url_refs, cli.concurrency, options)
        .await;
    // snapshots maps URL -> last seen extraction; URLs that failed the
    // initial fetch are absent until their first successful fetch.
    let mut snapshots = std::collections::HashMap::new();
    let mut ok_count = 0usize;
    let mut err_count = 0usize;
    for r in initial_results {
        match r.result {
            Ok(extraction) => {
                snapshots.insert(r.url, extraction);
                ok_count += 1;
            }
            Err(e) => {
                eprintln!("[watch] Initial fetch error: {} -- {e}", r.url);
                err_count += 1;
            }
        }
    }
    eprintln!(
        "[watch] Watching {} URLs (interval: {}s)",
        urls.len(),
        cli.watch_interval
    );
    eprintln!("[watch] Initial snapshots: {ok_count} ok, {err_count} errors");
    let mut check_number = 0u64;
    loop {
        tokio::time::sleep(std::time::Duration::from_secs(cli.watch_interval)).await;
        // Ctrl+C sets this flag (handler installed by run_watch); checked
        // after the sleep, so shutdown can lag by up to one interval.
        if cancelled.load(Ordering::Relaxed) {
            eprintln!("[watch] Stopped");
            break;
        }
        check_number += 1;
        let current_results = client
            .fetch_and_extract_batch_with_options(&url_refs, cli.concurrency, options)
            .await;
        let mut changed: Vec<serde_json::Value> = Vec::new();
        let mut same_count = 0usize;
        let mut fetch_errors = 0usize;
        for r in current_results {
            match r.result {
                Ok(current) => {
                    if let Some(previous) = snapshots.get(&r.url) {
                        let diff = webclaw_core::diff::diff(previous, &current);
                        if diff.status == ChangeStatus::Same {
                            same_count += 1;
                        } else {
                            changed.push(serde_json::json!({
                                "url": r.url,
                                "word_count_delta": diff.word_count_delta,
                            }));
                            // Only changed pages update the stored baseline.
                            snapshots.insert(r.url, current);
                        }
                    } else {
                        // URL failed initially, first successful fetch -- store as baseline
                        snapshots.insert(r.url, current);
                        same_count += 1;
                    }
                }
                Err(e) => {
                    eprintln!("[watch] Fetch error: {} -- {e}", r.url);
                    fetch_errors += 1;
                }
            }
        }
        let ts = timestamp();
        let err_suffix = if fetch_errors > 0 {
            format!(", {fetch_errors} errors")
        } else {
            String::new()
        };
        if changed.is_empty() {
            eprintln!(
                "[watch] Check {check_number} ({ts}): 0 changed, {same_count} same{err_suffix}"
            );
        } else {
            eprintln!(
                "[watch] Check {check_number} ({ts}): {} changed, {same_count} same{err_suffix}",
                changed.len(),
            );
            for entry in &changed {
                let url = entry["url"].as_str().unwrap_or("?");
                let delta = entry["word_count_delta"].as_i64().unwrap_or(0);
                eprintln!(" -> {url} (word delta: {delta:+})");
            }
            // Fire --on-change once with all changes
            if let Some(ref cmd) = cli.on_change {
                let payload = serde_json::json!({
                    "event": "watch_changes",
                    "check_number": check_number,
                    "total_urls": urls.len(),
                    "changed": changed.len(),
                    "same": same_count,
                    "changes": changed,
                });
                let payload_json = serde_json::to_string(&payload).unwrap_or_default();
                eprintln!("[watch] Running: {cmd}");
                // Spawn via `sh -c`, pipe the payload to stdin, then detach;
                // the child is never waited on (fire-and-forget).
                match tokio::process::Command::new("sh")
                    .arg("-c")
                    .arg(cmd)
                    .stdin(std::process::Stdio::piped())
                    .spawn()
                {
                    Ok(mut child) => {
                        if let Some(mut stdin) = child.stdin.take() {
                            use tokio::io::AsyncWriteExt;
                            let _ = stdin.write_all(payload_json.as_bytes()).await;
                        }
                    }
                    Err(e) => eprintln!("[watch] Failed to run command: {e}"),
                }
            }
            // Fire webhook once with aggregate payload
            if let Some(ref webhook_url) = cli.webhook {
                fire_webhook(
                    webhook_url,
                    &serde_json::json!({
                        "event": "watch_changes",
                        "check_number": check_number,
                        "total_urls": urls.len(),
                        "changed": changed.len(),
                        "same": same_count,
                        "changes": changed,
                    }),
                );
            }
        }
    }
    Ok(())
}
async fn run_diff(cli: &Cli, snapshot_path: &str) -> Result<(), String> {
// Load previous snapshot
let snapshot_json = std::fs::read_to_string(snapshot_path)
@ -1626,6 +1852,158 @@ async fn run_llm(cli: &Cli) -> Result<(), String> {
Ok(())
}
/// Batch LLM extraction: fetch each URL, run LLM on extracted content, save/print results.
/// URLs are processed sequentially to respect LLM provider rate limits.
///
/// `entries` pairs each URL with an optional custom output filename (from
/// the URLs file). Exactly one of --extract-json / --extract-prompt /
/// --summarize must be set by the caller (guarded by `has_llm_flags`);
/// otherwise the `unreachable!` below fires.
///
/// Returns `Err` if any URL failed, after processing all of them.
async fn run_batch_llm(cli: &Cli, entries: &[(String, Option<String>)]) -> Result<(), String> {
    let client =
        FetchClient::new(build_fetch_config(cli)).map_err(|e| format!("client error: {e}"))?;
    let options = build_extraction_options(cli);
    let provider = build_llm_provider(cli).await?;
    let model = cli.llm_model.as_deref();
    // Pre-parse schema once if --extract-json is used
    // (a leading '@' means "read the schema from this file path").
    let schema = if let Some(ref schema_input) = cli.extract_json {
        let schema_str = if let Some(path) = schema_input.strip_prefix('@') {
            std::fs::read_to_string(path)
                .map_err(|e| format!("failed to read schema file {path}: {e}"))?
        } else {
            schema_input.clone()
        };
        Some(
            serde_json::from_str::<serde_json::Value>(&schema_str)
                .map_err(|e| format!("invalid JSON schema: {e}"))?,
        )
    } else {
        None
    };
    // Build custom filename lookup from entries
    let custom_names: std::collections::HashMap<&str, &str> = entries
        .iter()
        .filter_map(|(url, name)| name.as_deref().map(|n| (url.as_str(), n)))
        .collect();
    let total = entries.len();
    let mut ok = 0usize;
    let mut errors = 0usize;
    // Collected per-URL results; currently only pushed to, presumably kept
    // for a future aggregate output -- TODO confirm intended use.
    let mut all_results: Vec<serde_json::Value> = Vec::with_capacity(total);
    for (i, (url, _)) in entries.iter().enumerate() {
        let idx = i + 1;
        eprint!("[{idx}/{total}] {url} ");
        // Fetch and extract page content
        let extraction = match client.fetch_and_extract_with_options(url, &options).await {
            Ok(r) => r,
            Err(e) => {
                errors += 1;
                let msg = format!("fetch failed: {e}");
                eprintln!("-> error: {msg}");
                all_results.push(serde_json::json!({ "url": url, "error": msg }));
                continue;
            }
        };
        let text = &extraction.content.plain_text;
        // Run the appropriate LLM operation
        // (priority order: --extract-json > --extract-prompt > --summarize).
        let llm_result = if let Some(ref schema) = schema {
            webclaw_llm::extract::extract_json(text, schema, provider.as_ref(), model)
                .await
                .map(LlmOutput::Json)
        } else if let Some(ref prompt) = cli.extract_prompt {
            webclaw_llm::extract::extract_with_prompt(text, prompt, provider.as_ref(), model)
                .await
                .map(LlmOutput::Json)
        } else if let Some(sentences) = cli.summarize {
            webclaw_llm::summarize::summarize(text, Some(sentences), provider.as_ref(), model)
                .await
                .map(LlmOutput::Text)
        } else {
            unreachable!("run_batch_llm called without LLM flags")
        };
        match llm_result {
            Ok(output) => {
                ok += 1;
                // output_str: what gets printed/written; result_json: the
                // per-URL record appended to all_results.
                let (output_str, result_json) = match &output {
                    LlmOutput::Json(v) => {
                        let s = serde_json::to_string_pretty(v).expect("serialization failed");
                        let j = serde_json::json!({ "url": url, "result": v });
                        (s, j)
                    }
                    LlmOutput::Text(s) => {
                        let j = serde_json::json!({ "url": url, "result": s });
                        (s.clone(), j)
                    }
                };
                // Count top-level fields/items for progress display
                let detail = match &output {
                    LlmOutput::Json(v) => match v {
                        serde_json::Value::Object(m) => format!("{} fields", m.len()),
                        serde_json::Value::Array(a) => format!("{} items", a.len()),
                        _ => "done".to_string(),
                    },
                    LlmOutput::Text(s) => {
                        let words = s.split_whitespace().count();
                        format!("{words} words")
                    }
                };
                eprintln!("-> extracted {detail}");
                if let Some(ref dir) = cli.output_dir {
                    // Custom filename from the URLs file wins; otherwise
                    // derive one from the URL (always .json for LLM output).
                    let filename = custom_names
                        .get(url.as_str())
                        .map(|s| s.to_string())
                        .unwrap_or_else(|| url_to_filename(url, &OutputFormat::Json));
                    write_to_file(dir, &filename, &output_str)?;
                } else {
                    println!("--- {url}");
                    println!("{output_str}");
                    println!();
                }
                all_results.push(result_json);
            }
            Err(e) => {
                errors += 1;
                let msg = format!("LLM extraction failed: {e}");
                eprintln!("-> error: {msg}");
                all_results.push(serde_json::json!({ "url": url, "error": msg }));
            }
        }
    }
    eprintln!("Processed {total} URLs ({ok} ok, {errors} errors)");
    if let Some(ref webhook_url) = cli.webhook {
        fire_webhook(
            webhook_url,
            &serde_json::json!({
                "event": "batch_llm_complete",
                "total": total,
                "ok": ok,
                "errors": errors,
            }),
        );
        // Brief grace period so the fire-and-forget webhook request can be
        // sent before the process exits.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    }
    if errors > 0 {
        Err(format!("{errors} of {total} URLs failed"))
    } else {
        Ok(())
    }
}
/// Intermediate type to hold LLM output before formatting.
enum LlmOutput {
    /// Structured result from --extract-json or --extract-prompt.
    Json(serde_json::Value),
    /// Free-text result from --summarize.
    Text(String),
}
/// Returns true if any LLM flag is set.
fn has_llm_flags(cli: &Cli) -> bool {
cli.extract_json.is_some() || cli.extract_prompt.is_some() || cli.summarize.is_some()
@ -1656,9 +2034,16 @@ async fn main() {
return;
}
// --watch: poll a URL for changes
// --watch: poll URL(s) for changes
if cli.watch {
if let Err(e) = run_watch(&cli).await {
let watch_urls: Vec<String> = match collect_urls(&cli) {
Ok(entries) => entries.into_iter().map(|(url, _)| url).collect(),
Err(e) => {
eprintln!("error: {e}");
process::exit(1);
}
};
if let Err(e) = run_watch(&cli, &watch_urls).await {
eprintln!("error: {e}");
process::exit(1);
}
@ -1683,15 +2068,6 @@ async fn main() {
return;
}
// LLM modes: --extract-json, --extract-prompt, --summarize
if has_llm_flags(&cli) {
if let Err(e) = run_llm(&cli).await {
eprintln!("error: {e}");
process::exit(1);
}
return;
}
// Collect all URLs from args + --urls-file
let entries = match collect_urls(&cli) {
Ok(u) => u,
@ -1701,6 +2077,21 @@ async fn main() {
}
};
// LLM modes: --extract-json, --extract-prompt, --summarize
// When multiple URLs are provided, run batch LLM extraction over all of them.
if has_llm_flags(&cli) {
if entries.len() > 1 {
if let Err(e) = run_batch_llm(&cli, &entries).await {
eprintln!("error: {e}");
process::exit(1);
}
} else if let Err(e) = run_llm(&cli).await {
eprintln!("error: {e}");
process::exit(1);
}
return;
}
// Multi-URL batch mode
if entries.len() > 1 {
if let Err(e) = run_batch(&cli, &entries).await {
@ -1824,6 +2215,14 @@ mod tests {
);
}
#[test]
fn url_to_filename_html_format() {
    // The Html output format must map to the ".html" file extension.
    assert_eq!(
        url_to_filename("https://example.com/docs/api", &OutputFormat::Html),
        "docs/api.html"
    );
}
#[test]
fn url_to_filename_special_chars() {
// Spaces and special chars get replaced with underscores