Refactor database connection handling with connection pooling and parallel processing

- Introduced `r2d2` connection pooling for SQLite in `database.rs`.
- Updated `Indexer` to use pooled connections for improved concurrency.
- Replaced sequential processing with `rayon` for parallel file scanning.
- Added a `clear` method to `Indexer` for reindexing support.
- Enhanced database initialization with `init` and `from_pool` methods.
- Updated `Cargo.toml` and `Cargo.lock` to include `r2d2`, `r2d2_sqlite`, and new dependencies.
This commit is contained in:
elipeter 2025-06-17 20:45:33 +02:00
parent 1933082b41
commit 0a62b6f40c
5 changed files with 412 additions and 67 deletions

View file

@ -5,6 +5,7 @@ use crate::patterns::Severity;
use crate::utils::Config;
use crate::utils::project::get_project_info;
use crate::walk::spawn_senders;
use rayon::prelude::*;
pub fn handle(
action: IndexAction,
@ -50,27 +51,35 @@ pub fn build_index(
tracing::debug!("Building index for: {}", project_name);
fs::File::create(db_path)?;
let mut indexer = Indexer::new(&project_name, &db_path)?;
let rx = spawn_senders(project_path, config);
for path in rx.iter().flatten() {
let issues = crate::commands::scan::run_rules_on_file(&path, config)?;
let file_id = indexer.upsert_file(&path)?;
let issue_rows: Vec<IssueRow> = issues
.iter()
.map(|d| IssueRow {
rule_id: d.id.as_ref(),
severity: match d.severity {
Severity::High => "HIGH",
Severity::Medium => "MEDIUM",
Severity::Low => "LOW",
},
line: d.line as i64,
col: d.col as i64,
})
.collect();
indexer.replace_issues(file_id, issue_rows)?;
let pool = Indexer::init(db_path)?;
{
let idx = Indexer::from_pool(&project_name, &pool).unwrap();
idx.clear()?;
}
tracing::debug!("Cleaned index for: {}", project_name);
let rx = spawn_senders(project_path, config);
let paths: Vec<_> = rx.into_iter().flatten().collect();
paths.into_par_iter().try_for_each(|path| -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let issues = crate::commands::scan::run_rules_on_file(&path, config).unwrap();
let mut idx = Indexer::from_pool(project_name, &pool).unwrap();
let file_id = idx.upsert_file(&path).unwrap();
let rows: Vec<IssueRow> = issues.iter().map(|d| IssueRow {
rule_id: d.id.as_ref(),
severity: match d.severity {
Severity::High => "HIGH",
Severity::Medium => "MEDIUM",
Severity::Low => "LOW",
},
line: d.line as i64,
col: d.col as i64,
}).collect();
idx.replace_issues(file_id, rows).unwrap();
Ok(())
}).unwrap();
Ok(())
}

View file

@ -1,7 +1,9 @@
use crate::utils::project::get_project_info;
use console::style;
use std::path::Path;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use crate::database::index::{IssueRow, Indexer};
use crate::patterns::Severity;
use crate::utils::config::Config;
@ -44,8 +46,8 @@ pub fn handle(
crate::commands::index::build_index(&project_name,&scan_path, &db_path, config)?;
}
let mut indexer = Indexer::new(&project_name, &db_path)?;
diags = scan_with_index(&project_name, &db_path, config, &mut indexer)?;
let pool = Indexer::init(&db_path)?;
diags = scan_with_index_parallel(&project_name, pool, config)?;
}
if format == "console" || format == "" && config.output.default_format == "console" {
@ -95,42 +97,49 @@ fn scan_filesystem(
Ok(acc.into_inner().unwrap())
}
fn scan_with_index(
fn scan_with_index_parallel(
project: &str,
_db_path: &Path,
pool: Arc<Pool<SqliteConnectionManager>>,
cfg: &Config,
indexer: &mut Indexer,
) -> Result<Vec<Diag>, Box<dyn std::error::Error>> {
let paths = indexer.get_files(project).unwrap_or_default();
let mut issues: Vec<Diag> = Vec::new();
for path in paths {
if indexer.should_scan(&path)? {
tracing::debug!("scanning files{}", path.display());
let mut diags = run_rules_on_file(&path, cfg)?;
let file_id = indexer.upsert_file(&path)?;
// Get the file list once (single connection, no contention)
let files = {
let idx = Indexer::from_pool(project, &pool)?;
idx.get_files(project)?
};
let issue_rows: Vec<IssueRow> = diags
.iter()
.map(|d| IssueRow {
rule_id: d.id.as_ref(),
severity: match d.severity {
Severity::High => "HIGH",
Severity::Medium => "MEDIUM",
Severity::Low => "LOW",
},
line: d.line as i64,
col: d.col as i64,
})
.collect();
let acc = Mutex::new(Vec::new());
indexer.replace_issues(file_id, issue_rows)?;
issues.append(&mut diags);
continue;
}
issues.append(&mut indexer.get_issues_from_file(&path)?);
}
Ok(issues)
files.into_par_iter()
.try_for_each(|path| -> Result<(), DynError> {
let mut idx = Indexer::from_pool(project, &pool).unwrap();
if idx.should_scan(&path).unwrap() {
let mut diags = run_rules_on_file(&path, cfg).unwrap();
let file_id = idx.upsert_file(&path).unwrap();
let rows: Vec<IssueRow> = diags.iter().map(|d| IssueRow {
rule_id: d.id.as_ref(),
severity: match d.severity {
Severity::High => "HIGH",
Severity::Medium => "MEDIUM",
Severity::Low => "LOW",
},
line: d.line as i64,
col: d.col as i64,
}).collect();
idx.replace_issues(file_id, rows).unwrap();
acc.lock().unwrap().append(&mut diags);
} else {
let mut cached = idx.get_issues_from_file(&path).unwrap();
acc.lock().unwrap().append(&mut cached);
}
Ok(())
}).unwrap();
Ok(acc.into_inner().unwrap())
}
// --------------------------------------------------------------------------------------------

View file

@ -1,11 +1,15 @@
pub mod index {
use rusqlite::{params, Connection, OptionalExtension};
use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
use std::fs;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::commands::scan::Diag;
use crate::patterns::Severity;
use r2d2_sqlite::{SqliteConnectionManager};
use std::ops::Deref;
use std::sync::Arc;
use r2d2::{Pool, PooledConnection};
/// DB schema (foreignkeys enabled).
const SCHEMA: &str = r#"
@ -43,18 +47,48 @@ pub mod index {
}
pub struct Indexer {
conn: Connection,
conn: PooledConnection<SqliteConnectionManager>,
project: String,
}
impl Indexer {
/// Open (or create) the DB at `database_path` for the given project name.
pub fn new(project: &str, database_path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
let conn = Connection::open(database_path)?;
conn.execute_batch(SCHEMA)?;
pub fn init(
database_path: &Path,
) -> Result<std::sync::Arc<Pool<SqliteConnectionManager>>, Box<dyn std::error::Error>> {
let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_FULL_MUTEX;
let manager = SqliteConnectionManager::file(&database_path).with_flags(flags);
let pool = Arc::new(Pool::new(manager)?);
{
let conn = pool.get()?;
conn.pragma_update(None, "journal_mode", &"WAL")?;
conn.execute_batch(SCHEMA)?;
}
Ok(pool)
}
pub fn from_pool(
project: &str,
pool: &Pool<SqliteConnectionManager>,
) -> Result<Self, Box<dyn std::error::Error>> {
let conn = pool.get()?;
Ok(Self { conn, project: project.to_owned() })
}
// helper so code below can treat PooledConnection like &Connection
fn c(&self) -> &Connection { self.conn.deref() }
/// Open (or create) the DB at `database_path` for the given project name.
// pub fn new(project: &str, database_path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
// let conn = Connection::open(database_path)?;
// conn.pragma_update(None, "journal_mode", &"WAL")?;
// conn.execute_batch(SCHEMA)?;
// Ok(Self { conn, project: project.to_owned() })
// }
/// Return true when the file *content* or *mtime* changed since the last scan.
pub fn should_scan(&self, path: &Path) -> Result<bool, Box<dyn std::error::Error>> {
let meta = fs::metadata(path)?;
@ -83,7 +117,7 @@ pub mod index {
let scanned_at = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64;
let digest = Self::digest_file(path)?;
self.conn.execute(
self.c().execute(
"INSERT INTO files (project, path, hash, mtime, scanned_at)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(project,path) DO UPDATE
@ -93,7 +127,7 @@ pub mod index {
params![self.project, path.to_string_lossy(), digest, mtime, scanned_at],
)?;
let id: i64 = self.conn.query_row(
let id: i64 = self.c().query_row(
"SELECT id FROM files WHERE project = ?1 AND path = ?2",
params![self.project, path.to_string_lossy()],
|r| r.get(0),
@ -125,13 +159,13 @@ pub mod index {
&self,
path: &Path,
) -> Result<Vec<Diag>, Box<dyn std::error::Error>> {
let file_id: i64 = self.conn.query_row(
let file_id: i64 = self.c().query_row(
"SELECT id FROM files WHERE project = ?1 AND path = ?2",
params![self.project, path.to_string_lossy()],
|r| r.get(0),
)?;
let mut stmt = self.conn.prepare(
let mut stmt = self.c().prepare(
"SELECT rule_id, severity, line, col
FROM issues
WHERE file_id = ?1",
@ -153,7 +187,7 @@ pub mod index {
/// gets files from the database
pub fn get_files(&self, project: &str) -> Result<Vec<std::path::PathBuf>, Box<dyn std::error::Error>> {
let mut stmt = self.conn.prepare(
let mut stmt = self.c().prepare(
"SELECT path
FROM files
WHERE project = ?1",
@ -164,6 +198,24 @@ pub mod index {
Ok(file_iter.map(|p| p.map(PathBuf::from)).collect::<Result<_, _>>()?)
}
/// Clears the tables to prep for a reindex
pub fn clear(&self) -> rusqlite::Result<()> {
self.c().execute_batch(
r#"
PRAGMA foreign_keys = OFF;
DROP TABLE IF EXISTS issues;
DROP TABLE IF EXISTS files;
PRAGMA foreign_keys = ON;
VACUUM;
"#,
)?;
self.c().execute_batch(SCHEMA)?;
Ok(())
}
fn digest_file(path: &Path) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let mut hasher = blake3::Hasher::new();
let mut file = fs::File::open(path)?;