// use std::borrow::Cow; // use std::ffi::OsStr; // use std::io::{self, Write}; // use std::mem; // use std::path::PathBuf; // use std::sync::atomic::{AtomicBool, Ordering}; // use std::sync::{Arc, Mutex, MutexGuard}; // use std::thread; // use std::time::{Duration, Instant}; // // use anyhow::{anyhow, Result}; // use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender}; // use etcetera::BaseStrategy; // use ignore::overrides::{Override, OverrideBuilder}; // use ignore::{WalkBuilder, WalkParallel, WalkState}; // use regex::bytes::Regex; // // use crate::config::Config; // use crate::dir_entry::DirEntry; // use crate::error::print_error; // use crate::exec; // use crate::exit_codes::{merge_exitcodes, ExitCode}; // use crate::filesystem; // use crate::output; // // /// The receiver thread can either be buffering results or directly streaming to the console. // #[derive(PartialEq)] // enum ReceiverMode { // /// Receiver is still buffering in order to sort the results, if the search finishes fast // /// enough. // Buffering, // // /// Receiver is directly printing results to the output. // Streaming, // } // // /// The Worker threads can result in a valid entry having PathBuf or an error. // #[allow(clippy::large_enum_variant)] // #[derive(Debug)] // pub enum WorkerResult { // // Errors should be rare, so it's probably better to allow large_enum_variant than // // to box the Entry variant // Entry(ignore::DirEntry), // TODO: CHECK IF ERRORS // Error(ignore::Error), // } // // /// A batch of WorkerResults to send over a channel. // #[derive(Clone)] // struct Batch { // items: Arc>>>, // } // // impl Batch { // fn new() -> Self { // Self { // items: Arc::new(Mutex::new(Some(vec![]))), // } // } // // fn lock(&self) -> MutexGuard<'_, Option>> { // self.items.lock().unwrap() // } // } // // impl IntoIterator for Batch { // type Item = WorkerResult; // type IntoIter = std::vec::IntoIter; // // fn into_iter(self) -> Self::IntoIter { // self.lock().take().unwrap().into_iter() // } // } // // /// Wrapper that sends batches of items at once over a channel. // struct BatchSender { // batch: Batch, // tx: Sender, // limit: usize, // } // // impl BatchSender { // fn new(tx: Sender, limit: usize) -> Self { // Self { // batch: Batch::new(), // tx, // limit, // } // } // // /// Check if we need to flush a batch. // fn needs_flush(&self, batch: Option<&Vec>) -> bool { // match batch { // // Limit the batch size to provide some backpressure // Some(vec) => vec.len() >= self.limit, // // Batch was already taken by the receiver, so make a new one // None => true, // } // } // // /// Add an item to a batch. // fn send(&mut self, item: WorkerResult) -> Result<(), SendError<()>> { // let mut batch = self.batch.lock(); // // if self.needs_flush(batch.as_ref()) { // drop(batch); // self.batch = Batch::new(); // batch = self.batch.lock(); // } // // let items = batch.as_mut().unwrap(); // items.push(item); // // if items.len() == 1 { // // New batch, send it over the channel // self.tx // .send(self.batch.clone()) // .map_err(|_| SendError(()))?; // } // // Ok(()) // } // } // // /// Maximum size of the output buffer before flushing results to the console // const MAX_BUFFER_LENGTH: usize = 1000; // /// Default duration until output buffering switches to streaming. // const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); // // /// Wrapper for the receiver thread's buffering behavior. // struct ReceiverBuffer<'a, W> { // /// The configuration. // config: &'a Config, // /// For shutting down the senders. // quit_flag: &'a AtomicBool, // /// The ^C notifier. // interrupt_flag: &'a AtomicBool, // /// Receiver for worker results. // rx: Receiver, // /// Standard output. // stdout: W, // /// The current buffer mode. // mode: ReceiverMode, // /// The deadline to switch to streaming mode. // deadline: Instant, // /// The buffer of quickly received paths. // buffer: Vec, // /// Result count. // num_results: usize, // } // // impl<'a, W: Write> ReceiverBuffer<'a, W> { // /// Create a new receiver buffer. // fn new(state: &'a WorkerState, rx: Receiver, stdout: W) -> Self { // let config = &state.config; // let quit_flag = state.quit_flag.as_ref(); // let interrupt_flag = state.interrupt_flag.as_ref(); // let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME); // let deadline = Instant::now() + max_buffer_time; // // Self { // config, // quit_flag, // interrupt_flag, // rx, // stdout, // mode: ReceiverMode::Buffering, // deadline, // buffer: Vec::with_capacity(MAX_BUFFER_LENGTH), // num_results: 0, // } // } // // /// Process results until finished. // fn process(&mut self) -> ExitCode { // loop { // if let Err(ec) = self.poll() { // self.quit_flag.store(true, Ordering::Relaxed); // return ec; // } // } // } // // /// Receive the next worker result. // fn recv(&self) -> Result { // match self.mode { // ReceiverMode::Buffering => { // // Wait at most until we should switch to streaming // self.rx.recv_deadline(self.deadline) // } // ReceiverMode::Streaming => { // // Wait however long it takes for a result // Ok(self.rx.recv()?) // } // } // } // // /// Wait for a result or state change. // fn poll(&mut self) -> Result<(), ExitCode> { // match self.recv() { // Ok(batch) => { // for result in batch { // match result { // WorkerResult::Entry(dir_entry) => { // if self.config.quiet { // return Err(ExitCode::HasResults(true)); // } // // match self.mode { // ReceiverMode::Buffering => { // self.buffer.push(dir_entry); // if self.buffer.len() > MAX_BUFFER_LENGTH { // self.stream()?; // } // } // ReceiverMode::Streaming => { // self.print(&dir_entry)?; // } // } // // self.num_results += 1; // if let Some(max_results) = self.config.max_results { // if self.num_results >= max_results { // return self.stop(); // } // } // } // WorkerResult::Error(err) => { // if self.config.show_filesystem_errors { // print_error(err.to_string()); // } // } // } // } // // // If we don't have another batch ready, flush before waiting // if self.mode == ReceiverMode::Streaming && self.rx.is_empty() { // self.flush()?; // } // } // Err(RecvTimeoutError::Timeout) => { // self.stream()?; // } // Err(RecvTimeoutError::Disconnected) => { // return self.stop(); // } // } // // Ok(()) // } // // /// Output a path. // fn print(&mut self, entry: &DirEntry) -> Result<(), ExitCode> { // if let Err(e) = output::print_entry(&mut self.stdout, entry, self.config) { // if e.kind() != ::std::io::ErrorKind::BrokenPipe { // print_error(format!("Could not write to output: {e}")); // return Err(ExitCode::GeneralError); // } // } // // if self.interrupt_flag.load(Ordering::Relaxed) { // // Ignore any errors on flush, because we're about to exit anyway // let _ = self.flush(); // return Err(ExitCode::KilledBySigint); // } // // Ok(()) // } // // /// Switch ourselves into streaming mode. // fn stream(&mut self) -> Result<(), ExitCode> { // self.mode = ReceiverMode::Streaming; // // let buffer = mem::take(&mut self.buffer); // for path in buffer { // self.print(&path)?; // } // // self.flush() // } // // /// Stop looping. // fn stop(&mut self) -> Result<(), ExitCode> { // if self.mode == ReceiverMode::Buffering { // self.buffer.sort(); // self.stream()?; // } // // if self.config.quiet { // Err(ExitCode::HasResults(self.num_results > 0)) // } else { // Err(ExitCode::Success) // } // } // // /// Flush stdout if necessary. // fn flush(&mut self) -> Result<(), ExitCode> { // if self.stdout.flush().is_err() { // // Probably a broken pipe. Exit gracefully. // return Err(ExitCode::GeneralError); // } // Ok(()) // } // } // // /// State shared by the sender and receiver threads. // struct WorkerState { // /// The search patterns. // patterns: Vec, // /// The command line configuration. // config: Config, // /// Flag for cleanly shutting down the parallel walk // quit_flag: Arc, // /// Flag specifically for quitting due to ^C // interrupt_flag: Arc, // } // // impl WorkerState { // fn new(patterns: Vec, config: Config) -> Self { // let quit_flag = Arc::new(AtomicBool::new(false)); // let interrupt_flag = Arc::new(AtomicBool::new(false)); // // Self { // patterns, // config, // quit_flag, // interrupt_flag, // } // } // // fn build_overrides(&self, paths: &[PathBuf]) -> Result { // let first_path = &paths[0]; // let config = &self.config; // // let mut builder = OverrideBuilder::new(first_path); // // for pattern in &config.exclude_patterns { // builder // .add(pattern) // .map_err(|e| anyhow!("Malformed exclude pattern: {}", e))?; // } // // builder // .build() // .map_err(|_| anyhow!("Mismatch in exclude patterns")) // } // // fn build_walker(&self, paths: &[PathBuf]) -> Result { // let first_path = &paths[0]; // let config = &self.config; // let overrides = self.build_overrides(paths)?; // // let mut builder = WalkBuilder::new(first_path); // builder // .hidden(config.ignore_hidden) // .ignore(config.read_fdignore) // .parents(config.read_parent_ignore && (config.read_fdignore || config.read_vcsignore)) // .git_ignore(config.read_vcsignore) // .git_global(config.read_vcsignore) // .git_exclude(config.read_vcsignore) // .require_git(config.require_git_to_read_vcsignore) // .overrides(overrides) // .follow_links(config.follow_links) // // No need to check for supported platforms, option is unavailable on unsupported ones // .same_file_system(config.one_file_system) // .max_depth(config.max_depth); // // if config.read_fdignore { // builder.add_custom_ignore_filename(".fdignore"); // } // // if config.read_global_ignore { // if let Ok(basedirs) = etcetera::choose_base_strategy() { // let global_ignore_file = basedirs.config_dir().join("fd").join("ignore"); // if global_ignore_file.is_file() { // let result = builder.add_ignore(global_ignore_file); // match result { // Some(ignore::Error::Partial(_)) => (), // Some(err) => { // print_error(format!("Malformed pattern in global ignore file. {err}.")); // } // None => (), // } // } // } // } // // for ignore_file in &config.ignore_files { // let result = builder.add_ignore(ignore_file); // match result { // Some(ignore::Error::Partial(_)) => (), // Some(err) => { // print_error(format!("Malformed pattern in custom ignore file. {err}.")); // } // None => (), // } // } // // for path in &paths[1..] { // builder.add(path); // } // // let walker = builder.threads(config.threads).build_parallel(); // Ok(walker) // } // // /// Run the receiver work, either on this thread or a pool of background // /// threads (for --exec). // fn receive(&self, rx: Receiver) -> ExitCode { // let config = &self.config; // // // This will be set to `Some` if the `--exec` argument was supplied. // if let Some(ref cmd) = config.command { // if cmd.in_batch_mode() { // exec::batch(rx.into_iter().flatten(), cmd, config) // } else { // let out_perm = Mutex::new(()); // // thread::scope(|scope| { // // Each spawned job will store its thread handle in here. // let threads = config.threads; // let mut handles = Vec::with_capacity(threads); // for _ in 0..threads { // let rx = rx.clone(); // // // Spawn a job thread that will listen for and execute inputs. // let handle = scope // .spawn(|| exec::job(rx.into_iter().flatten(), cmd, &out_perm, config)); // // // Push the handle of the spawned thread into the vector for later joining. // handles.push(handle); // } // let exit_codes = handles.into_iter().map(|handle| handle.join().unwrap()); // merge_exitcodes(exit_codes) // }) // } // } else { // let stdout = io::stdout().lock(); // let stdout = io::BufWriter::new(stdout); // // ReceiverBuffer::new(self, rx, stdout).process() // } // } // // /// Spawn the sender threads. // fn spawn_senders(&self, walker: WalkParallel, tx: Sender) { // walker.run(|| { // let patterns = &self.patterns; // let config = &self.config; // let quit_flag = self.quit_flag.as_ref(); // // let mut limit = 0x100; // if let Some(cmd) = &config.command { // if !cmd.in_batch_mode() && config.threads > 1 { // // Evenly distribute work between multiple receivers // limit = 1; // } // } // let mut tx = BatchSender::new(tx.clone(), limit); // // Box::new(move |entry| { // if quit_flag.load(Ordering::Relaxed) { // return WalkState::Quit; // } // // let entry = match entry { // Ok(ref e) if e.depth() == 0 => { // // Skip the root directory entry. // return WalkState::Continue; // } // Ok(e) => DirEntry::normal(e), // Err(ignore::Error::WithPath { // path, // err: inner_err, // }) => match inner_err.as_ref() { // ignore::Error::Io(io_error) // if io_error.kind() == io::ErrorKind::NotFound // && path // .symlink_metadata() // .ok() // .is_some_and(|m| m.file_type().is_symlink()) => // { // DirEntry::broken_symlink(path) // } // _ => { // return match tx.send(WorkerResult::Error(ignore::Error::WithPath { // path, // err: inner_err, // })) { // Ok(_) => WalkState::Continue, // Err(_) => WalkState::Quit, // } // } // }, // Err(err) => { // return match tx.send(WorkerResult::Error(err)) { // Ok(_) => WalkState::Continue, // Err(_) => WalkState::Quit, // } // } // }; // // if let Some(min_depth) = config.min_depth { // if entry.depth().map_or(true, |d| d < min_depth) { // return WalkState::Continue; // } // } // // // Check the name first, since it doesn't require metadata // let entry_path = entry.path(); // // let search_str: Cow = if config.search_full_path { // let path_abs_buf = filesystem::path_absolute_form(entry_path) // .expect("Retrieving absolute path succeeds"); // Cow::Owned(path_abs_buf.as_os_str().to_os_string()) // } else { // match entry_path.file_name() { // Some(filename) => Cow::Borrowed(filename), // None => unreachable!( // "Encountered file system entry without a file name. This should only \ // happen for paths like 'foo/bar/..' or '/' which are not supposed to \ // appear in a file system traversal." // ), // } // }; // // if !patterns // .iter() // .all(|pat| pat.is_match(&filesystem::osstr_to_bytes(search_str.as_ref()))) // { // return WalkState::Continue; // } // // // Filter out unwanted extensions. // if let Some(ref exts_regex) = config.extensions { // if let Some(path_str) = entry_path.file_name() { // if !exts_regex.is_match(&filesystem::osstr_to_bytes(path_str)) { // return WalkState::Continue; // } // } else { // return WalkState::Continue; // } // } // // // Filter out unwanted file types. // if let Some(ref file_types) = config.file_types { // if file_types.should_ignore(&entry) { // return WalkState::Continue; // } // } // // #[cfg(unix)] // { // if let Some(ref owner_constraint) = config.owner_constraint { // if let Some(metadata) = entry.metadata() { // if !owner_constraint.matches(metadata) { // return WalkState::Continue; // } // } else { // return WalkState::Continue; // } // } // } // // // Filter out unwanted sizes if it is a file and we have been given size constraints. // if !config.size_constraints.is_empty() { // if entry_path.is_file() { // if let Some(metadata) = entry.metadata() { // let file_size = metadata.len(); // if config // .size_constraints // .iter() // .any(|sc| !sc.is_within(file_size)) // { // return WalkState::Continue; // } // } else { // return WalkState::Continue; // } // } else { // return WalkState::Continue; // } // } // // // Filter out unwanted modification times // if !config.time_constraints.is_empty() { // let mut matched = false; // if let Some(metadata) = entry.metadata() { // if let Ok(modified) = metadata.modified() { // matched = config // .time_constraints // .iter() // .all(|tf| tf.applies_to(&modified)); // } // } // if !matched { // return WalkState::Continue; // } // } // // if config.is_printing() { // if let Some(ls_colors) = &config.ls_colors { // // Compute colors in parallel // entry.style(ls_colors); // } // } // // let send_result = tx.send(WorkerResult::Entry(entry)); // // if send_result.is_err() { // return WalkState::Quit; // } // // // Apply pruning. // if config.prune { // return WalkState::Skip; // } // // WalkState::Continue // }) // }); // } // // /// Perform the recursive scan. // fn scan(&self, paths: &[PathBuf]) -> Result { // let config = &self.config; // let walker = self.build_walker(paths)?; // // if config.ls_colors.is_some() && config.is_printing() { // let quit_flag = Arc::clone(&self.quit_flag); // let interrupt_flag = Arc::clone(&self.interrupt_flag); // // ctrlc::set_handler(move || { // quit_flag.store(true, Ordering::Relaxed); // // if interrupt_flag.fetch_or(true, Ordering::Relaxed) { // // Ctrl-C has been pressed twice, exit NOW // ExitCode::KilledBySigint.exit(); // } // }) // .unwrap(); // } // // let (tx, rx) = bounded(2 * config.threads); // // let exit_code = thread::scope(|scope| { // // Spawn the receiver thread(s) // let receiver = scope.spawn(|| self.receive(rx)); // // // Spawn the sender threads. // self.spawn_senders(walker, tx); // // receiver.join().unwrap() // }); // // if self.interrupt_flag.load(Ordering::Relaxed) { // Ok(ExitCode::KilledBySigint) // } else { // Ok(exit_code) // } // } // } // // /// Recursively scan the given search path for files / pathnames matching the patterns. // /// // /// If the `--exec` argument was supplied, this will create a thread pool for executing // /// jobs in parallel from a given command line and the discovered paths. Otherwise, each // /// path will simply be written to standard output. // pub fn scan(paths: &[PathBuf], patterns: Vec, config: Config) -> Result { // WorkerState::new(patterns, config).scan(paths) // }