From 4e893c02ff510b10667f37e20dcd55b13eaf4c98 Mon Sep 17 00:00:00 2001 From: Sam Valladares Date: Fri, 19 Jun 2026 02:21:25 -0500 Subject: [PATCH] feat(connectors): add Redmine and source filters (#57) --- CHANGELOG.md | 45 +- crates/vestige-core/src/connectors/github.rs | 25 +- crates/vestige-core/src/connectors/mod.rs | 21 +- crates/vestige-core/src/connectors/redmine.rs | 737 ++++++++++++++++++ crates/vestige-core/src/storage/sqlite.rs | 50 +- crates/vestige-mcp/src/server.rs | 2 +- .../vestige-mcp/src/tools/search_unified.rs | 415 +++++++++- crates/vestige-mcp/src/tools/source_sync.rs | 184 ++++- docs/CONNECTORS.md | 74 +- 9 files changed, 1445 insertions(+), 108 deletions(-) create mode 100644 crates/vestige-core/src/connectors/redmine.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 2918af4..a220029 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,11 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 > Bump `version` in the workspace `Cargo.toml`, both crates, `server.json`, and > `package.json` to `2.1.27` at release/tag time, and date this heading. -Roadmap [#57](https://github.com/samvallad33/vestige/issues/57), **Phase 1–3**: -Vestige can now act as a durable, local, semantically-searchable retrieval layer -over an external system of record — starting with GitHub Issues — without -replacing it. The external system stays canonical; Vestige **indexes, connects, -retrieves, and cites back** to the source record. +Roadmap [#57](https://github.com/samvallad33/vestige/issues/57), **Phases 1–4 +(complete)**: Vestige can now act as a durable, local, semantically-searchable +retrieval layer over an external system of record — GitHub Issues and Redmine — +without replacing it. The external system stays canonical; Vestige **indexes, +connects, retrieves, and cites back** to the source record. Unlike a live ticket-system MCP proxy (which holds no state and is rate-limited per query), Vestige keeps a durable embedded index: searchable **offline**, @@ -25,12 +25,22 @@ content-hash idempotent sync, and tombstoning of vanished records. ### Added -- **`source_sync` MCP tool** — point Vestige at a GitHub repo - (`{"repo": "owner/name"}`) and it indexes every issue + its comments as - source-aware memories. Re-running updates changed issues in place (no - duplicates); `reconcile: true` tombstones issues no longer visible upstream. - Auth via the `GITHUB_TOKEN` (or `VESTIGE_GITHUB_TOKEN`) environment variable; - public repos work without a token at a lower rate limit. +- **`source_sync` MCP tool** — index an external system into Vestige. + - GitHub: `{"source": "github", "repo": "owner/name"}` indexes every issue + + its comments. Auth via `GITHUB_TOKEN` (public repos work tokenless at a + lower rate limit). + - Redmine: `{"source": "redmine", "project": ""}` indexes a project's + issues + journals (comments and status/assignment history). Host from + `REDMINE_URL`, auth from `REDMINE_API_KEY`. + - Re-running updates changed issues in place (no duplicates); `reconcile: + true` tombstones issues no longer visible upstream. +- **Source-aware investigation filters on `search`** (Phase 4) — filter results + by `source_system`, `source_project`, `source_id`, `source_type`, + `source_author`, a `source_updated_after`/`source_updated_before` date range, + and `source_status` (`valid` / `tombstoned` / `any`). Status, tracker, and + priority remain filterable via the existing `tag_prefix` (the connectors emit + `status:`/`tracker:`/`priority:`/`label:` tags). Applied as post-filters; + non-connector memories are excluded from a source-scoped query. - **Source envelope** on every memory — structured, machine-readable provenance (`source_system`, `source_id`, `source_url`, `source_updated_at`, `content_hash`, `synced_at`, `source_project`, `source_type`, `source_author`) @@ -44,10 +54,15 @@ content-hash idempotent sync, and tombstoning of vanished records. record is retained for audit but drops out of current retrieval). - **Connector contract** (`vestige_core::connectors`) — a small source-agnostic `Connector` trait + `run_sync` driver (cursor overlap window, incremental - paging, optional deletion reconcile) and a GitHub Issues reference connector - behind the optional `connectors` cargo feature (on by default in the MCP - server, off in the core library's default features so non-connector consumers - link no HTTP client). + paging, optional deletion reconcile) with two reference connectors behind the + optional `connectors` cargo feature (on by default in the MCP server, off in + the core library's default features so non-connector consumers link no HTTP + client): + - **GitHub Issues** — `state=all`, `since` cursor, Link-header pagination, + drops PRs, host-pinned next-url. + - **Redmine** — `status_id=*` (open + closed), hex-encoded `updated_on>=` + cursor, `offset` pagination, per-issue detail fetch for journals (the list + endpoint omits them), `X-Redmine-API-Key` header auth. ### Database diff --git a/crates/vestige-core/src/connectors/github.rs b/crates/vestige-core/src/connectors/github.rs index a373dcf..24bd60e 100644 --- a/crates/vestige-core/src/connectors/github.rs +++ b/crates/vestige-core/src/connectors/github.rs @@ -490,25 +490,25 @@ mod tests { fn hash_stable_across_label_order_and_changes_on_edit() { let c = connector(); let mut a = issue(1, "T", "body", "open"); - a.labels = vec![ - RawLabel { name: "b".into() }, - RawLabel { name: "a".into() }, - ]; + a.labels = vec![RawLabel { name: "b".into() }, RawLabel { name: "a".into() }]; let mut b = issue(1, "T", "body", "open"); - b.labels = vec![ - RawLabel { name: "a".into() }, - RawLabel { name: "b".into() }, - ]; + b.labels = vec![RawLabel { name: "a".into() }, RawLabel { name: "b".into() }]; let ha = c.normalize(&a, &[]).envelope.content_hash; let hb = c.normalize(&b, &[]).envelope.content_hash; assert_eq!(ha, hb, "label order must not change the hash"); // Editing the body must change the hash. - let edited = c.normalize(&issue(1, "T", "EDITED", "open"), &[]).envelope.content_hash; + let edited = c + .normalize(&issue(1, "T", "EDITED", "open"), &[]) + .envelope + .content_hash; assert_ne!(ha, edited); // Closing the issue changes state → changes the hash (not a no-op). - let closed = c.normalize(&issue(1, "T", "body", "closed"), &[]).envelope.content_hash; + let closed = c + .normalize(&issue(1, "T", "body", "closed"), &[]) + .envelope + .content_hash; assert_ne!(ha, closed); } @@ -533,7 +533,10 @@ mod tests { let second_pos = rec.content.find("second").unwrap(); assert!(first_pos < second_pos, "comments must fold in id order"); - let no_comments = c.normalize(&issue(1, "T", "body", "open"), &[]).envelope.content_hash; + let no_comments = c + .normalize(&issue(1, "T", "body", "open"), &[]) + .envelope + .content_hash; assert_ne!( rec.envelope.content_hash, no_comments, "comments must contribute to the hash" diff --git a/crates/vestige-core/src/connectors/mod.rs b/crates/vestige-core/src/connectors/mod.rs index 033e065..e82a5ab 100644 --- a/crates/vestige-core/src/connectors/mod.rs +++ b/crates/vestige-core/src/connectors/mod.rs @@ -11,9 +11,9 @@ //! - The [`Connector`] contract, [`NormalizedRecord`] shape, and the stable //! [`content_hash`] are pure (no network) and always compiled, so the sync //! semantics are unit-testable without hitting an API. -//! - Network-backed reference connectors (e.g. [`github`]) live behind the -//! `connectors` cargo feature so the default local-first build links no HTTP -//! client. +//! - Network-backed reference connectors ([`github`] and [`redmine`]) live +//! behind the `connectors` cargo feature so the default local-first build +//! links no HTTP client. //! //! ## Sync contract (the part that makes re-running safe) //! @@ -39,6 +39,9 @@ use crate::storage::ConnectorCursor; #[cfg(feature = "connectors")] pub mod github; +#[cfg(feature = "connectors")] +pub mod redmine; + /// A single external record, already normalized into the fields Vestige needs. /// /// The connector is responsible for flattening a possibly-rich source record @@ -329,8 +332,16 @@ mod tests { #[test] fn content_hash_is_order_independent() { - let a = content_hash(&[("title", "Crash"), ("body", "stacktrace"), ("state", "open")]); - let b = content_hash(&[("state", "open"), ("title", "Crash"), ("body", "stacktrace")]); + let a = content_hash(&[ + ("title", "Crash"), + ("body", "stacktrace"), + ("state", "open"), + ]); + let b = content_hash(&[ + ("state", "open"), + ("title", "Crash"), + ("body", "stacktrace"), + ]); assert_eq!(a, b, "reordering fields must not change the hash"); } diff --git a/crates/vestige-core/src/connectors/redmine.rs b/crates/vestige-core/src/connectors/redmine.rs new file mode 100644 index 0000000..1b2a316 --- /dev/null +++ b/crates/vestige-core/src/connectors/redmine.rs @@ -0,0 +1,737 @@ +//! Redmine connector (#57). +//! +//! Indexes a Redmine project's issues + journals (comments and status/assignment +//! history) into source-aware Vestige memories so an investigative agent can +//! search and reason over years of ticket history **offline**, **semantically**, +//! and **cited back to the canonical issue URL**. Redmine stays the system of +//! record; Vestige indexes, connects, retrieves, and links back. +//! +//! ## Incremental sync (per the connector sync contract) +//! +//! Redmine's REST API has three traps this connector handles explicitly (all +//! confirmed against the official wiki + canonical defects): +//! +//! - **`status_id=*` is mandatory.** The list endpoint returns *open issues +//! only* by default, so without it closing an issue looks like a deletion and +//! closed issues are never synced (Defect #19088). We pass it on both the +//! incremental pull and the reconcile enumeration. +//! - **`include=journals` is silently ignored on the list endpoint.** Journals +//! come back only on the per-issue detail endpoint `GET /issues/{id}.json` +//! (Defect #35242), so each changed issue costs one extra round-trip. +//! - **Filter operators must be hex-encoded** in the compact form +//! (`updated_on=>=…` → `updated_on=%3E%3D…`). We build the query with +//! `reqwest`'s `.query(&[…])` and pass the raw `>=…` value so it is encoded +//! exactly once (no double-encoding). +//! +//! `sort=updated_on:asc` pages forward in cursor order so a mid-run interruption +//! resumes safely; the `since = cursor − overlap` window + the `content_hash` +//! no-op make the re-scan free. Redmine has no deletion feed, so deletions are +//! reconciled out-of-band via [`list_live_ids`](Connector::list_live_ids). + +use chrono::{DateTime, Utc}; +use serde::Deserialize; + +use super::{ + Connector, ConnectorError, ConnectorResult, FetchPage, NormalizedRecord, content_hash, +}; +use crate::memory::SourceEnvelope; + +const USER_AGENT: &str = concat!("vestige-connector/", env!("CARGO_PKG_VERSION")); +const PAGE_LIMIT: u32 = 100; + +/// Configuration for a Redmine connector instance bound to one project. +#[derive(Debug, Clone)] +pub struct RedmineConfig { + /// Base URL of the Redmine instance, e.g. `https://redmine.example.com`. + pub base_url: String, + /// Project identifier to scope the sync to. May be the numeric id or the + /// project identifier slug — used as `project_id` and stored as + /// `source_project`. (Note: Redmine's `project_id` list filter wants the + /// numeric id; the slug works as the human-readable scope label.) + pub project: String, + /// API access key. Optional only if the instance allows anonymous REST. + pub api_key: Option, + /// Max journals to fold into one issue memory (defense against huge threads). + pub max_journals: usize, +} + +impl RedmineConfig { + pub fn new(base_url: impl Into, project: impl Into) -> Self { + Self { + base_url: base_url.into(), + project: project.into(), + api_key: None, + max_journals: 100, + } + } + + pub fn with_api_key(mut self, key: Option) -> Self { + self.api_key = key; + self + } + + /// Base URL with any trailing slash removed. + fn root(&self) -> String { + self.base_url.trim_end_matches('/').to_string() + } +} + +/// A Redmine connector bound to one project. +pub struct RedmineConnector { + config: RedmineConfig, + scope: String, + client: reqwest::Client, +} + +impl RedmineConnector { + pub fn new(config: RedmineConfig) -> ConnectorResult { + if config.base_url.trim().is_empty() { + return Err(ConnectorError::Config("base_url is required".to_string())); + } + if config.project.trim().is_empty() { + return Err(ConnectorError::Config("project is required".to_string())); + } + if reqwest::Url::parse(&config.root()).is_err() { + return Err(ConnectorError::Config(format!( + "base_url is not a valid URL: {}", + config.base_url + ))); + } + let client = reqwest::Client::builder() + .user_agent(USER_AGENT) + .build() + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + let scope = config.project.clone(); + Ok(Self { + config, + scope, + client, + }) + } + + fn auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { + let req = req.header("Accept", "application/json"); + match &self.config.api_key { + // The key goes in the header (not the URL) so it stays out of proxy + // and access logs. + Some(k) => req.header("X-Redmine-API-Key", k), + None => req, + } + } + + fn classify_status(resp: &reqwest::Response) -> Option { + let status = resp.status(); + if status.is_success() { + return None; + } + if status.as_u16() == 429 { + let retry = resp + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .map(std::time::Duration::from_secs); + return Some(ConnectorError::RateLimited(retry)); + } + let message = match status.as_u16() { + // A valid key against an instance with REST disabled 401/403s; make + // that distinguishable from "no results". + 401 | 403 => { + "unauthorized — check REDMINE_API_KEY and that the instance has the REST API enabled (Administration → Settings → API)" + .to_string() + } + _ => status + .canonical_reason() + .unwrap_or("request failed") + .to_string(), + }; + Some(ConnectorError::Source { + status: status.as_u16(), + message, + }) + } + + /// Fetch the journals + relations for one issue (the detail endpoint — + /// journals are not returned on the list endpoint). + async fn fetch_detail(&self, issue_id: u64) -> ConnectorResult { + let url = format!("{}/issues/{}.json", self.config.root(), issue_id); + let resp = self + .auth(self.client.get(&url)) + .query(&[("include", "journals,relations")]) + .send() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + if let Some(err) = Self::classify_status(&resp) { + return Err(err); + } + let wrapper: IssueWrapper = resp + .json() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + Ok(wrapper.issue) + } + + /// Fold a raw issue (with journals) into one normalized memory record. + fn normalize(&self, issue: &RawIssue) -> NormalizedRecord { + let status_name = issue.status.as_ref().map(|s| s.name.clone()); + let tracker_name = issue.tracker.as_ref().map(|t| t.name.clone()); + let author = issue.author.as_ref().map(|a| a.name.clone()); + + // Journals sorted by id for a stable order + stable hash. Keep notes + // and field changes so status/assignment history remains searchable. + let mut journals: Vec<&RawJournal> = issue + .journals + .iter() + .filter(|j| { + j.notes + .as_deref() + .map(|n| !n.trim().is_empty()) + .unwrap_or(false) + || !j.details.is_empty() + }) + .collect(); + journals.sort_by_key(|j| j.id); + journals.truncate(self.config.max_journals); + + // Human-readable content. + let mut content = format!("[{}#{}] {}\n", self.scope, issue.id, issue.subject); + if let Some(s) = &status_name { + content.push_str(&format!("Status: {s}\n")); + } + if let Some(t) = &tracker_name { + content.push_str(&format!("Tracker: {t}\n")); + } + if let Some(desc) = &issue.description + && !desc.trim().is_empty() + { + content.push('\n'); + content.push_str(desc.trim()); + content.push('\n'); + } + for j in &journals { + let who = j.user.as_ref().map(|u| u.name.as_str()).unwrap_or("?"); + let note = j.notes.as_deref().unwrap_or("").trim(); + if !note.is_empty() { + content.push_str(&format!("\n- {who}: {note}")); + } + for detail in &j.details { + content.push_str(&format!( + "\n- {who} changed {}{}: {} -> {}", + detail.property.as_deref().unwrap_or("field"), + detail + .name + .as_deref() + .map(|n| format!(".{n}")) + .unwrap_or_default(), + detail.old_value.as_deref().unwrap_or(""), + detail.new_value.as_deref().unwrap_or("") + )); + } + } + if !issue.relations.is_empty() { + content.push_str("\n\nRelations:"); + let mut relations: Vec<&RawRelation> = issue.relations.iter().collect(); + relations.sort_by_key(|r| r.id); + for relation in relations { + let related = relation.related_issue_id(issue.id); + content.push_str(&format!( + "\n- #{} ({})", + related, + relation.relation_type.as_deref().unwrap_or("relates") + )); + if let Some(delay) = relation.delay { + content.push_str(&format!(", delay {delay}")); + } + } + } + + // Stable content hash — meaning only, never the cursor (`updated_on`) or + // volatile counts. Journals and relations contribute stable fields in id + // order. + let journals_blob = journals + .iter() + .map(|j| { + let details = j + .details + .iter() + .map(|d| { + format!( + "{}:{}:{}:{}", + d.property.as_deref().unwrap_or(""), + d.name.as_deref().unwrap_or(""), + d.old_value.as_deref().unwrap_or(""), + d.new_value.as_deref().unwrap_or("") + ) + }) + .collect::>() + .join("\u{1e}"); + format!( + "{}:{}:{}", + j.id, + j.notes.as_deref().unwrap_or("").trim(), + details + ) + }) + .collect::>() + .join("\u{1f}"); + let relations_blob = { + let mut relations: Vec<&RawRelation> = issue.relations.iter().collect(); + relations.sort_by_key(|r| r.id); + relations + .iter() + .map(|r| { + format!( + "{}:{}:{}:{}", + r.id, + r.issue_id.unwrap_or_default(), + r.issue_to_id.unwrap_or_default(), + r.relation_type.as_deref().unwrap_or("") + ) + }) + .collect::>() + .join("\u{1f}") + }; + let id_str = issue.id.to_string(); + let status_id_str = issue + .status + .as_ref() + .map(|s| s.id.to_string()) + .unwrap_or_default(); + let tracker_id_str = issue + .tracker + .as_ref() + .map(|t| t.id.to_string()) + .unwrap_or_default(); + let done_ratio_str = issue.done_ratio.unwrap_or(0).to_string(); + let desc_str = issue.description.clone().unwrap_or_default(); + let hash = content_hash(&[ + ("id", &id_str), + ("subject", &issue.subject), + ("description", &desc_str), + ("status_id", &status_id_str), + ("tracker_id", &tracker_id_str), + ("done_ratio", &done_ratio_str), + ("journals", &journals_blob), + ("relations", &relations_blob), + ]); + + // Tags, lowercased — `tag_prefix` matching is case-sensitive, and + // Redmine status/tracker names are mixed-case. + let mut tags = vec!["redmine".to_string(), "issue".to_string()]; + if let Some(s) = &status_name { + tags.push(format!("status:{}", s.to_lowercase())); + } + if let Some(t) = &tracker_name { + tags.push(format!("tracker:{}", t.to_lowercase())); + } + if let Some(p) = &issue.priority { + tags.push(format!("priority:{}", p.name.to_lowercase())); + } + + let envelope = SourceEnvelope { + source_system: Some("redmine".to_string()), + source_id: Some(issue.id.to_string()), + source_url: Some(format!("{}/issues/{}", self.config.root(), issue.id)), + source_updated_at: issue + .updated_on + .as_deref() + .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|d| d.with_timezone(&Utc)), + content_hash: Some(hash), + synced_at: Some(Utc::now()), + source_project: Some(self.scope.clone()), + source_type: Some("issue".to_string()), + source_author: author, + }; + + NormalizedRecord { + content, + tags, + envelope, + } + } +} + +impl Connector for RedmineConnector { + fn source_system(&self) -> &str { + "redmine" + } + + fn scope(&self) -> &str { + &self.scope + } + + async fn fetch_updated( + &self, + since: Option>, + cursor: Option, + ) -> ConnectorResult { + // The cursor carries the next offset (Redmine pages by offset, not an + // opaque url). First page = offset 0. + let offset: u32 = cursor.as_deref().and_then(|c| c.parse().ok()).unwrap_or(0); + + let url = format!("{}/issues.json", self.config.root()); + let limit_str = PAGE_LIMIT.to_string(); + let offset_str = offset.to_string(); + // Build params; reqwest percent-encodes each value exactly once, so we + // pass the RAW `>=…` operator (it becomes %3E%3D on the wire). Do not + // pre-encode here or it would be double-encoded. + let mut params: Vec<(&str, String)> = vec![ + ("status_id", "*".to_string()), + ("sort", "updated_on:asc".to_string()), + ("project_id", self.config.project.clone()), + ("limit", limit_str), + ("offset", offset_str), + ]; + if let Some(s) = since { + let since_z = s.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + params.push(("updated_on", format!(">={since_z}"))); + } + + let resp = self + .auth(self.client.get(&url)) + .query(¶ms) + .send() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + if let Some(err) = Self::classify_status(&resp) { + return Err(err); + } + let page: IssueListResponse = resp + .json() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + + // Per-issue detail fetch for journals (list endpoint omits them). + let mut records = Vec::new(); + for summary in &page.issues { + let detailed = match self.fetch_detail(summary.id).await { + Ok(d) => d, + // A single issue failing detail-fetch should not abort the page; + // fall back to the list-level fields (no journals). + Err(_) => summary.clone(), + }; + records.push(self.normalize(&detailed)); + } + + // Advance the offset cursor until we've walked total_count. + let next_offset = offset + page.issues.len() as u32; + let next_cursor = if (next_offset as u64) < page.total_count && !page.issues.is_empty() { + Some(next_offset.to_string()) + } else { + None + }; + + Ok(FetchPage { + records, + next_cursor, + }) + } + + async fn list_live_ids(&self) -> ConnectorResult>> { + // Enumerate all issue ids (open AND closed) for the reconcile pass. + // status_id=* is mandatory here too, or closed issues read as deleted. + let mut ids = Vec::new(); + let mut offset: u32 = 0; + loop { + let url = format!("{}/issues.json", self.config.root()); + let resp = self + .auth(self.client.get(&url)) + .query(&[ + ("status_id", "*".to_string()), + ("project_id", self.config.project.clone()), + ("limit", PAGE_LIMIT.to_string()), + ("offset", offset.to_string()), + ]) + .send() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + if let Some(err) = Self::classify_status(&resp) { + return Err(err); + } + let page: IssueListResponse = resp + .json() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + if page.issues.is_empty() { + break; + } + for issue in &page.issues { + ids.push(issue.id.to_string()); + } + offset += page.issues.len() as u32; + if (offset as u64) >= page.total_count { + break; + } + } + Ok(Some(ids)) + } +} + +// --------------------------------------------------------------------------- +// Raw Redmine API shapes (only the fields we use) +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +struct IssueListResponse { + #[serde(default)] + issues: Vec, + #[serde(default)] + total_count: u64, +} + +#[derive(Debug, Deserialize)] +struct IssueWrapper { + issue: RawIssue, +} + +#[derive(Debug, Clone, Deserialize)] +struct RawIssue { + id: u64, + #[serde(default)] + subject: String, + #[serde(default)] + description: Option, + #[serde(default)] + status: Option, + #[serde(default)] + tracker: Option, + #[serde(default)] + priority: Option, + #[serde(default)] + author: Option, + #[serde(default)] + done_ratio: Option, + #[serde(default)] + updated_on: Option, + #[serde(default)] + journals: Vec, + #[serde(default)] + relations: Vec, +} + +/// Redmine `{id, name}` reference (status, tracker, priority, user, …). +#[derive(Debug, Clone, Deserialize)] +struct NamedRef { + #[serde(default)] + id: i64, + #[serde(default)] + name: String, +} + +#[derive(Debug, Clone, Deserialize)] +struct RawJournal { + id: u64, + #[serde(default)] + notes: Option, + #[serde(default)] + user: Option, + #[serde(default)] + details: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct RawJournalDetail { + #[serde(default)] + property: Option, + #[serde(default)] + name: Option, + #[serde(default)] + old_value: Option, + #[serde(default)] + new_value: Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct RawRelation { + #[serde(default)] + id: u64, + #[serde(default)] + issue_id: Option, + #[serde(default)] + issue_to_id: Option, + #[serde(default)] + relation_type: Option, + #[serde(default)] + delay: Option, +} + +impl RawRelation { + fn related_issue_id(&self, current_issue_id: u64) -> u64 { + match (self.issue_id, self.issue_to_id) { + (Some(from), Some(to)) if from == current_issue_id => to, + (Some(from), Some(to)) if to == current_issue_id => from, + (_, Some(to)) => to, + (Some(from), _) => from, + _ => 0, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn issue(id: u64, subject: &str, desc: &str, status: (i64, &str)) -> RawIssue { + RawIssue { + id, + subject: subject.to_string(), + description: Some(desc.to_string()), + status: Some(NamedRef { + id: status.0, + name: status.1.to_string(), + }), + tracker: Some(NamedRef { + id: 1, + name: "Bug".to_string(), + }), + priority: Some(NamedRef { + id: 2, + name: "Normal".to_string(), + }), + author: Some(NamedRef { + id: 7, + name: "Jane Dev".to_string(), + }), + done_ratio: Some(0), + updated_on: Some("2026-06-19T00:00:00Z".to_string()), + journals: vec![], + relations: vec![], + } + } + + fn connector() -> RedmineConnector { + RedmineConnector::new(RedmineConfig::new("https://redmine.example.com", "infra")).unwrap() + } + + #[test] + fn rejects_empty_and_bad_config() { + assert!(RedmineConnector::new(RedmineConfig::new("", "p")).is_err()); + assert!(RedmineConnector::new(RedmineConfig::new("https://r.example", "")).is_err()); + assert!(RedmineConnector::new(RedmineConfig::new("not a url", "p")).is_err()); + } + + #[test] + fn normalize_builds_keyed_envelope_with_citation() { + let c = connector(); + let rec = c.normalize(&issue(123, "Disk full", "df -h shows 100%", (1, "New"))); + let env = &rec.envelope; + assert!(env.has_key()); + assert_eq!(env.source_system.as_deref(), Some("redmine")); + assert_eq!(env.source_id.as_deref(), Some("123")); + assert_eq!( + env.source_url.as_deref(), + Some("https://redmine.example.com/issues/123") + ); + assert_eq!(env.source_project.as_deref(), Some("infra")); + assert_eq!(env.source_author.as_deref(), Some("Jane Dev")); + assert!(rec.content.contains("Disk full")); + // Tags lowercased so the case-sensitive tag_prefix filter matches. + assert!(rec.tags.contains(&"status:new".to_string())); + assert!(rec.tags.contains(&"tracker:bug".to_string())); + assert!(rec.tags.contains(&"priority:normal".to_string())); + } + + #[test] + fn status_change_changes_hash() { + let c = connector(); + let new = c + .normalize(&issue(1, "T", "body", (1, "New"))) + .envelope + .content_hash; + let closed = c + .normalize(&issue(1, "T", "body", (5, "Closed"))) + .envelope + .content_hash; + assert_ne!( + new, closed, + "a status change must change the hash → re-embed" + ); + } + + #[test] + fn journals_fold_in_id_order_and_affect_hash() { + let c = connector(); + let mut iss = issue(1, "T", "body", (1, "New")); + iss.journals = vec![ + RawJournal { + id: 20, + notes: Some("second".to_string()), + user: Some(NamedRef { + id: 1, + name: "B".to_string(), + }), + details: vec![], + }, + RawJournal { + id: 10, + notes: Some("first".to_string()), + user: Some(NamedRef { + id: 2, + name: "A".to_string(), + }), + details: vec![], + }, + // Pure empty journal must be dropped, not folded. + RawJournal { + id: 30, + notes: None, + user: Some(NamedRef { + id: 3, + name: "C".to_string(), + }), + details: vec![], + }, + ]; + let rec = c.normalize(&iss); + let first = rec.content.find("first").unwrap(); + let second = rec.content.find("second").unwrap(); + assert!(first < second, "journals fold in id order"); + + let no_journals = c + .normalize(&issue(1, "T", "body", (1, "New"))) + .envelope + .content_hash; + assert_ne!( + rec.envelope.content_hash, no_journals, + "journals must contribute to the hash" + ); + } + + #[test] + fn journal_details_and_relations_are_searchable_and_hashed() { + let c = connector(); + let mut iss = issue(1, "T", "body", (1, "New")); + iss.journals = vec![RawJournal { + id: 1, + notes: None, + user: Some(NamedRef { + id: 2, + name: "A".to_string(), + }), + details: vec![RawJournalDetail { + property: Some("attr".to_string()), + name: Some("status_id".to_string()), + old_value: Some("1".to_string()), + new_value: Some("5".to_string()), + }], + }]; + iss.relations = vec![RawRelation { + id: 9, + issue_id: Some(1), + issue_to_id: Some(42), + relation_type: Some("relates".to_string()), + delay: None, + }]; + + let rec = c.normalize(&iss); + assert!(rec.content.contains("changed attr.status_id: 1 -> 5")); + assert!(rec.content.contains("#42 (relates)")); + + let no_history = c.normalize(&issue(1, "T", "body", (1, "New"))); + assert_ne!( + rec.envelope.content_hash, no_history.envelope.content_hash, + "field-change journals and relations must affect idempotent updates" + ); + } +} diff --git a/crates/vestige-core/src/storage/sqlite.rs b/crates/vestige-core/src/storage/sqlite.rs index 399b2fb..d353ec1 100644 --- a/crates/vestige-core/src/storage/sqlite.rs +++ b/crates/vestige-core/src/storage/sqlite.rs @@ -9655,7 +9655,11 @@ impl SqliteMemoryStore { /// Read the incremental-sync checkpoint for a `(source_system, scope)`. /// Returns a zeroed cursor (no high-water mark) if none has been saved yet. - pub fn get_connector_cursor(&self, source_system: &str, scope: &str) -> Result { + pub fn get_connector_cursor( + &self, + source_system: &str, + scope: &str, + ) -> Result { let reader = self .reader .lock() @@ -9873,7 +9877,11 @@ mod tests { assert_eq!(r.outcome, SourceUpsertOutcome::Unchanged); assert_eq!(r.node_id, r1.node_id, "must reuse the same memory id"); } - assert_eq!(node_count(&store), 1, "idempotent: still exactly one memory"); + assert_eq!( + node_count(&store), + 1, + "idempotent: still exactly one memory" + ); } #[test] @@ -9945,7 +9953,13 @@ mod tests { let mut c2 = cursor.clone(); c2.records_seen = 99; store.save_connector_cursor(&c2).unwrap(); - assert_eq!(store.get_connector_cursor("github", "o/r").unwrap().records_seen, 99); + assert_eq!( + store + .get_connector_cursor("github", "o/r") + .unwrap() + .records_seen, + 99 + ); } #[test] @@ -9977,9 +9991,15 @@ mod tests { ) .unwrap() }; - assert!(two.1.is_some(), "tombstoned record must have valid_until set"); + assert!( + two.1.is_some(), + "tombstoned record must have valid_until set" + ); let node = store.get_node(&two.0).unwrap().unwrap(); - assert!(!node.is_currently_valid(), "tombstoned node is not valid now"); + assert!( + !node.is_currently_valid(), + "tombstoned node is not valid now" + ); assert_eq!(node.content, "issue 2", "content retained for audit"); // A reappearing record un-tombstones on next upsert (clears valid_until). @@ -9987,7 +10007,10 @@ mod tests { .upsert_by_source(source_input("2", "issue 2", "h2")) .unwrap(); let revived = store.get_node(&two.0).unwrap().unwrap(); - assert!(revived.is_currently_valid(), "re-synced record is valid again"); + assert!( + revived.is_currently_valid(), + "re-synced record is valid again" + ); } #[test] @@ -10013,7 +10036,10 @@ mod tests { .unwrap(); } assert!( - store.superseded_node_ids().unwrap().contains(&created.node_id), + store + .superseded_node_ids() + .unwrap() + .contains(&created.node_id), "precondition: node is superseded" ); @@ -10023,7 +10049,10 @@ mod tests { .unwrap(); assert_eq!(res.outcome, SourceUpsertOutcome::Updated); assert!( - !store.superseded_node_ids().unwrap().contains(&created.node_id), + !store + .superseded_node_ids() + .unwrap() + .contains(&created.node_id), "superseded_by must be cleared on re-sync (no bitemporal zombie)" ); let node = store.get_node(&created.node_id).unwrap().unwrap(); @@ -10044,7 +10073,10 @@ mod tests { .unwrap(); assert_eq!(res2.outcome, SourceUpsertOutcome::Unchanged); assert!( - !store.superseded_node_ids().unwrap().contains(&created.node_id), + !store + .superseded_node_ids() + .unwrap() + .contains(&created.node_id), "Unchanged branch must also clear superseded_by" ); } diff --git a/crates/vestige-mcp/src/server.rs b/crates/vestige-mcp/src/server.rs index b8f90e7..6b919fa 100644 --- a/crates/vestige-mcp/src/server.rs +++ b/crates/vestige-mcp/src/server.rs @@ -285,7 +285,7 @@ impl McpServer { // ================================================================ ToolDescription { name: "source_sync".to_string(), - description: Some("Index an external system (GitHub Issues) into Vestige as a durable, offline, semantically-searchable index that cites back to the canonical record. Provide 'repo' as 'owner/name'. Idempotent: re-running updates changed issues without duplicating; set reconcile=true to tombstone issues removed upstream. Auth via the GITHUB_TOKEN env var (optional for public repos).".to_string()), + description: Some("Index an external system into Vestige as a durable, offline, semantically-searchable index that cites back to the canonical record. GitHub: source='github', repo='owner/name' (auth via GITHUB_TOKEN env). Redmine: source='redmine', project='' (host via REDMINE_URL, auth via REDMINE_API_KEY env). Idempotent: re-running updates changed issues without duplicating; set reconcile=true to tombstone issues removed upstream.".to_string()), input_schema: tools::source_sync::schema(), ..Default::default() }, diff --git a/crates/vestige-mcp/src/tools/search_unified.rs b/crates/vestige-mcp/src/tools/search_unified.rs index f107a68..9c20def 100644 --- a/crates/vestige-mcp/src/tools/search_unified.rs +++ b/crates/vestige-mcp/src/tools/search_unified.rs @@ -96,6 +96,40 @@ pub fn schema() -> Value { "tag_prefix": { "type": "string", "description": "Optional tag-prefix filter. When set, only results carrying at least one tag whose value starts with this prefix are returned (case-sensitive). Example: tag_prefix=\"meeting:\" matches memories tagged 'meeting:standup', 'meeting:1-on-1', etc. Applied as a post-filter; combine with a larger 'limit' if you expect heavy thinning." + }, + "source_system": { + "type": "string", + "description": "Investigation filter (#57): only memories ingested from this external system, e.g. 'github' or 'redmine'. Post-filter — non-connector memories are excluded. Combine with a larger 'limit' if thinning is heavy." + }, + "source_project": { + "type": "string", + "description": "Investigation filter: only memories from this source project/repo, exact match (GitHub 'owner/repo', Redmine project id)." + }, + "source_id": { + "type": "string", + "description": "Investigation filter: a specific source record id (issue number / ticket id). Pair with source_system to disambiguate across systems." + }, + "source_type": { + "type": "string", + "description": "Investigation filter: source record type, e.g. 'issue', 'comment'." + }, + "source_author": { + "type": "string", + "description": "Investigation filter: the source author/reporter (not assignee)." + }, + "source_updated_after": { + "type": "string", + "description": "Investigation filter: only records whose source was updated at/after this RFC3339 timestamp (inclusive)." + }, + "source_updated_before": { + "type": "string", + "description": "Investigation filter: only records whose source was updated at/before this RFC3339 timestamp (inclusive)." + }, + "source_status": { + "type": "string", + "enum": ["any", "valid", "tombstoned"], + "description": "Investigation filter: 'any' (default), 'valid' (currently-valid records only), or 'tombstoned' (records no longer visible upstream, kept for audit).", + "default": "any" } }, "required": ["query"] @@ -126,6 +160,23 @@ struct SearchArgs { concrete: Option, #[serde(alias = "tag_prefix")] tag_prefix: Option, + // #57 Phase 4 — source-aware investigation filters (all post-filters). + #[serde(alias = "source_system")] + source_system: Option, + #[serde(alias = "source_project")] + source_project: Option, + #[serde(alias = "source_id")] + source_id: Option, + #[serde(alias = "source_type")] + source_type: Option, + #[serde(alias = "source_author")] + source_author: Option, + #[serde(alias = "source_updated_after")] + source_updated_after: Option, + #[serde(alias = "source_updated_before")] + source_updated_before: Option, + #[serde(alias = "source_status")] + source_status: Option, } /// Execute unified search with 7-stage cognitive pipeline. @@ -190,15 +241,19 @@ pub async fn execute( } }; + // #57 Phase 4 — parse the source-aware investigation filter once (shared by + // both the concrete and hybrid paths). Hard-errors on malformed input. + let source_filter = SourceFilter::from_args(&args)?; + let concrete = args .concrete .unwrap_or_else(|| is_literal_query(&args.query)); if concrete { - // When a tag_prefix is requested, fetch a larger pool so the - // post-filter has enough headroom to still return ~limit results - // after thinning. Cap at the same upper bound the underlying SQL - // path uses elsewhere (100). - let concrete_fetch_limit = if args.tag_prefix.is_some() { + // When a tag_prefix OR a source filter is requested, fetch a larger + // pool so the post-filter has enough headroom to still return ~limit + // results after thinning. Cap at the same upper bound the underlying + // SQL path uses elsewhere (100). + let concrete_fetch_limit = if args.tag_prefix.is_some() || source_filter.is_active() { (limit * 3).min(100) } else { limit @@ -215,14 +270,15 @@ pub async fn execute( // Apply tag_prefix post-filter BEFORE strengthen-on-access so // results the caller did not actually receive do not get a // testing-effect boost. - let filtered_results: Vec<&vestige_core::SearchResult> = match args.tag_prefix.as_deref() { - Some(prefix) => results - .iter() - .filter(|r| tags_match_prefix(&r.node.tags, prefix)) - .take(limit as usize) - .collect(), - None => results.iter().collect(), - }; + let filtered_results: Vec<&vestige_core::SearchResult> = results + .iter() + .filter(|r| match args.tag_prefix.as_deref() { + Some(prefix) => tags_match_prefix(&r.node.tags, prefix), + None => true, + }) + .filter(|r| node_matches_source(&r.node, &source_filter)) + .take(limit as usize) + .collect(); let ids: Vec<&str> = filtered_results .iter() @@ -334,11 +390,15 @@ pub async fn execute( "exhaustive" => 5, // Deep overfetch for maximum recall _ => 3, // Balanced default }; - // When a tag_prefix filter is requested, double the overfetch (capped at - // the same 100 ceiling) so the post-filter has enough headroom to still - // return ~limit results after thinning. - let tag_prefix_multiplier = if args.tag_prefix.is_some() { 2 } else { 1 }; - let overfetch_limit = (limit * overfetch_multiplier * tag_prefix_multiplier).min(100); // Cap at 100 to avoid excessive DB load + // When a tag_prefix OR source filter is requested, double the overfetch + // (capped at the same 100 ceiling) so the post-filter has enough headroom + // to still return ~limit results after thinning. + let post_filter_multiplier = if args.tag_prefix.is_some() || source_filter.is_active() { + 2 + } else { + 1 + }; + let overfetch_limit = (limit * overfetch_multiplier * post_filter_multiplier).min(100); // Cap at 100 to avoid excessive DB load let results = storage .hybrid_search_filtered( @@ -375,6 +435,10 @@ pub async fn execute( if let Some(prefix) = args.tag_prefix.as_deref() { filtered_results.retain(|r| tags_match_prefix(&r.node.tags, prefix)); } + // #57 Phase 4 — source-aware investigation post-filter (same precedent). + if source_filter.is_active() { + filtered_results.retain(|r| node_matches_source(&r.node, &source_filter)); + } // ==================================================================== // Dedup: merge Stage 0 keyword-priority results into Stage 1 results @@ -387,6 +451,10 @@ pub async fn execute( { continue; } + // Respect the source filter on re-inject for the same reason. + if source_filter.is_active() && !node_matches_source(&kp.node, &source_filter) { + continue; + } if let Some(existing) = filtered_results .iter_mut() .find(|r| r.node.id == kp.node.id) @@ -852,6 +920,156 @@ fn tags_match_prefix(tags: &[String], prefix: &str) -> bool { tags.iter().any(|t| t.starts_with(prefix)) } +/// Validity filter for source-aware search (#57 Phase 4). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +enum SourceStatus { + /// No validity constraint. + #[default] + Any, + /// Only currently-valid records. + Valid, + /// Only tombstoned records (no longer visible upstream, kept for audit). + Tombstoned, +} + +/// Parsed source-aware investigation filter (#57 Phase 4). +/// +/// All fields are optional; an all-empty filter matches every node (so search +/// behavior is byte-for-byte unchanged when no source filter is supplied). Any +/// source-scoped field being set excludes legacy/agent memories that have no +/// `source_envelope`. Applied as a post-filter on the recalled nodes, mirroring +/// the existing `tag_prefix` precedent (no SQL changes). +#[derive(Debug, Clone, Default)] +struct SourceFilter { + system: Option, + project: Option, + id: Option, + source_type: Option, + author: Option, + updated_after: Option>, + updated_before: Option>, + status: SourceStatus, +} + +impl SourceFilter { + /// Build from raw args, hard-erroring on malformed timestamps / status enum + /// (consistent with how `detail_level` / `retrieval_mode` reject bad input — + /// a silently-`None` bound would widen the filter and return wrong rows). + fn from_args(args: &SearchArgs) -> Result { + let parse_ts = |s: &Option, + field: &str| + -> Result>, String> { + match s { + None => Ok(None), + Some(v) => chrono::DateTime::parse_from_rfc3339(v) + .map(|dt| Some(dt.with_timezone(&chrono::Utc))) + .map_err(|_| format!("Invalid {field}: '{v}' is not an RFC3339 timestamp")), + } + }; + let status = match args.source_status.as_deref() { + None | Some("any") => SourceStatus::Any, + Some("valid") => SourceStatus::Valid, + Some("tombstoned") => SourceStatus::Tombstoned, + Some(other) => { + return Err(format!( + "Invalid source_status '{other}'. Must be 'any', 'valid', or 'tombstoned'." + )); + } + }; + Ok(Self { + system: args.source_system.clone(), + project: args.source_project.clone(), + id: args.source_id.clone(), + source_type: args.source_type.clone(), + author: args.source_author.clone(), + updated_after: parse_ts(&args.source_updated_after, "source_updated_after")?, + updated_before: parse_ts(&args.source_updated_before, "source_updated_before")?, + status, + }) + } + + /// True when at least one filter is set (used to size the over-fetch pool). + fn is_active(&self) -> bool { + self.system.is_some() + || self.project.is_some() + || self.id.is_some() + || self.source_type.is_some() + || self.author.is_some() + || self.updated_after.is_some() + || self.updated_before.is_some() + || self.status != SourceStatus::Any + } +} + +/// Predicate: does this node satisfy the source-aware investigation filter? +/// An all-empty filter returns `true` for every node. +fn node_matches_source(node: &vestige_core::KnowledgeNode, filter: &SourceFilter) -> bool { + // Validity check operates on the NODE (valid_until lives on the node). + match filter.status { + SourceStatus::Any => {} + SourceStatus::Valid if !node.is_currently_valid() => return false, + SourceStatus::Tombstoned if node.is_currently_valid() => return false, + _ => {} + } + + // Any source-scoped field requires an envelope; legacy memories are out. + // This includes `source_status=valid`: otherwise a source-scoped query for + // valid connector records would also return ordinary valid agent memories. + let envelope_scoped = filter.system.is_some() + || filter.project.is_some() + || filter.id.is_some() + || filter.source_type.is_some() + || filter.author.is_some() + || filter.updated_after.is_some() + || filter.updated_before.is_some() + || filter.status != SourceStatus::Any; + if !envelope_scoped { + return true; + } + let Some(env) = node.source_envelope.as_ref() else { + return false; + }; + + let exact = |want: &Option, have: &Option| -> bool { + match want { + None => true, + Some(w) => have.as_deref() == Some(w.as_str()), + } + }; + if !exact(&filter.system, &env.source_system) { + return false; + } + if !exact(&filter.project, &env.source_project) { + return false; + } + if !exact(&filter.id, &env.source_id) { + return false; + } + if !exact(&filter.source_type, &env.source_type) { + return false; + } + if !exact(&filter.author, &env.source_author) { + return false; + } + // Date bounds (inclusive) on the source-updated time. + if filter.updated_after.is_some() || filter.updated_before.is_some() { + let Some(ts) = env.source_updated_at else { + return false; + }; + if let Some(after) = filter.updated_after + && ts < after + { + return false; + } + if let Some(before) = filter.updated_before + && ts > before + { + return false; + } + } + true +} + /// Format a search result based on the requested detail level. /// Score field keys dropped when an output profile suppresses scores. const SCORE_FIELDS: &[&str] = &["combinedScore", "keywordScore", "semanticScore"]; @@ -1880,6 +2098,167 @@ mod tests { assert!(!required.contains(&serde_json::json!("tag_prefix"))); } + // ===================== #57 Phase 4 source filters ===================== + + /// Build a KnowledgeNode carrying a source envelope for filter tests. + fn node_with_source( + system: &str, + project: &str, + id: &str, + author: &str, + updated: &str, + ) -> vestige_core::KnowledgeNode { + let mut n = vestige_core::KnowledgeNode::default(); + n.id = format!("{system}-{id}"); + // SourceEnvelope is #[non_exhaustive]; build via Default + field set. + let mut env = vestige_core::SourceEnvelope::default(); + env.source_system = Some(system.to_string()); + env.source_id = Some(id.to_string()); + env.source_url = Some(format!("https://x/{id}")); + env.source_updated_at = chrono::DateTime::parse_from_rfc3339(updated) + .ok() + .map(|d| d.with_timezone(&chrono::Utc)); + env.content_hash = Some("h".to_string()); + env.source_project = Some(project.to_string()); + env.source_type = Some("issue".to_string()); + env.source_author = Some(author.to_string()); + n.source_envelope = Some(env); + n + } + + fn filter_from(json: serde_json::Value) -> SourceFilter { + let mut v = json; + v["query"] = serde_json::json!("q"); + let args: SearchArgs = serde_json::from_value(v).unwrap(); + SourceFilter::from_args(&args).unwrap() + } + + #[test] + fn source_filter_empty_matches_everything() { + let f = SourceFilter::default(); + assert!(!f.is_active()); + let gh = node_with_source("github", "o/r", "1", "octo", "2026-06-19T00:00:00Z"); + let legacy = vestige_core::KnowledgeNode::default(); // no envelope + assert!(node_matches_source(&gh, &f)); + assert!(node_matches_source(&legacy, &f), "no filter = unchanged"); + } + + #[test] + fn source_filter_exact_fields() { + let gh = node_with_source("github", "o/r", "57", "octo", "2026-06-19T00:00:00Z"); + let rm = node_with_source("redmine", "infra", "57", "jane", "2026-06-19T00:00:00Z"); + + let by_system = filter_from(serde_json::json!({"sourceSystem": "github"})); + assert!(node_matches_source(&gh, &by_system)); + assert!(!node_matches_source(&rm, &by_system)); + + let by_project = filter_from(serde_json::json!({"sourceProject": "infra"})); + assert!(node_matches_source(&rm, &by_project)); + assert!(!node_matches_source(&gh, &by_project)); + + let by_author = filter_from(serde_json::json!({"sourceAuthor": "octo"})); + assert!(node_matches_source(&gh, &by_author)); + assert!(!node_matches_source(&rm, &by_author)); + + // id + system together disambiguate across systems sharing an id. + let by_id_sys = + filter_from(serde_json::json!({"sourceSystem": "redmine", "sourceId": "57"})); + assert!(node_matches_source(&rm, &by_id_sys)); + assert!(!node_matches_source(&gh, &by_id_sys)); + } + + #[test] + fn source_filter_excludes_legacy_memories_when_envelope_scoped() { + let legacy = vestige_core::KnowledgeNode::default(); + let f = filter_from(serde_json::json!({"sourceSystem": "github"})); + assert!( + !node_matches_source(&legacy, &f), + "an envelope-scoped filter must exclude memories with no source" + ); + } + + #[test] + fn source_filter_date_bounds_inclusive() { + let n = node_with_source("github", "o/r", "1", "octo", "2026-06-15T12:00:00Z"); + // After bound: inclusive at the exact instant, excludes earlier. + assert!(node_matches_source( + &n, + &filter_from(serde_json::json!({"sourceUpdatedAfter": "2026-06-15T12:00:00Z"})) + )); + assert!(!node_matches_source( + &n, + &filter_from(serde_json::json!({"sourceUpdatedAfter": "2026-06-16T00:00:00Z"})) + )); + // Before bound: inclusive, excludes later. + assert!(node_matches_source( + &n, + &filter_from(serde_json::json!({"sourceUpdatedBefore": "2026-06-15T12:00:00Z"})) + )); + assert!(!node_matches_source( + &n, + &filter_from(serde_json::json!({"sourceUpdatedBefore": "2026-06-15T00:00:00Z"})) + )); + } + + #[test] + fn source_filter_status_valid_vs_tombstoned() { + let mut live = node_with_source("github", "o/r", "1", "octo", "2026-06-19T00:00:00Z"); + let mut dead = node_with_source("github", "o/r", "2", "octo", "2026-06-19T00:00:00Z"); + let legacy = vestige_core::KnowledgeNode::default(); + // Tombstone `dead` by setting valid_until in the past. + dead.valid_until = Some(chrono::Utc::now() - chrono::Duration::days(1)); + live.valid_until = None; + + let valid = filter_from(serde_json::json!({"sourceStatus": "valid"})); + assert!(node_matches_source(&live, &valid)); + assert!(!node_matches_source(&dead, &valid)); + assert!( + !node_matches_source(&legacy, &valid), + "source_status is source-scoped and must not include legacy memories" + ); + + let tomb = filter_from(serde_json::json!({"sourceStatus": "tombstoned"})); + assert!(!node_matches_source(&live, &tomb)); + assert!(node_matches_source(&dead, &tomb)); + assert!(!node_matches_source(&legacy, &tomb)); + } + + #[test] + fn source_filter_rejects_bad_timestamp_and_status() { + let mut v = serde_json::json!({"query": "q", "sourceUpdatedAfter": "not-a-date"}); + let args: SearchArgs = serde_json::from_value(v.take()).unwrap(); + assert!(SourceFilter::from_args(&args).is_err()); + + let mut v2 = serde_json::json!({"query": "q", "sourceStatus": "bogus"}); + let args2: SearchArgs = serde_json::from_value(v2.take()).unwrap(); + assert!(SourceFilter::from_args(&args2).is_err()); + } + + #[test] + fn test_schema_has_source_filters() { + let s = schema(); + for prop in [ + "source_system", + "source_project", + "source_id", + "source_type", + "source_author", + "source_updated_after", + "source_updated_before", + "source_status", + ] { + assert!( + s["properties"][prop].is_object(), + "schema must expose {prop}" + ); + } + // None of the source filters are required. + let required = s["required"].as_array().unwrap(); + for prop in ["source_system", "source_status"] { + assert!(!required.contains(&serde_json::json!(prop))); + } + } + /// Helper that ingests a memory with specific tags. The base /// `ingest_test_content` helper passes `tags: vec![]`, which is fine /// for legacy tests but not for tag_prefix coverage. diff --git a/crates/vestige-mcp/src/tools/source_sync.rs b/crates/vestige-mcp/src/tools/source_sync.rs index bfdd4b2..a6c0232 100644 --- a/crates/vestige-mcp/src/tools/source_sync.rs +++ b/crates/vestige-mcp/src/tools/source_sync.rs @@ -1,11 +1,11 @@ //! `source_sync` MCP tool (#57) — index an external system into Vestige. //! //! Turns Vestige into a durable, offline, provenance-linked retrieval layer -//! over a long-lived external system. The first connector is GitHub Issues: -//! point it at `owner/repo` and Vestige indexes every issue + its comments as -//! source-aware memories you can search semantically and cite back to the -//! canonical issue URL — re-runnable idempotently (no duplicates) and able to -//! tombstone issues that vanish upstream. +//! over a long-lived external system. GitHub Issues and Redmine are the first +//! reference connectors: Vestige indexes issues, comments/journals, and source +//! metadata as source-aware memories you can search semantically and cite back +//! to the canonical issue URL — re-runnable idempotently (no duplicates) and +//! able to tombstone issues that vanish upstream. //! //! Unlike the official GitHub MCP server (a stateless live API proxy), this //! keeps a local index: searchable offline, embedded for semantic recall, @@ -13,10 +13,11 @@ //! //! ## Auth (security) //! -//! The GitHub token is read from the `GITHUB_TOKEN` (or `VESTIGE_GITHUB_TOKEN`) -//! environment variable, never from tool arguments, so credentials are not -//! logged in the conversation. Public repositories work without a token at a -//! lower rate limit. +//! Tokens are read from environment variables (`GITHUB_TOKEN` / +//! `VESTIGE_GITHUB_TOKEN`, `REDMINE_API_KEY` / `VESTIGE_REDMINE_API_KEY`) and +//! never from tool arguments, so credentials are not logged in the conversation. +//! Public GitHub repositories and anonymous Redmine instances can work without a +//! token/key at lower capability. use std::sync::Arc; @@ -32,13 +33,17 @@ pub fn schema() -> Value { "properties": { "source": { "type": "string", - "enum": ["github"], - "description": "External system to sync. Currently: 'github' (GitHub Issues).", + "enum": ["github", "redmine"], + "description": "External system to sync: 'github' (GitHub Issues) or 'redmine' (a Redmine project).", "default": "github" }, "repo": { "type": "string", - "description": "GitHub repository as 'owner/name', e.g. 'samvallad33/vestige'." + "description": "GitHub only: repository as 'owner/name', e.g. 'samvallad33/vestige'." + }, + "project": { + "type": "string", + "description": "Redmine only: project identifier (slug or numeric id) to sync. The Redmine host comes from the REDMINE_URL env var." }, "reconcile": { "type": "boolean", @@ -47,13 +52,13 @@ pub fn schema() -> Value { }, "max_pages": { "type": "integer", - "description": "Max API pages to fetch this run (each page is up to 100 issues). Lets a first sync of a large repo be resumed across calls. Default 10.", + "description": "Max API pages to fetch this run (each page is up to 100 issues). Lets a first sync of a large project be resumed across calls. Default 10.", "default": 10, "minimum": 1, "maximum": 1000 } }, - "required": ["repo"] + "required": [] }) } @@ -62,7 +67,10 @@ pub fn schema() -> Value { struct SourceSyncArgs { #[serde(default = "default_source")] source: String, - repo: String, + #[serde(default)] + repo: Option, + #[serde(default)] + project: Option, #[serde(default)] reconcile: bool, #[serde(default, alias = "max_pages")] @@ -81,35 +89,60 @@ fn github_token() -> Option { .filter(|s| !s.trim().is_empty()) } +/// Read the Redmine API key from the environment (never from tool args). +fn redmine_api_key() -> Option { + std::env::var("REDMINE_API_KEY") + .or_else(|_| std::env::var("VESTIGE_REDMINE_API_KEY")) + .ok() + .filter(|s| !s.trim().is_empty()) +} + +/// Read the Redmine base URL from the environment. +fn redmine_url() -> Option { + std::env::var("REDMINE_URL") + .or_else(|_| std::env::var("VESTIGE_REDMINE_URL")) + .ok() + .filter(|s| !s.trim().is_empty()) +} + pub async fn execute(storage: &Arc, args: Option) -> Result { let args: SourceSyncArgs = match args { Some(v) => serde_json::from_value(v).map_err(|e| format!("Invalid arguments: {e}"))?, None => return Err("Missing arguments".to_string()), }; - if args.source != "github" { - return Err(format!( - "Unsupported source '{}'. Currently only 'github' is supported.", - args.source - )); + let max_pages = args.max_pages.unwrap_or(10); + + match args.source.as_str() { + "github" => { + let repo = args + .repo + .as_deref() + .ok_or_else(|| "github requires a 'repo' ('owner/name')".to_string())?; + let (owner, repo) = repo + .split_once('/') + .filter(|(o, r)| !o.is_empty() && !r.is_empty()) + .ok_or_else(|| { + "repo must be in 'owner/name' form, e.g. 'samvallad33/vestige'".to_string() + })?; + execute_github(storage, owner, repo, args.reconcile, max_pages).await + } + "redmine" => { + let project = args + .project + .as_deref() + .filter(|p| !p.trim().is_empty()) + .ok_or_else(|| "redmine requires a 'project' identifier".to_string())?; + let base_url = redmine_url().ok_or_else(|| { + "set the REDMINE_URL env var to the Redmine host (e.g. https://redmine.example.com)" + .to_string() + })?; + execute_redmine(storage, &base_url, project, args.reconcile, max_pages).await + } + other => Err(format!( + "Unsupported source '{other}'. Supported: 'github', 'redmine'." + )), } - - let (owner, repo) = args - .repo - .split_once('/') - .filter(|(o, r)| !o.is_empty() && !r.is_empty()) - .ok_or_else(|| { - "repo must be in 'owner/name' form, e.g. 'samvallad33/vestige'".to_string() - })?; - - execute_github( - storage, - owner, - repo, - args.reconcile, - args.max_pages.unwrap_or(10), - ) - .await } /// Connectors are feature-gated; surface a clear message when the build omits @@ -122,11 +155,24 @@ async fn execute_github( _reconcile: bool, _max_pages: usize, ) -> Result { - Err("This Vestige build was compiled without the 'connectors' feature. \ - Rebuild with --features connectors to enable source_sync." - .to_string()) + Err(NO_CONNECTORS_MSG.to_string()) } +#[cfg(not(feature = "connectors"))] +async fn execute_redmine( + _storage: &Arc, + _base_url: &str, + _project: &str, + _reconcile: bool, + _max_pages: usize, +) -> Result { + Err(NO_CONNECTORS_MSG.to_string()) +} + +#[cfg(not(feature = "connectors"))] +const NO_CONNECTORS_MSG: &str = "This Vestige build was compiled without the 'connectors' feature. \ + Rebuild with --features connectors to enable source_sync."; + #[cfg(feature = "connectors")] async fn execute_github( storage: &Arc, @@ -185,3 +231,61 @@ async fn execute_github( } })) } + +#[cfg(feature = "connectors")] +async fn execute_redmine( + storage: &Arc, + base_url: &str, + project: &str, + reconcile: bool, + max_pages: usize, +) -> Result { + use vestige_core::connectors::redmine::{RedmineConfig, RedmineConnector}; + use vestige_core::connectors::run_sync; + + let config = RedmineConfig::new(base_url, project).with_api_key(redmine_api_key()); + let connector = + RedmineConnector::new(config).map_err(|e| format!("connector init failed: {e}"))?; + + let report = run_sync(storage.as_ref(), &connector, reconcile, max_pages) + .await + .map_err(|e| format!("sync failed: {e}"))?; + + let total = report.created + report.updated + report.unchanged; + let authed = redmine_api_key().is_some(); + + let summary = format!( + "Synced redmine project '{project}': {} created, {} updated, {} unchanged{} ({total} records seen{}).", + report.created, + report.updated, + report.unchanged, + if report.reconciled { + format!(", {} tombstoned", report.tombstoned) + } else { + String::new() + }, + if authed { "" } else { ", anonymous" }, + ); + + Ok(json!({ + "ok": true, + "summary": summary, + "source": "redmine", + "scope": project, + "created": report.created, + "updated": report.updated, + "unchanged": report.unchanged, + "tombstoned": report.tombstoned, + "reconciled": report.reconciled, + "cursor": report.new_cursor.map(|d| d.to_rfc3339()), + "authenticated": authed, + "warnings": report.warnings, + "hint": if total == 0 && !authed { + "No records returned. Set REDMINE_API_KEY (and confirm the REST API is enabled on the instance) for private projects." + } else if report.new_cursor.is_some() && total >= 100 { + "More may remain — run source_sync again to continue from the saved cursor." + } else { + "Search these with the normal search tools; results cite the Redmine issue URL." + } + })) +} diff --git a/docs/CONNECTORS.md b/docs/CONNECTORS.md index 8bd561a..a324784 100644 --- a/docs/CONNECTORS.md +++ b/docs/CONNECTORS.md @@ -1,7 +1,7 @@ # External-Source Connectors -> Status: **v2.1.27** — GitHub Issues connector (reference). Redmine and others -> follow the same contract. Tracking issue: +> Status: **v2.1.27** — GitHub Issues + Redmine reference connectors, plus +> source-aware investigation filters for search. Tracking issue: > [#57](https://github.com/samvallad33/vestige/issues/57). Connectors let Vestige act as a durable, local **retrieval and reasoning layer** @@ -59,18 +59,63 @@ you can: } ``` +## Quick start (Redmine) + +Redmine stays the system of record; Vestige indexes a project's issues + +journals (comments and status/assignment history). + +1. Point Vestige at the Redmine host and key (env only, never tool args): + + ```sh + export REDMINE_URL=https://redmine.example.com + export REDMINE_API_KEY=xxxxxxxx # or VESTIGE_REDMINE_API_KEY + ``` + + The instance must have the REST API enabled (Administration → Settings → API) + or every call returns 401/403 even with a valid key. + +2. Run `source_sync`: + + ```json + { "source": "redmine", "project": "infra" } + ``` + + Results cite the canonical `https://redmine.example.com/issues/` URL. + ## The `source_sync` tool | Field | Type | Default | Meaning | |---|---|---|---| -| `repo` | string | — (required) | `owner/name`, e.g. `samvallad33/vestige`. | -| `source` | string | `github` | External system. Currently only `github`. | +| `source` | string | `github` | `github` or `redmine`. | +| `repo` | string | — | **GitHub:** `owner/name`, e.g. `samvallad33/vestige`. | +| `project` | string | — | **Redmine:** project identifier (host from `REDMINE_URL`). | | `reconcile` | bool | `false` | Also tombstone local memories for issues no longer visible upstream (an extra full-enumeration pass). | -| `max_pages` | int | `10` | API pages to fetch this run (≤100 issues each). Lets a first sync of a large repo resume across calls. | +| `max_pages` | int | `10` | API pages to fetch this run (≤100 issues each). Lets a first sync of a large project resume across calls. | The tool returns counts (`created` / `updated` / `unchanged` / `tombstoned`), the saved `cursor`, whether it ran authenticated, and a `hint` for the next step. +## Investigation filters (Phase 4) + +`search` accepts source-aware filters so an agent can scope a query to indexed +records. All are optional post-filters; combine with a larger `limit` if you +expect heavy thinning. A source-scoped query excludes non-connector memories. + +| Filter | Matches | +|---|---| +| `source_system` | `github`, `redmine`, … | +| `source_project` | repo / project (exact) | +| `source_id` | a specific issue/ticket id | +| `source_type` | `issue`, `comment`, … | +| `source_author` | reporter/author (not assignee) | +| `source_updated_after` / `source_updated_before` | RFC3339 date range (inclusive) | +| `source_status` | `valid` (default `any`) or `tombstoned` | + +Status, tracker, and priority are filterable through the existing `tag_prefix` +(the connectors emit lowercase `status:`, `tracker:`, `priority:`, and GitHub +`label:` / `state:` tags) — e.g. `tag_prefix: "status:open"`. Assignee and +linked-issue graph traversal are not yet exposed (see below). + ### Idempotent, incremental sync Each run: @@ -144,7 +189,18 @@ cargo build -p vestige-core --features connectors Implement the `Connector` trait in `vestige_core::connectors` (fetch a window of records updated since a cursor, page forward, and optionally enumerate live ids for reconciliation), produce `NormalizedRecord`s with a filled -`SourceEnvelope`, and hand them to `run_sync`. The GitHub connector -(`crates/vestige-core/src/connectors/github.rs`) is the reference -implementation. The sync driver, idempotent upsert, cursor checkpointing, and -tombstone reconciliation are all reused for free. +`SourceEnvelope`, and hand them to `run_sync`. Two reference connectors show the +shape — `crates/vestige-core/src/connectors/github.rs` (Link-header pagination, +opaque-url cursor) and `crates/vestige-core/src/connectors/redmine.rs` +(offset pagination, two-phase list-then-detail fetch). The sync driver, +idempotent upsert, cursor checkpointing, and tombstone reconciliation are all +reused for free. + +## Not yet supported + +- **Assignee filter** — the envelope stores `source_author` (reporter) only; no + assignee column yet. +- **Tracker / version dedicated filter params** — reachable today via + `tag_prefix` (`tracker:`, and `version:`/`category:` when emitted). +- **Linked-issue graph traversal** — connectors import relations into the memory + body, but issue-to-issue graph edges are not yet exposed in search.