mirror of
https://github.com/elicpeter/nyx.git
synced 2026-06-21 20:18:06 +02:00
Add error handling with NyxError and refactor console output formatting
- Introduced `NyxError` and `NyxResult` for unified error handling across modules. - Refactored `scan.rs`, `index.rs`, and `walk.rs` with improved error management and consistent formatting. - Replaced existing error handling in `database.rs` with `NyxResult`. - Improved database maintenance by integrating `vacuum` and `clear` methods into workflows. - Added `dashmap` for efficient parallel diagnostics result aggregation in `scan_with_index_parallel`. - Enhanced readability and formatting of console outputs in multiple modules.
This commit is contained in:
parent
75a20eaa2a
commit
0a66a0ae2d
14 changed files with 360 additions and 240 deletions
128
src/walk.rs
128
src/walk.rs
|
|
@ -1,106 +1,104 @@
|
|||
use crossbeam_channel::{bounded, Receiver};
|
||||
use ignore::{WalkBuilder, WalkState};
|
||||
use std::{path::{Path, PathBuf}, thread};
|
||||
use ignore::overrides::OverrideBuilder;
|
||||
use crossbeam_channel::{bounded, Receiver, Sender};
|
||||
use ignore::{overrides::OverrideBuilder, WalkBuilder, WalkState};
|
||||
use std::{
|
||||
mem,
|
||||
path::{Path, PathBuf},
|
||||
thread,
|
||||
};
|
||||
|
||||
use crate::utils::Config;
|
||||
|
||||
const BATCH_SIZE: usize = 5;
|
||||
// ---------------------------------------------------------------------------
|
||||
// Internal constants / helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
const DEFAULT_BATCH: usize = 8; // a tad larger for fewer sends
|
||||
const CHANNEL_MULTIPLIER:usize = 4; // capacity = threads × this
|
||||
|
||||
type Batch = Vec<PathBuf>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Batcher {
|
||||
tx: crossbeam_channel::Sender<Batch>,
|
||||
tx: Sender<Batch>,
|
||||
batch: Batch,
|
||||
}
|
||||
|
||||
impl Batcher {
|
||||
fn push(&mut self, p: PathBuf) {
|
||||
self.batch.push(p);
|
||||
if self.batch.len() == BATCH_SIZE {
|
||||
if self.batch.len() == DEFAULT_BATCH {
|
||||
self.flush();
|
||||
}
|
||||
}
|
||||
fn flush(&mut self) {
|
||||
if !self.batch.is_empty() {
|
||||
let _ = self.tx.send(std::mem::take(&mut self.batch));
|
||||
let _ = self.tx.send(mem::take(&mut self.batch));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Batcher {
|
||||
fn drop(&mut self) {
|
||||
// guarantees the remainder is sent when the worker is dropped
|
||||
self.flush();
|
||||
}
|
||||
fn drop(&mut self) { self.flush(); }
|
||||
}
|
||||
|
||||
|
||||
/// Walk `root`, send file paths to the returned receiver.
|
||||
pub fn spawn_senders(
|
||||
root: &Path,
|
||||
cfg: &Config
|
||||
) -> Receiver<Batch> {
|
||||
// ---------------------------------------------------------------------------
|
||||
/// Walk `root` and send *batches* of paths through the returned channel.
|
||||
pub fn spawn_senders(root: &Path, cfg: &Config) -> Receiver<Batch> {
|
||||
// ----- 1 build ignore/override rules ----------------------------------
|
||||
let mut ob = OverrideBuilder::new(root);
|
||||
|
||||
for ext in &cfg.scanner.excluded_extensions {
|
||||
if let Err(e) = ob.add(&format!("!*.{ext}")) {
|
||||
tracing::warn!("could not add ignore pattern: {e}");
|
||||
tracing::warn!("cannot add ignore pattern ‘{ext}’: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
for dir in &cfg.scanner.excluded_directories {
|
||||
if let Err(e) = ob.add(&format!("!**/{dir}/**")) {
|
||||
tracing::warn!("could not add ignore pattern: {e}");
|
||||
tracing::warn!("cannot add ignore pattern ‘{dir}’: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
let overrides = ob.build().unwrap();
|
||||
let worker_thrs = cfg.performance.worker_threads.unwrap_or(num_cpus::get());
|
||||
|
||||
let (tx, rx) = bounded::<Batch>(worker_thrs * 2usize);
|
||||
|
||||
let root = root.to_path_buf();
|
||||
let scan_hidden = cfg.scanner.scan_hidden_files;
|
||||
let follow_links = cfg.scanner.follow_symlinks;
|
||||
let max_bytes: u64 = (cfg.scanner.max_file_size_mb.unwrap_or(0)) * 1_048_576;
|
||||
let overrides = ob.build().unwrap();
|
||||
|
||||
// ----- 2 channel & thread pool parameters -----------------------------
|
||||
let workers = cfg.performance.worker_threads.unwrap_or(num_cpus::get());
|
||||
let (tx, rx) = bounded::<Batch>(workers * CHANNEL_MULTIPLIER);
|
||||
|
||||
let root = root.to_path_buf();
|
||||
let scan_hidden = cfg.scanner.scan_hidden_files;
|
||||
let follow = cfg.scanner.follow_symlinks;
|
||||
let max_bytes = cfg.scanner.max_file_size_mb.unwrap_or(0) as u64 * 1_048_576;
|
||||
|
||||
// ----- 3 the background walker thread ---------------------------------
|
||||
thread::spawn(move || {
|
||||
let walker = WalkBuilder::new(root)
|
||||
WalkBuilder::new(root)
|
||||
.hidden(!scan_hidden)
|
||||
.follow_links(follow_links)
|
||||
.threads(worker_thrs)
|
||||
.follow_links(follow)
|
||||
.threads(workers)
|
||||
.overrides(overrides)
|
||||
.build_parallel();
|
||||
.build_parallel()
|
||||
.run(move || {
|
||||
let mut b = Batcher {
|
||||
tx: tx.clone(),
|
||||
batch: Vec::with_capacity(DEFAULT_BATCH),
|
||||
};
|
||||
|
||||
walker.run(move || {
|
||||
let mut batcher = Batcher {
|
||||
tx: tx.clone(),
|
||||
batch: Vec::with_capacity(BATCH_SIZE),
|
||||
};
|
||||
Box::new(move |entry| {
|
||||
let entry = match entry {
|
||||
Ok(e) if e.file_type().map(|ft| ft.is_file()).unwrap_or(false) => e,
|
||||
_ => return WalkState::Continue,
|
||||
};
|
||||
|
||||
Box::new(move |entry| {
|
||||
tracing::debug!("walking: {:?}", entry);
|
||||
let e = match entry {
|
||||
Ok(e) if e.file_type().map(|ft| ft.is_file()).unwrap_or(false) => e,
|
||||
_ => return WalkState::Continue,
|
||||
};
|
||||
if max_bytes != 0 {
|
||||
match e.metadata() {
|
||||
Ok(m) if m.len() <= max_bytes => {},
|
||||
_ => return WalkState::Continue,
|
||||
}
|
||||
}
|
||||
tracing::debug!("scanning file: {:?}", e);
|
||||
batcher.push(e.into_path());
|
||||
if batcher.batch.len() == BATCH_SIZE {
|
||||
let _ = batcher.tx.send(std::mem::take(&mut batcher.batch));
|
||||
}
|
||||
WalkState::Continue
|
||||
})
|
||||
});
|
||||
if max_bytes != 0 {
|
||||
match entry.metadata() {
|
||||
Ok(m) if m.len() > max_bytes => return WalkState::Continue,
|
||||
Err(e) => {
|
||||
tracing::debug!("metadata failed for {:?}: {e}", entry.path());
|
||||
return WalkState::Continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
b.push(entry.into_path());
|
||||
WalkState::Continue
|
||||
})
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
rx
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue