diff --git a/CHANGELOG.md b/CHANGELOG.md index 5edf533..2918af4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,63 @@ All notable changes to Vestige will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [Unreleased] — "External-Source Connectors" + +> Bump `version` in the workspace `Cargo.toml`, both crates, `server.json`, and +> `package.json` to `2.1.27` at release/tag time, and date this heading. + +Roadmap [#57](https://github.com/samvallad33/vestige/issues/57), **Phase 1–3**: +Vestige can now act as a durable, local, semantically-searchable retrieval layer +over an external system of record — starting with GitHub Issues — without +replacing it. The external system stays canonical; Vestige **indexes, connects, +retrieves, and cites back** to the source record. + +Unlike a live ticket-system MCP proxy (which holds no state and is rate-limited +per query), Vestige keeps a durable embedded index: searchable **offline**, +**semantically**, joinable with the rest of your memory, temporally versioned, +and re-syncable **idempotently** with no duplication. To our knowledge no other +local-first memory layer combines native connectors, external-URL provenance, +content-hash idempotent sync, and tombstoning of vanished records. + +### Added + +- **`source_sync` MCP tool** — point Vestige at a GitHub repo + (`{"repo": "owner/name"}`) and it indexes every issue + its comments as + source-aware memories. Re-running updates changed issues in place (no + duplicates); `reconcile: true` tombstones issues no longer visible upstream. + Auth via the `GITHUB_TOKEN` (or `VESTIGE_GITHUB_TOKEN`) environment variable; + public repos work without a token at a lower rate limit. +- **Source envelope** on every memory — structured, machine-readable provenance + (`source_system`, `source_id`, `source_url`, `source_updated_at`, + `content_hash`, `synced_at`, `source_project`, `source_type`, `source_author`) + distinct from the legacy free-form `source` label. Search results gain a + `sourceRecord` object (with the canonical `url`) **only** for + connector-ingested memories, so an agent can cite and follow the source. +- **Idempotent sync primitives** (`vestige-core`): `upsert_by_source` (keyed on + `(source_system, source_id)`, content-hash change detection), per-connector + cursor checkpoints (`connector_cursors`), and `reconcile_source_tombstones` + (invalidate-don't-delete via the bitemporal `valid_until`, so a vanished + record is retained for audit but drops out of current retrieval). +- **Connector contract** (`vestige_core::connectors`) — a small source-agnostic + `Connector` trait + `run_sync` driver (cursor overlap window, incremental + paging, optional deletion reconcile) and a GitHub Issues reference connector + behind the optional `connectors` cargo feature (on by default in the MCP + server, off in the core library's default features so non-connector consumers + link no HTTP client). + +### Database + +- **Migration V17** — nine nullable source-envelope columns on `knowledge_nodes` + (additive; every existing memory is untouched), a partial UNIQUE index on + `(source_system, source_id)` enforcing one memory per external record while + costing nothing for envelope-less legacy rows, and the `connector_cursors` + checkpoint table. Idempotent on replay, following the established + `add_column_if_missing` pattern. + +### Notes + +- Local-first and optional: with no `source_sync` call, behavior is unchanged. + The default core-library build does not link an HTTP client. ## [2.1.26] - 2026-06-15 — "Configurable Output" diff --git a/Cargo.lock b/Cargo.lock index 20e3853..2a88b74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -521,6 +521,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.44" @@ -1681,8 +1687,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1692,9 +1700,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi 5.3.0", "wasip2", + "wasm-bindgen", ] [[package]] @@ -1932,6 +1942,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", + "webpki-roots 1.0.6", ] [[package]] @@ -2519,6 +2530,12 @@ dependencies = [ "hashbrown 0.16.1", ] +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "lzma-rust2" version = "0.15.7" @@ -3335,6 +3352,61 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror 2.0.18", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +dependencies = [ + "bytes", + "getrandom 0.3.4", + "lru-slab", + "rand", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.18", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.45" @@ -3571,6 +3643,8 @@ dependencies = [ "native-tls", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", "rustls-pki-types", "serde", "serde_json", @@ -3578,6 +3652,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", + "tokio-rustls", "tokio-util", "tower", "tower-http", @@ -3587,6 +3662,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots 1.0.6", ] [[package]] @@ -3636,6 +3712,12 @@ dependencies = [ "sqlite-wasm-rs", ] +[[package]] +name = "rustc-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" + [[package]] name = "rustix" version = "1.1.4" @@ -3670,6 +3752,7 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" dependencies = [ + "web-time", "zeroize", ] @@ -4162,6 +4245,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokenizers" version = "0.22.2" @@ -4684,6 +4782,7 @@ dependencies = [ "git2", "lru", "notify", + "reqwest", "rusqlite", "serde", "serde_json", diff --git a/crates/vestige-core/Cargo.toml b/crates/vestige-core/Cargo.toml index 0b8cb25..96a71b3 100644 --- a/crates/vestige-core/Cargo.toml +++ b/crates/vestige-core/Cargo.toml @@ -57,6 +57,12 @@ qwen3-embeddings = ["embeddings", "fastembed/qwen3", "dep:candle-core"] # Backwards-compatible feature alias from the original v2.1.0 naming. qwen3-reranker = ["qwen3-embeddings"] +# External-source connectors (#57). The connector *contract*, normalization, +# and content-hashing are always compiled (pure, no network). This feature adds +# the network-backed reference connectors (GitHub Issues, …) via `reqwest`, so +# the default local-first build never links an HTTP client. +connectors = ["dep:reqwest"] + # Metal GPU acceleration on Apple Silicon (significantly faster inference) metal = ["fastembed/metal"] @@ -132,6 +138,14 @@ lru = "0.16" trait-variant = "0.1" blake3 = "1" +# ============================================================================ +# OPTIONAL: External-source connectors (#57) +# ============================================================================ +# HTTP client for network-backed reference connectors (GitHub Issues, Redmine). +# rustls so connectors build with no system OpenSSL dependency. Behind the +# `connectors` feature — the default local-first build does not link reqwest. +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"], optional = true } + [dev-dependencies] tempfile = "3" criterion = { version = "0.5", features = ["html_reports"] } diff --git a/crates/vestige-core/src/connectors/github.rs b/crates/vestige-core/src/connectors/github.rs new file mode 100644 index 0000000..a373dcf --- /dev/null +++ b/crates/vestige-core/src/connectors/github.rs @@ -0,0 +1,570 @@ +//! GitHub Issues connector (#57). +//! +//! Indexes a repository's issues + comments into source-aware Vestige memories +//! so an agent can search and reason over the full issue history **offline**, +//! **semantically**, and **cited back to the canonical issue URL**. Unlike the +//! official GitHub MCP server — a stateless live API proxy — this builds a +//! durable, embedded, temporally-versioned local index. +//! +//! ## Incremental sync (per the connector sync contract) +//! +//! - `state=all` so closing an issue is not mistaken for a deletion. +//! - `sort=updated&direction=asc` so we page forward in cursor order and a +//! mid-run interruption resumes safely. +//! - `since=` filters on `updated_at`; the overlap + the +//! `content_hash` no-op makes re-scans safe and cheap. +//! - `Link: rel="next"` drives pagination (never hand-built page urls). +//! - Entries carrying a `pull_request` key are dropped (PRs are not issues). +//! - Per issue we fold the body + comments into one memory; the hash covers +//! the stable fields only (title, body, state, labels, comments) — never the +//! cursor timestamp or volatile counts. +//! +//! GitHub has no deletion feed, so deletions are reconciled out-of-band via +//! [`list_live_ids`](Connector::list_live_ids). + +use chrono::{DateTime, Utc}; +use serde::Deserialize; + +use super::{ + Connector, ConnectorError, ConnectorResult, FetchPage, NormalizedRecord, content_hash, +}; +use crate::memory::SourceEnvelope; + +const API_ROOT: &str = "https://api.github.com"; +const USER_AGENT: &str = concat!("vestige-connector/", env!("CARGO_PKG_VERSION")); +const PER_PAGE: u32 = 100; + +/// Configuration for a GitHub Issues connector instance. +#[derive(Debug, Clone)] +pub struct GithubConfig { + /// Repository owner (user or org). + pub owner: String, + /// Repository name. + pub repo: String, + /// Personal access token. Optional for public repos (60 req/hr + /// unauthenticated) but strongly recommended (5000 req/hr authenticated). + pub token: Option, + /// Override the API root (for GitHub Enterprise or tests). + pub api_root: Option, + /// Max comments to fold into one issue memory (defense against huge threads). + pub max_comments: usize, +} + +impl GithubConfig { + pub fn new(owner: impl Into, repo: impl Into) -> Self { + Self { + owner: owner.into(), + repo: repo.into(), + token: None, + api_root: None, + max_comments: 50, + } + } + + pub fn with_token(mut self, token: Option) -> Self { + self.token = token; + self + } + + fn scope(&self) -> String { + format!("{}/{}", self.owner, self.repo) + } + + fn root(&self) -> &str { + self.api_root.as_deref().unwrap_or(API_ROOT) + } +} + +/// A GitHub Issues connector bound to one repository. +pub struct GithubConnector { + config: GithubConfig, + scope: String, + client: reqwest::Client, +} + +impl GithubConnector { + pub fn new(config: GithubConfig) -> ConnectorResult { + if config.owner.is_empty() || config.repo.is_empty() { + return Err(ConnectorError::Config( + "owner and repo are required".to_string(), + )); + } + let client = reqwest::Client::builder() + .user_agent(USER_AGENT) + .build() + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + let scope = config.scope(); + Ok(Self { + config, + scope, + client, + }) + } + + fn auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder { + let req = req + .header("Accept", "application/vnd.github+json") + .header("X-GitHub-Api-Version", "2022-11-28"); + match &self.config.token { + Some(t) => req.bearer_auth(t), + None => req, + } + } + + /// Map an HTTP response status into a connector error, honoring rate-limit + /// signals so the driver can back off politely. + fn classify_status(resp: &reqwest::Response) -> Option { + let status = resp.status(); + if status.is_success() { + return None; + } + // Primary rate limit: 403/429 with remaining=0. + if status.as_u16() == 403 || status.as_u16() == 429 { + let remaining = resp + .headers() + .get("x-ratelimit-remaining") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()); + if remaining == Some(0) || status.as_u16() == 429 { + let retry = resp + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .map(std::time::Duration::from_secs); + return Some(ConnectorError::RateLimited(retry)); + } + } + Some(ConnectorError::Source { + status: status.as_u16(), + message: status + .canonical_reason() + .unwrap_or("request failed") + .to_string(), + }) + } + + /// Parse the `Link` header for the `rel="next"` url, if any. + /// + /// The `next` url comes from the server response, so we pin it to the + /// configured API host before following it: otherwise a malicious or + /// compromised endpoint could redirect the connector — which attaches the + /// bearer token to every request — to an attacker-controlled URL and + /// exfiltrate the credential (SSRF / token leak). `expected_host` is the + /// host of the connector's API root. + fn next_link(resp: &reqwest::Response, expected_host: Option<&str>) -> Option { + let link = resp.headers().get(reqwest::header::LINK)?.to_str().ok()?; + for part in link.split(',') { + let part = part.trim(); + if part.contains("rel=\"next\"") + && let (Some(start), Some(end)) = (part.find('<'), part.find('>')) + && start < end + { + let url = &part[start + 1..end]; + // Host-pin: only follow a next-url on the same host as the API + // root we were configured with. + if let Some(expected) = expected_host { + match reqwest::Url::parse(url) { + Ok(parsed) if parsed.host_str() == Some(expected) => { + return Some(url.to_string()); + } + _ => { + tracing::warn!( + next_url = url, + "dropping cross-host Link next url (host pin)" + ); + return None; + } + } + } + return Some(url.to_string()); + } + } + None + } + + /// Host of the configured API root, used to pin Link `next` urls. + fn api_host(&self) -> Option { + reqwest::Url::parse(self.config.root()) + .ok() + .and_then(|u| u.host_str().map(|h| h.to_string())) + } + + /// Fetch the comments for one issue (a single page; capped by `max_comments`). + async fn fetch_comments(&self, issue_number: u64) -> ConnectorResult> { + let url = format!( + "{}/repos/{}/{}/issues/{}/comments?per_page={}", + self.config.root(), + self.config.owner, + self.config.repo, + issue_number, + self.config.max_comments.min(100), + ); + let resp = self + .auth(self.client.get(&url)) + .send() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + if let Some(err) = Self::classify_status(&resp) { + return Err(err); + } + resp.json::>() + .await + .map_err(|e| ConnectorError::Transport(e.to_string())) + } + + /// Fold a raw issue + its comments into one normalized memory record. + fn normalize(&self, issue: &RawIssue, comments: &[RawComment]) -> NormalizedRecord { + let author = issue.user.as_ref().map(|u| u.login.clone()); + + // Human-readable content: header + body + chronological comments. + let mut content = format!( + "[{}#{}] {}\nState: {}\n", + self.scope, issue.number, issue.title, issue.state + ); + if let Some(body) = &issue.body + && !body.trim().is_empty() + { + content.push('\n'); + content.push_str(body.trim()); + content.push('\n'); + } + let mut sorted_comments: Vec<&RawComment> = comments.iter().collect(); + sorted_comments.sort_by_key(|c| c.id); + for c in &sorted_comments { + let who = c.user.as_ref().map(|u| u.login.as_str()).unwrap_or("?"); + content.push_str(&format!("\n— {who}: {}", c.body.trim())); + } + + // Labels, sorted for a stable hash. + let mut labels: Vec = issue.labels.iter().map(|l| l.name.clone()).collect(); + labels.sort(); + + // Stable content hash — meaning only, never the cursor timestamp or + // volatile counts. Comments contribute their id+body in id order. + let comments_blob = sorted_comments + .iter() + .map(|c| format!("{}:{}", c.id, c.body.trim())) + .collect::>() + .join("\u{1f}"); + let labels_blob = labels.join(","); + let number_str = issue.number.to_string(); + let body_str = issue.body.clone().unwrap_or_default(); + let hash = content_hash(&[ + ("number", &number_str), + ("title", &issue.title), + ("state", &issue.state), + ("body", &body_str), + ("labels", &labels_blob), + ("comments", &comments_blob), + ]); + + let mut tags = vec![ + "github".to_string(), + "issue".to_string(), + format!("state:{}", issue.state), + ]; + tags.extend(labels.into_iter().map(|l| format!("label:{l}"))); + + let envelope = SourceEnvelope { + source_system: Some("github".to_string()), + source_id: Some(issue.number.to_string()), + source_url: Some(issue.html_url.clone()), + source_updated_at: DateTime::parse_from_rfc3339(&issue.updated_at) + .ok() + .map(|d| d.with_timezone(&Utc)), + content_hash: Some(hash), + synced_at: Some(Utc::now()), + source_project: Some(self.scope.clone()), + source_type: Some("issue".to_string()), + source_author: author, + }; + + NormalizedRecord { + content, + tags, + envelope, + } + } +} + +impl Connector for GithubConnector { + fn source_system(&self) -> &str { + "github" + } + + fn scope(&self) -> &str { + &self.scope + } + + async fn fetch_updated( + &self, + since: Option>, + cursor: Option, + ) -> ConnectorResult { + // `cursor` is a full next-page url from a previous Link header; on the + // first page we build the url from owner/repo + since. + let url = match cursor { + Some(u) => u, + None => { + let mut u = format!( + "{}/repos/{}/{}/issues?state=all&sort=updated&direction=asc&per_page={}", + self.config.root(), + self.config.owner, + self.config.repo, + PER_PAGE, + ); + if let Some(s) = since { + // GitHub documents the `since` format as YYYY-MM-DDTHH:MM:SSZ. + // `to_rfc3339()` emits the `+00:00` offset form, and the `+` + // is a reserved query char that the server decodes as a + // space — corrupting the timestamp and silently re-fetching + // all history every run. Emit the `Z` form (no reserved + // char, exact documented format) instead. + let since_z = s.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + u.push_str(&format!("&since={since_z}")); + } + u + } + }; + + let resp = self + .auth(self.client.get(&url)) + .send() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + if let Some(err) = Self::classify_status(&resp) { + return Err(err); + } + let next_cursor = Self::next_link(&resp, self.api_host().as_deref()); + let issues: Vec = resp + .json() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + + let mut records = Vec::new(); + for issue in &issues { + // Drop pull requests — "every PR is an issue, but not vice versa". + if issue.pull_request.is_some() { + continue; + } + // Fetch comments only when the issue has any. + let comments = if issue.comments > 0 { + self.fetch_comments(issue.number).await.unwrap_or_default() + } else { + Vec::new() + }; + records.push(self.normalize(issue, &comments)); + } + + Ok(FetchPage { + records, + next_cursor, + }) + } + + async fn list_live_ids(&self) -> ConnectorResult>> { + // Enumerate all issue numbers (ids only) for the reconcile pass, paging + // via Link. Cheap relative to full sync (no comment fetch, no bodies). + let mut ids = Vec::new(); + let mut url = Some(format!( + "{}/repos/{}/{}/issues?state=all&per_page={}", + self.config.root(), + self.config.owner, + self.config.repo, + PER_PAGE, + )); + while let Some(u) = url { + let resp = self + .auth(self.client.get(&u)) + .send() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + if let Some(err) = Self::classify_status(&resp) { + return Err(err); + } + let next = Self::next_link(&resp, self.api_host().as_deref()); + let issues: Vec = resp + .json() + .await + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + for issue in issues { + if issue.pull_request.is_none() { + ids.push(issue.number.to_string()); + } + } + url = next; + } + Ok(Some(ids)) + } +} + +// --------------------------------------------------------------------------- +// Raw GitHub API shapes (only the fields we use) +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +struct RawIssue { + number: u64, + title: String, + #[serde(default)] + body: Option, + state: String, + html_url: String, + updated_at: String, + #[serde(default)] + comments: u64, + #[serde(default)] + labels: Vec, + #[serde(default)] + user: Option, + /// Present iff this "issue" is actually a pull request. + #[serde(default)] + pull_request: Option, +} + +#[derive(Debug, Deserialize)] +struct RawLabel { + name: String, +} + +#[derive(Debug, Deserialize)] +struct RawUser { + login: String, +} + +#[derive(Debug, Deserialize)] +struct RawComment { + id: u64, + body: String, + #[serde(default)] + user: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + fn issue(number: u64, title: &str, body: &str, state: &str) -> RawIssue { + RawIssue { + number, + title: title.to_string(), + body: Some(body.to_string()), + state: state.to_string(), + html_url: format!("https://github.com/o/r/issues/{number}"), + updated_at: "2026-06-19T00:00:00Z".to_string(), + comments: 0, + labels: vec![RawLabel { + name: "bug".to_string(), + }], + user: Some(RawUser { + login: "octocat".to_string(), + }), + pull_request: None, + } + } + + fn connector() -> GithubConnector { + GithubConnector::new(GithubConfig::new("o", "r")).unwrap() + } + + #[test] + fn normalize_builds_keyed_envelope_with_citation() { + let c = connector(); + let rec = c.normalize(&issue(57, "Connectors", "Add Redmine", "open"), &[]); + let env = &rec.envelope; + assert!(env.has_key()); + assert_eq!(env.source_system.as_deref(), Some("github")); + assert_eq!(env.source_id.as_deref(), Some("57")); + assert_eq!( + env.source_url.as_deref(), + Some("https://github.com/o/r/issues/57") + ); + assert_eq!(env.source_project.as_deref(), Some("o/r")); + assert!(rec.content.contains("Connectors")); + assert!(rec.tags.contains(&"state:open".to_string())); + assert!(rec.tags.contains(&"label:bug".to_string())); + } + + #[test] + fn hash_stable_across_label_order_and_changes_on_edit() { + let c = connector(); + let mut a = issue(1, "T", "body", "open"); + a.labels = vec![ + RawLabel { name: "b".into() }, + RawLabel { name: "a".into() }, + ]; + let mut b = issue(1, "T", "body", "open"); + b.labels = vec![ + RawLabel { name: "a".into() }, + RawLabel { name: "b".into() }, + ]; + let ha = c.normalize(&a, &[]).envelope.content_hash; + let hb = c.normalize(&b, &[]).envelope.content_hash; + assert_eq!(ha, hb, "label order must not change the hash"); + + // Editing the body must change the hash. + let edited = c.normalize(&issue(1, "T", "EDITED", "open"), &[]).envelope.content_hash; + assert_ne!(ha, edited); + + // Closing the issue changes state → changes the hash (not a no-op). + let closed = c.normalize(&issue(1, "T", "body", "closed"), &[]).envelope.content_hash; + assert_ne!(ha, closed); + } + + #[test] + fn comments_fold_in_id_order_and_affect_hash() { + let c = connector(); + let comments = vec![ + RawComment { + id: 2, + body: "second".into(), + user: Some(RawUser { login: "x".into() }), + }, + RawComment { + id: 1, + body: "first".into(), + user: Some(RawUser { login: "y".into() }), + }, + ]; + let rec = c.normalize(&issue(1, "T", "body", "open"), &comments); + // Folded in id order regardless of input order. + let first_pos = rec.content.find("first").unwrap(); + let second_pos = rec.content.find("second").unwrap(); + assert!(first_pos < second_pos, "comments must fold in id order"); + + let no_comments = c.normalize(&issue(1, "T", "body", "open"), &[]).envelope.content_hash; + assert_ne!( + rec.envelope.content_hash, no_comments, + "comments must contribute to the hash" + ); + } + + #[test] + fn rejects_empty_owner_repo() { + assert!(GithubConnector::new(GithubConfig::new("", "r")).is_err()); + assert!(GithubConnector::new(GithubConfig::new("o", "")).is_err()); + } + + #[test] + fn since_uses_z_form_not_plus_offset() { + // Regression: to_rfc3339() emits `+00:00`; the `+` decodes to a space + // server-side and corrupts the cursor. We must emit the `Z` form. + let ts = DateTime::parse_from_rfc3339("2026-06-19T00:00:00Z") + .unwrap() + .with_timezone(&Utc); + let z = ts.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + assert_eq!(z, "2026-06-19T00:00:00Z"); + assert!(!z.contains('+'), "since must not contain a reserved '+'"); + } + + #[test] + fn next_link_host_pin_drops_cross_host_url() { + // The host-pin parsing logic (used to prevent token exfiltration via a + // malicious Link header) must reject a different host. + let same = reqwest::Url::parse("https://api.github.com/x?page=2").unwrap(); + let other = reqwest::Url::parse("https://evil.example/x?page=2").unwrap(); + assert_eq!(same.host_str(), Some("api.github.com")); + assert_ne!(other.host_str(), Some("api.github.com")); + } +} diff --git a/crates/vestige-core/src/connectors/mod.rs b/crates/vestige-core/src/connectors/mod.rs new file mode 100644 index 0000000..033e065 --- /dev/null +++ b/crates/vestige-core/src/connectors/mod.rs @@ -0,0 +1,372 @@ +//! External-source connectors (#57). +//! +//! A connector turns records in a long-lived external system (a ticket tracker, +//! an issue board, a support queue) into source-aware Vestige memories, so an +//! investigative agent can search and reason over years of history **offline**, +//! **semantically**, and **cited back to the canonical record** — something no +//! live ticket-system MCP proxy can do. +//! +//! ## Layering +//! +//! - The [`Connector`] contract, [`NormalizedRecord`] shape, and the stable +//! [`content_hash`] are pure (no network) and always compiled, so the sync +//! semantics are unit-testable without hitting an API. +//! - Network-backed reference connectors (e.g. [`github`]) live behind the +//! `connectors` cargo feature so the default local-first build links no HTTP +//! client. +//! +//! ## Sync contract (the part that makes re-running safe) +//! +//! Every connector produces [`NormalizedRecord`]s. Each carries a +//! [`SourceEnvelope`](crate::memory::SourceEnvelope) whose +//! `(source_system, source_id)` is the idempotency key and whose `content_hash` +//! is the change detector. The driver routes each record through +//! [`upsert_by_source`](crate::storage::SqliteMemoryStore::upsert_by_source): +//! +//! - unseen record → insert +//! - changed `content_hash` → update in place (+ re-embed) +//! - same `content_hash` → no-op (only liveness advances) +//! +//! Because neither GitHub nor Redmine expose a deletion feed, deletions are +//! handled out-of-band by a periodic reconcile pass +//! ([`reconcile_source_tombstones`](crate::storage::SqliteMemoryStore::reconcile_source_tombstones)). + +use chrono::{DateTime, Utc}; + +use crate::memory::{IngestInput, SourceEnvelope}; +use crate::storage::ConnectorCursor; + +#[cfg(feature = "connectors")] +pub mod github; + +/// A single external record, already normalized into the fields Vestige needs. +/// +/// The connector is responsible for flattening a possibly-rich source record +/// (an issue plus its comments / journals / status changes) into a single +/// retrievable `content` blob plus the structured envelope. Keeping one memory +/// per logical record (rather than per comment) keeps retrieval coherent and +/// the idempotency key simple. +#[derive(Debug, Clone)] +pub struct NormalizedRecord { + /// Human-readable content to embed and search over. + pub content: String, + /// Tags for categorization (e.g. `["github", "issue", "state:open"]`). + pub tags: Vec, + /// The provenance envelope. `source_system`, `source_id`, and `content_hash` + /// MUST be set for idempotent upsert. + pub envelope: SourceEnvelope, +} + +impl NormalizedRecord { + /// Convert into an [`IngestInput`] ready for `upsert_by_source`. + pub fn into_ingest_input(self) -> IngestInput { + IngestInput { + content: self.content, + node_type: "event".to_string(), + source: self.envelope.source_url.clone(), + tags: self.tags, + source_envelope: Some(self.envelope), + ..Default::default() + } + } +} + +/// One page of records plus the cursor needed to fetch the next page. +#[derive(Debug, Clone, Default)] +pub struct FetchPage { + pub records: Vec, + /// Opaque token to resume after this page, or `None` when exhausted. + pub next_cursor: Option, +} + +/// Errors a connector can surface. +#[derive(Debug, thiserror::Error)] +pub enum ConnectorError { + #[error("configuration error: {0}")] + Config(String), + #[error("transport error: {0}")] + Transport(String), + #[error("rate limited; retry after {0:?}")] + RateLimited(Option), + #[error("source error ({status}): {message}")] + Source { status: u16, message: String }, +} + +pub type ConnectorResult = Result; + +/// The contract every external-source connector implements. +/// +/// Intentionally minimal: fetch a window of records updated since a cursor, +/// page through them, and (separately) enumerate currently-live ids for the +/// deletion-reconcile pass. The driver owns persistence, embedding, and cursor +/// checkpointing — a connector is just a typed, incremental reader. +#[allow(async_fn_in_trait)] +pub trait Connector { + /// Stable system identifier written into every envelope (`github`, …). + fn source_system(&self) -> &str; + + /// The scope this connector instance is bound to (`owner/repo`, project id). + fn scope(&self) -> &str; + + /// Fetch one page of records whose source-updated time is `>= since` + /// (inclusive on purpose — see the overlap note below), resuming from + /// `cursor` when provided. Records should be returned in ascending + /// update-time order so a mid-run interruption resumes safely. + /// + /// Callers pass `since = checkpoint − overlap` (a few minutes) so a record + /// written with a slightly-behind upstream clock, or one sharing the exact + /// boundary second, is never skipped. The `content_hash` short-circuit in + /// `upsert_by_source` makes the resulting re-scan free. + async fn fetch_updated( + &self, + since: Option>, + cursor: Option, + ) -> ConnectorResult; + + /// Enumerate the ids currently visible upstream for this scope, for the + /// deletion-reconcile pass. Cheap (ids only). `None` means the connector + /// cannot enumerate, so the driver must skip reconciliation rather than + /// tombstone everything. + async fn list_live_ids(&self) -> ConnectorResult>> { + Ok(None) + } +} + +/// Recommended overlap subtracted from the saved cursor before the next fetch, +/// to absorb clock skew and same-second boundary updates (the `>=` window). +pub const CURSOR_OVERLAP_SECS: i64 = 120; + +/// Summary of one sync run, returned to the caller / surfaced by the MCP tool. +#[derive(Debug, Clone, Default, serde::Serialize)] +pub struct SyncReport { + pub source_system: String, + pub scope: String, + pub created: usize, + pub updated: usize, + pub unchanged: usize, + pub tombstoned: usize, + /// New high-water mark persisted as the cursor for the next run. + pub new_cursor: Option>, + /// Whether a deletion-reconcile pass ran this time. + pub reconciled: bool, + /// Non-fatal warnings (e.g. a page that failed and was skipped). + pub warnings: Vec, +} + +/// Drive a full incremental sync of one connector into the store (#57). +/// +/// This is the orchestration the MCP `source_sync` tool calls. It: +/// 1. loads the saved checkpoint and starts from `cursor − overlap` (the `>=` +/// window that prevents missing same-second / clock-skewed updates); +/// 2. pages the connector forward in update order, routing each record through +/// [`upsert_by_source`](crate::storage::SqliteMemoryStore::upsert_by_source) +/// (insert / update-in-place / no-op by content hash); +/// 3. advances the cursor to the max `source_updated_at` actually observed, +/// persisting it only after the run so a crash re-scans rather than skips; +/// 4. optionally reconciles deletions when `reconcile` is set and the connector +/// can enumerate live ids. +/// +/// `max_pages` bounds a single run (so a first sync of a 15-year tracker can be +/// resumed across calls rather than blocking on one enormous fetch). +pub async fn run_sync( + store: &crate::storage::SqliteMemoryStore, + connector: &C, + reconcile: bool, + max_pages: usize, +) -> ConnectorResult { + use crate::storage::SourceUpsertOutcome; + + let source_system = connector.source_system().to_string(); + let scope = connector.scope().to_string(); + + let mut report = SyncReport { + source_system: source_system.clone(), + scope: scope.clone(), + ..Default::default() + }; + + // 1. Load checkpoint, apply the overlap window. + let checkpoint = store + .get_connector_cursor(&source_system, &scope) + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + let since = checkpoint + .cursor_updated_at + .map(|c| c - chrono::Duration::seconds(CURSOR_OVERLAP_SECS)); + + // 2. Page forward, upserting each record. + let mut cursor: Option = None; + let mut max_seen = checkpoint.cursor_updated_at; + // Oldest source_updated_at among records that FAILED to upsert this run. We + // must not advance the persisted cursor past this, or the failed record — + // fetched in ascending update order — would fall outside the next run's + // `since` window and never be retried (a silent permanent gap). + let mut oldest_failure: Option> = None; + // Count of genuinely new records (Created). Unchanged re-scans of the + // overlap window must not inflate the running total. + let mut created_this_run = 0i64; + + for _ in 0..max_pages.max(1) { + let page = connector.fetch_updated(since, cursor.clone()).await?; + for record in page.records { + let observed = record.envelope.source_updated_at; + match store.upsert_by_source(record.into_ingest_input()) { + Ok(res) => { + match res.outcome { + SourceUpsertOutcome::Created => { + report.created += 1; + created_this_run += 1; + } + SourceUpsertOutcome::Updated => report.updated += 1, + SourceUpsertOutcome::Unchanged => report.unchanged += 1, + } + if let Some(ts) = observed + && max_seen.map(|m| ts > m).unwrap_or(true) + { + max_seen = Some(ts); + } + } + Err(e) => { + report.warnings.push(format!("upsert failed: {e}")); + if let Some(ts) = observed + && oldest_failure.map(|f| ts < f).unwrap_or(true) + { + oldest_failure = Some(ts); + } + } + } + } + match page.next_cursor { + Some(next) => cursor = Some(next), + None => break, + } + } + + // Clamp the cursor so we never advance past a record that failed this run. + // Subtract one second so the next run's inclusive `since` re-includes it. + if let Some(failed_at) = oldest_failure { + let clamp_to = failed_at - chrono::Duration::seconds(1); + max_seen = Some(match max_seen { + Some(m) if m < clamp_to => m, + _ => clamp_to, + }); + } + + // 3. Optional deletion reconciliation. + let mut reconciled = false; + if reconcile { + match connector.list_live_ids().await { + Ok(Some(live_ids)) => { + match store.reconcile_source_tombstones(&source_system, &scope, &live_ids) { + Ok(r) => { + report.tombstoned = r.tombstoned.len(); + reconciled = true; + } + Err(e) => report.warnings.push(format!("reconcile failed: {e}")), + } + } + Ok(None) => report + .warnings + .push("connector cannot enumerate live ids; skipped reconcile".to_string()), + Err(e) => report.warnings.push(format!("list_live_ids failed: {e}")), + } + } + report.reconciled = reconciled; + report.new_cursor = max_seen; + + // 4. Persist the checkpoint (only after the run). + let now = Utc::now(); + let new_checkpoint = ConnectorCursor { + source_system: source_system.clone(), + scope: scope.clone(), + cursor_updated_at: max_seen, + last_synced_at: Some(now), + last_full_reconcile_at: if reconciled { + Some(now) + } else { + checkpoint.last_full_reconcile_at + }, + // Accumulate only NEW records, so re-scanning the overlap window (which + // reports Unchanged) does not inflate the running total. + records_seen: checkpoint.records_seen + created_this_run, + }; + store + .save_connector_cursor(&new_checkpoint) + .map_err(|e| ConnectorError::Transport(e.to_string()))?; + + Ok(report) +} + +/// Compute a stable content hash over the record's meaning. +/// +/// Stability requirements (so re-syncing an unchanged record is a true no-op): +/// - **key order independent** — callers pass `(field, value)` pairs which we +/// sort before hashing, so map/field ordering never changes the digest; +/// - **volatile fields excluded** — the caller must omit the cursor timestamp, +/// view/comment counts, and ephemeral permission flags (hash the meaning, +/// not the metadata); +/// - **collision-resistant** — BLAKE3 (already a Vestige dependency). +/// +/// Comment/journal arrays should be flattened into the pairs in a stable order +/// (sorted by their own id) by the caller before hashing. +pub fn content_hash(fields: &[(&str, &str)]) -> String { + let mut pairs: Vec<(&str, &str)> = fields.to_vec(); + pairs.sort_by(|a, b| a.0.cmp(b.0).then(a.1.cmp(b.1))); + + let mut hasher = blake3::Hasher::new(); + for (k, v) in pairs { + // Length-prefix each field so ("ab","c") can't collide with ("a","bc"). + hasher.update(&(k.len() as u64).to_le_bytes()); + hasher.update(k.as_bytes()); + hasher.update(&(v.len() as u64).to_le_bytes()); + hasher.update(v.as_bytes()); + } + hasher.finalize().to_hex().to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn content_hash_is_order_independent() { + let a = content_hash(&[("title", "Crash"), ("body", "stacktrace"), ("state", "open")]); + let b = content_hash(&[("state", "open"), ("title", "Crash"), ("body", "stacktrace")]); + assert_eq!(a, b, "reordering fields must not change the hash"); + } + + #[test] + fn content_hash_changes_with_content() { + let a = content_hash(&[("body", "v1")]); + let b = content_hash(&[("body", "v2")]); + assert_ne!(a, b, "different content must hash differently"); + } + + #[test] + fn content_hash_no_boundary_collision() { + // ("ab","c") vs ("a","bc") must differ thanks to length prefixing. + let a = content_hash(&[("ab", "c")]); + let b = content_hash(&[("a", "bc")]); + assert_ne!(a, b); + } + + #[test] + fn normalized_record_carries_envelope_into_input() { + let rec = NormalizedRecord { + content: "issue body".to_string(), + tags: vec!["github".to_string()], + envelope: SourceEnvelope { + source_system: Some("github".to_string()), + source_id: Some("42".to_string()), + source_url: Some("https://example/42".to_string()), + content_hash: Some("h".to_string()), + ..Default::default() + }, + }; + let input = rec.into_ingest_input(); + assert_eq!(input.content, "issue body"); + assert_eq!(input.source.as_deref(), Some("https://example/42")); + let env = input.source_envelope.unwrap(); + assert!(env.has_key()); + assert_eq!(env.source_id.as_deref(), Some("42")); + } +} diff --git a/crates/vestige-core/src/consolidation/phases.rs b/crates/vestige-core/src/consolidation/phases.rs index abf6019..7f74489 100644 --- a/crates/vestige-core/src/consolidation/phases.rs +++ b/crates/vestige-core/src/consolidation/phases.rs @@ -889,6 +889,7 @@ mod tests { embedding_model: None, suppression_count: 0, suppressed_at: None, + source_envelope: None, } } diff --git a/crates/vestige-core/src/lib.rs b/crates/vestige-core/src/lib.rs index 15dbdbf..e832828 100644 --- a/crates/vestige-core/src/lib.rs +++ b/crates/vestige-core/src/lib.rs @@ -82,6 +82,7 @@ /// Optional `vestige.toml` configuration (Phase 2: Configurable Output). pub mod config; +pub mod connectors; pub mod consolidation; pub mod embedder; pub mod fsrs; @@ -134,6 +135,7 @@ pub use memory::{ SearchMode, SearchResult, SimilarityResult, + SourceEnvelope, TableIntrospection, TemporalRange, }; @@ -166,6 +168,7 @@ pub use storage::{ CompositionNeighborRecord, CompositionOutcomeRecord, ConnectionRecord, + ConnectorCursor, ConsolidationHistoryRecord, Domain, DreamHistoryRecord, @@ -184,10 +187,13 @@ pub use storage::{ PortableArchive, PortableImportMode, PortableImportReport, + ReconcileReport, Result, SchedulingState, SearchQuery, SmartIngestResult, + SourceUpsertOutcome, + SourceUpsertResult, SqliteMemoryStore, StateTransitionRecord, Storage, diff --git a/crates/vestige-core/src/memory/mod.rs b/crates/vestige-core/src/memory/mod.rs index 8cd618e..af9daeb 100644 --- a/crates/vestige-core/src/memory/mod.rs +++ b/crates/vestige-core/src/memory/mod.rs @@ -10,7 +10,7 @@ mod node; mod strength; mod temporal; -pub use node::{IngestInput, KnowledgeNode, NodeType, RecallInput, SearchMode}; +pub use node::{IngestInput, KnowledgeNode, NodeType, RecallInput, SearchMode, SourceEnvelope}; pub use strength::{DualStrength, StrengthDecay}; pub use temporal::{TemporalRange, TemporalValidity}; diff --git a/crates/vestige-core/src/memory/node.rs b/crates/vestige-core/src/memory/node.rs index 9785387..c400afc 100644 --- a/crates/vestige-core/src/memory/node.rs +++ b/crates/vestige-core/src/memory/node.rs @@ -79,6 +79,91 @@ impl std::fmt::Display for NodeType { } } +// ============================================================================ +// SOURCE ENVELOPE (#57) +// ============================================================================ + +/// Structured provenance for a memory that mirrors a record in an external +/// system of record (a Redmine issue, a GitHub Issue, a Jira ticket, a support +/// thread, …). +/// +/// The product boundary (#57): the external system stays canonical. Vestige +/// **indexes, connects, retrieves, and cites back**; it does not replace the +/// ticket tracker. This envelope carries exactly the fields a connector needs +/// to do that without leaking stale data: +/// +/// - `(source_system, source_id)` is the **idempotency key**. Re-running a sync +/// upserts the same logical record instead of duplicating it. +/// - `content_hash` is the **change detector**. If a re-fetched record hashes +/// to the stored value, the upsert is a no-op (only `synced_at` advances), +/// so an incremental re-scan never churns the index or the embedding model. +/// - `source_url` is the **citation**. Search results link back to the +/// canonical record so the agent can follow it for authoritative detail. +/// - `source_updated_at` is the **cursor field** the connector checkpoints on. +/// +/// Every field is optional at the type level so partial connectors and manual +/// imports can populate only what they have, but a real connector should always +/// set `source_system`, `source_id`, and `content_hash`. +#[non_exhaustive] +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SourceEnvelope { + /// External system this record came from, e.g. `redmine`, `github`, `jira`. + /// Namespaces `source_id` so two systems can share numeric ids. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_system: Option, + /// Stable native id in the source system (Redmine issue id, GitHub issue + /// number/node id, …). Combined with `source_system` it is the upsert key. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_id: Option, + /// Canonical URL back to the record so retrieval can cite the source. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_url: Option, + /// When the source record was last updated upstream (the connector cursor + /// field — Redmine `updated_on`, GitHub `updated_at`). RFC 3339. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_updated_at: Option>, + /// Stable hash of the normalized record content. Idempotency / change key. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub content_hash: Option, + /// When the connector last observed this record live. Drives tombstone + /// reconciliation (a record not seen in a full reconcile pass is gone). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub synced_at: Option>, + /// Project / repo / space the record belongs to (Redmine project, GitHub + /// `owner/repo`). Used for scoped sync and search filters. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_project: Option, + /// Record type within the source (`issue`, `comment`, `journal`, …). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_type: Option, + /// Author / reporter of the record in the source system. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_author: Option, +} + +impl SourceEnvelope { + /// True once the two fields a connector needs for idempotent upsert are + /// present. Manual imports that only set `source_url` are not "keyed". + pub fn has_key(&self) -> bool { + self.source_system.is_some() && self.source_id.is_some() + } + + /// True if every field is unset — used to collapse an all-`None` envelope + /// back to `None` on the node so legacy rows stay clean. + pub fn is_empty(&self) -> bool { + self.source_system.is_none() + && self.source_id.is_none() + && self.source_url.is_none() + && self.source_updated_at.is_none() + && self.content_hash.is_none() + && self.synced_at.is_none() + && self.source_project.is_none() + && self.source_type.is_none() + && self.source_author.is_none() + } +} + // ============================================================================ // KNOWLEDGE NODE // ============================================================================ @@ -188,6 +273,15 @@ pub struct KnowledgeNode { /// Timestamp of the most recent suppression (for 24h labile window). #[serde(skip_serializing_if = "Option::is_none")] pub suppressed_at: Option>, + + // ========== Source Envelope (#57, external-source connectors) ========== + /// Structured provenance for memories ingested from an external system + /// (Redmine, GitHub Issues, Jira, …). `None` for memories created directly + /// by an agent or the user — the legacy free-form `source` string above + /// remains the human-readable label; this envelope is the machine-readable, + /// idempotency- and citation-bearing record. See [`SourceEnvelope`]. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_envelope: Option, } impl Default for KnowledgeNode { @@ -224,6 +318,7 @@ impl Default for KnowledgeNode { embedding_model: None, suppression_count: 0, suppressed_at: None, + source_envelope: None, } } } @@ -291,6 +386,11 @@ pub struct IngestInput { /// When this knowledge stops being valid #[serde(skip_serializing_if = "Option::is_none")] pub valid_until: Option>, + /// Structured provenance for connector-ingested records (#57). When set + /// with a `(source_system, source_id)` key, callers should route through + /// `upsert_by_source` for idempotent sync rather than plain `ingest`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source_envelope: Option, } impl Default for IngestInput { @@ -304,6 +404,7 @@ impl Default for IngestInput { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, } } } diff --git a/crates/vestige-core/src/storage/migrations.rs b/crates/vestige-core/src/storage/migrations.rs index c0c60d2..58e5202 100644 --- a/crates/vestige-core/src/storage/migrations.rs +++ b/crates/vestige-core/src/storage/migrations.rs @@ -84,6 +84,11 @@ pub const MIGRATIONS: &[Migration] = &[ description: "ADR 0001 Phase 1: embedding_model registry, domains/domain_scores columns, domains table", up: MIGRATION_V16_UP, }, + Migration { + version: 17, + description: "#57 Source envelope: provenance columns + connector cursor checkpoints for idempotent external-source sync", + up: MIGRATION_V17_UP, + }, ]; /// A database migration @@ -957,6 +962,73 @@ pub const MIGRATION_V16_ALTER_COLUMNS: &[&str] = &[ "ALTER TABLE knowledge_nodes ADD COLUMN domain_scores TEXT NOT NULL DEFAULT '{}'", ]; +/// V17: #57 Source envelope — structured provenance for connector-ingested +/// records, plus a per-connector cursor checkpoint table. +/// +/// The provenance columns live directly on `knowledge_nodes` (rather than a +/// side table) so search can filter and cite them with no join. They are all +/// nullable and default-NULL, so every existing memory is untouched and the +/// migration is purely additive — legacy rows simply have no envelope. +/// +/// The `(source_system, source_id)` pair is the idempotency key for +/// `upsert_by_source`; the unique index enforces one memory per external +/// record. `content_hash` is the change detector. `connector_cursors` holds the +/// incremental-sync high-water mark and last full-reconcile time per +/// (source_system, scope). +/// +/// The `ALTER TABLE ... ADD COLUMN` statements are split into +/// `MIGRATION_V17_ALTER_COLUMNS` and run individually by the migration runner, +/// because SQLite has no `ADD COLUMN IF NOT EXISTS`; duplicate-column errors are +/// swallowed so replay stays idempotent. +const MIGRATION_V17_UP: &str = r#" +-- Idempotency key: at most one memory per (source_system, source_id). +-- Partial unique index so the millions of envelope-less legacy rows (all NULL) +-- don't collide and don't pay index cost. +CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_source_key + ON knowledge_nodes(source_system, source_id) + WHERE source_system IS NOT NULL AND source_id IS NOT NULL; + +-- Filter/scan support for source-aware search + reconciliation passes. +CREATE INDEX IF NOT EXISTS idx_nodes_source_system + ON knowledge_nodes(source_system) + WHERE source_system IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_nodes_source_project + ON knowledge_nodes(source_project) + WHERE source_project IS NOT NULL; + +-- Per-connector incremental-sync checkpoint. One row per (source_system, scope) +-- e.g. ('github', 'samvallad33/vestige'). `cursor_updated_at` is the +-- high-water mark on the source's update timestamp; `last_full_reconcile_at` +-- gates the (expensive) deletion-reconcile pass. +CREATE TABLE IF NOT EXISTS connector_cursors ( + source_system TEXT NOT NULL, + scope TEXT NOT NULL, + cursor_updated_at TEXT, + last_synced_at TEXT, + last_full_reconcile_at TEXT, + records_seen INTEGER NOT NULL DEFAULT 0, + config TEXT NOT NULL DEFAULT '{}', + PRIMARY KEY (source_system, scope) +); + +UPDATE schema_version SET version = 17, applied_at = datetime('now'); +"#; + +/// The `ALTER TABLE` statements for V17. Run individually + idempotently by the +/// migration runner (SQLite has no `ADD COLUMN IF NOT EXISTS`). +pub const MIGRATION_V17_ALTER_COLUMNS: &[&str] = &[ + "ALTER TABLE knowledge_nodes ADD COLUMN source_system TEXT", + "ALTER TABLE knowledge_nodes ADD COLUMN source_id TEXT", + "ALTER TABLE knowledge_nodes ADD COLUMN source_url TEXT", + "ALTER TABLE knowledge_nodes ADD COLUMN source_updated_at TEXT", + "ALTER TABLE knowledge_nodes ADD COLUMN content_hash TEXT", + "ALTER TABLE knowledge_nodes ADD COLUMN synced_at TEXT", + "ALTER TABLE knowledge_nodes ADD COLUMN source_project TEXT", + "ALTER TABLE knowledge_nodes ADD COLUMN source_type TEXT", + "ALTER TABLE knowledge_nodes ADD COLUMN source_author TEXT", +]; + /// Apply pending migrations pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result { let current_version = get_current_version(conn)?; @@ -994,6 +1066,15 @@ pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result { } } + // V17 (#57) adds the source-envelope columns. Same idempotent + // ALTER handling as V16 — the unique index in the V17 batch + // references these columns, so they must exist before the batch. + if migration.version == 17 { + for stmt in MIGRATION_V17_ALTER_COLUMNS { + add_column_if_missing(conn, stmt)?; + } + } + // Use execute_batch to handle multi-statement SQL including triggers conn.execute_batch(migration.up)?; @@ -1026,11 +1107,11 @@ mod tests { // Pre-requisite: schema_version must be bootstrapped by V1. apply_migrations(&conn).expect("apply_migrations succeeds"); - // 1. schema_version advanced to V16 + // 1. schema_version advanced to the latest migration let version = get_current_version(&conn).expect("read schema_version"); assert_eq!( - version, 16, - "schema_version must be 16 after all migrations" + version, 17, + "schema_version must be 17 after all migrations" ); // 2. knowledge_edges is gone (V11 drops it) @@ -1151,11 +1232,11 @@ mod tests { // Replay V11 onward. V11 uses DROP TABLE IF EXISTS so it is idempotent. // V12/V13 tombstone tables use CREATE TABLE IF NOT EXISTS. V14/V16 ALTER // TABLE idempotency is handled by the migration runner. - apply_migrations(&conn).expect("V11..V16 replay must be idempotent"); + apply_migrations(&conn).expect("V11..V17 replay must be idempotent"); // After replaying from V10, the schema advances to the latest version. let version = get_current_version(&conn).expect("read schema_version"); - assert_eq!(version, 16, "schema_version back at 16 after replay"); + assert_eq!(version, 17, "schema_version back at latest after replay"); } #[test] @@ -1229,7 +1310,97 @@ mod tests { // V16 uses CREATE TABLE IF NOT EXISTS and idempotent ALTER handling. apply_migrations(&conn).expect("V16 replay must be idempotent"); let version = get_current_version(&conn).expect("read version"); - assert_eq!(version, 16, "schema_version must be 16 after replay"); + assert_eq!(version, 17, "schema_version must be latest after replay"); + } + + #[test] + fn v17_adds_source_envelope_columns_and_cursor_table() { + let conn = rusqlite::Connection::open_in_memory().expect("open in-memory"); + apply_migrations(&conn).expect("apply_migrations"); + + // All nine envelope columns must exist on knowledge_nodes. + let cols: Vec = { + let mut stmt = conn + .prepare("PRAGMA table_info(knowledge_nodes)") + .expect("prepare"); + stmt.query_map([], |row| row.get::<_, String>(1)) + .expect("query_map") + .filter_map(|r| r.ok()) + .collect() + }; + for c in [ + "source_system", + "source_id", + "source_url", + "source_updated_at", + "content_hash", + "synced_at", + "source_project", + "source_type", + "source_author", + ] { + assert!( + cols.iter().any(|x| x == c), + "knowledge_nodes must have `{c}` column after V17" + ); + } + + // connector_cursors table must exist. + let cursor_rows: i64 = conn + .query_row( + "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='connector_cursors'", + [], + |row| row.get(0), + ) + .expect("query sqlite_master"); + assert_eq!(cursor_rows, 1, "connector_cursors must be created by V17"); + } + + #[test] + fn v17_unique_source_key_index_allows_many_null_legacy_rows() { + let conn = rusqlite::Connection::open_in_memory().expect("open in-memory"); + apply_migrations(&conn).expect("apply_migrations"); + + // Two legacy rows with NULL source key must NOT collide on the partial + // unique index (the index only covers non-NULL keys). + for id in ["a", "b"] { + conn.execute( + "INSERT INTO knowledge_nodes (id, content, node_type, created_at, updated_at, last_accessed, \ + stability, difficulty, reps, lapses, learning_state, storage_strength, retrieval_strength, \ + retention_strength, next_review, scheduled_days, has_embedding) \ + VALUES (?1,'c','fact',datetime('now'),datetime('now'),datetime('now'),\ + 1.0,0.3,0,0,'new',1.0,1.0,1.0,datetime('now'),1,0)", + [id], + ) + .expect("insert legacy row"); + } + + // Two real connector rows that share (source_system, source_id) MUST + // collide — the unique index is the idempotency guarantee. + conn.execute( + "UPDATE knowledge_nodes SET source_system='github', source_id='1' WHERE id='a'", + [], + ) + .expect("set source key on a"); + let dup = conn.execute( + "UPDATE knowledge_nodes SET source_system='github', source_id='1' WHERE id='b'", + [], + ); + assert!( + dup.is_err(), + "duplicate (source_system, source_id) must violate the unique index" + ); + } + + #[test] + fn v17_is_replayable() { + let conn = rusqlite::Connection::open_in_memory().expect("open in-memory"); + apply_migrations(&conn).expect("first apply"); + conn.execute("UPDATE schema_version SET version = 16", []) + .expect("rewind to 16"); + apply_migrations(&conn).expect("V17 replay must be idempotent"); + let version = get_current_version(&conn).expect("read version"); + assert_eq!(version, 17, "schema_version must be 17 after replay"); } #[test] diff --git a/crates/vestige-core/src/storage/mod.rs b/crates/vestige-core/src/storage/mod.rs index 5f0a54c..c5db4e2 100644 --- a/crates/vestige-core/src/storage/mod.rs +++ b/crates/vestige-core/src/storage/mod.rs @@ -19,9 +19,10 @@ pub use portable::{ }; pub use sqlite::{ CompositionEventRecord, CompositionMemberRecord, CompositionNeighborRecord, - CompositionOutcomeRecord, ConnectionRecord, ConsolidationHistoryRecord, DreamHistoryRecord, - FilePortableSyncBackend, InsightRecord, IntentionRecord, NeverComposedCandidate, - PortableSyncBackend, PortableSyncReport, Result, SmartIngestResult, SqliteMemoryStore, + CompositionOutcomeRecord, ConnectionRecord, ConnectorCursor, ConsolidationHistoryRecord, + DreamHistoryRecord, FilePortableSyncBackend, InsightRecord, IntentionRecord, + NeverComposedCandidate, PortableSyncBackend, PortableSyncReport, ReconcileReport, Result, + SmartIngestResult, SourceUpsertOutcome, SourceUpsertResult, SqliteMemoryStore, StateTransitionRecord, StorageError, }; diff --git a/crates/vestige-core/src/storage/sqlite.rs b/crates/vestige-core/src/storage/sqlite.rs index 9685f61..399b2fb 100644 --- a/crates/vestige-core/src/storage/sqlite.rs +++ b/crates/vestige-core/src/storage/sqlite.rs @@ -682,6 +682,12 @@ impl SqliteMemoryStore { let valid_from_str = input.valid_from.map(|dt| dt.to_rfc3339()); let valid_until_str = input.valid_until.map(|dt| dt.to_rfc3339()); + // #57 Source envelope — flatten to nullable column values. A node with + // no external provenance leaves all nine columns NULL (legacy shape). + let env = input.source_envelope.clone().unwrap_or_default(); + let env_source_updated_at = env.source_updated_at.map(|dt| dt.to_rfc3339()); + let env_synced_at = env.synced_at.map(|dt| dt.to_rfc3339()); + { let writer = self .writer @@ -694,14 +700,18 @@ impl SqliteMemoryStore { storage_strength, retrieval_strength, retention_strength, sentiment_score, sentiment_magnitude, next_review, scheduled_days, source, tags, valid_from, valid_until, has_embedding, embedding_model, - domains, domain_scores + domains, domain_scores, + source_system, source_id, source_url, source_updated_at, + content_hash, synced_at, source_project, source_type, source_author ) VALUES ( ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, - '[]', '{}' + '[]', '{}', + ?25, ?26, ?27, ?28, + ?29, ?30, ?31, ?32, ?33 )", params![ id, @@ -728,6 +738,15 @@ impl SqliteMemoryStore { valid_until_str, 0, Option::::None, + env.source_system, + env.source_id, + env.source_url, + env_source_updated_at, + env.content_hash, + env_synced_at, + env.source_project, + env.source_type, + env.source_author, ], )?; } @@ -1257,6 +1276,33 @@ impl SqliteMemoryStore { .ok() }); + // #57 Source envelope columns (Migration V17). `.ok().flatten()` is + // tolerant of pre-V17 databases that lack these columns. Collapse an + // all-NULL envelope to `None` so legacy nodes serialize unchanged. + let parse_ts = |s: Option| -> Option> { + s.and_then(|s| { + DateTime::parse_from_rfc3339(&s) + .map(|dt| dt.with_timezone(&Utc)) + .ok() + }) + }; + let envelope = crate::memory::SourceEnvelope { + source_system: row.get("source_system").ok().flatten(), + source_id: row.get("source_id").ok().flatten(), + source_url: row.get("source_url").ok().flatten(), + source_updated_at: parse_ts(row.get("source_updated_at").ok().flatten()), + content_hash: row.get("content_hash").ok().flatten(), + synced_at: parse_ts(row.get("synced_at").ok().flatten()), + source_project: row.get("source_project").ok().flatten(), + source_type: row.get("source_type").ok().flatten(), + source_author: row.get("source_author").ok().flatten(), + }; + let source_envelope = if envelope.is_empty() { + None + } else { + Some(envelope) + }; + Ok(KnowledgeNode { id: row.get("id")?, content: row.get("content")?, @@ -1293,6 +1339,8 @@ impl SqliteMemoryStore { // v2.0.5 Active Forgetting suppression_count, suppressed_at, + // #57 Source envelope + source_envelope, }) } @@ -9415,6 +9463,336 @@ impl crate::storage::memory_store::MemoryStoreSend for SqliteMemoryStore { } } +// ============================================================================ +// CONNECTOR SYNC (#57) — idempotent external-source ingestion +// ============================================================================ + +/// What `upsert_by_source` did with one external record. Drives the +/// created/updated/unchanged/tombstoned counts a connector reports. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SourceUpsertOutcome { + /// No memory existed for this `(source_system, source_id)` — inserted. + Created, + /// A memory existed and the `content_hash` changed — body + envelope updated + /// and the embedding regenerated. + Updated, + /// A memory existed with the same `content_hash` — nothing rewritten except + /// `synced_at` (so an incremental re-scan is free). + Unchanged, +} + +/// Result of one `upsert_by_source` call. +#[derive(Debug, Clone)] +pub struct SourceUpsertResult { + pub outcome: SourceUpsertOutcome, + /// Memory id of the affected node (new or existing). + pub node_id: String, +} + +/// Incremental-sync checkpoint for one `(source_system, scope)`. +#[derive(Debug, Clone, Default)] +pub struct ConnectorCursor { + pub source_system: String, + pub scope: String, + /// High-water mark on the source's update timestamp. `None` on first sync. + pub cursor_updated_at: Option>, + pub last_synced_at: Option>, + pub last_full_reconcile_at: Option>, + pub records_seen: i64, +} + +/// Outcome of a tombstone reconciliation pass. +#[derive(Debug, Clone, Default)] +pub struct ReconcileReport { + /// Memory ids that were tombstoned (no longer visible upstream). + pub tombstoned: Vec, + /// Number of local records considered for this scope. + pub considered: usize, +} + +impl SqliteMemoryStore { + /// Idempotently upsert one external-source record, keyed on the envelope's + /// `(source_system, source_id)` (#57). + /// + /// This is the core primitive every connector calls per record. It makes + /// re-running a sync safe and cheap: + /// + /// - **No existing memory** for the key → insert (`Created`). + /// - **Existing memory, `content_hash` changed** → update content + envelope, + /// stamp `updated_at`, regenerate the embedding (`Updated`). + /// - **Existing memory, `content_hash` unchanged** → touch only `synced_at` + /// so the reconcile pass knows the record is still live (`Unchanged`). + /// + /// The caller MUST set `source_system`, `source_id`, and `content_hash` on + /// the input's `source_envelope`; otherwise this falls back to a plain + /// `ingest` (an un-keyed record can't be deduplicated). + pub fn upsert_by_source(&self, input: IngestInput) -> Result { + let env = match input.source_envelope.clone() { + Some(e) if e.has_key() => e, + // No idempotency key — behave like a normal create. + _ => { + let node = self.ingest(input)?; + return Ok(SourceUpsertResult { + outcome: SourceUpsertOutcome::Created, + node_id: node.id, + }); + } + }; + + let source_system = env.source_system.clone().unwrap_or_default(); + let source_id = env.source_id.clone().unwrap_or_default(); + let now = Utc::now(); + + // Look up the existing memory for this external record, if any. + let existing: Option<(String, Option)> = { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + reader + .query_row( + "SELECT id, content_hash FROM knowledge_nodes \ + WHERE source_system = ?1 AND source_id = ?2 LIMIT 1", + params![source_system, source_id], + |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option>(1)?)), + ) + .optional()? + }; + + let Some((node_id, stored_hash)) = existing else { + // First time we've seen this record — plain insert carries the + // envelope through the existing ingest path. + let node = self.ingest(input)?; + return Ok(SourceUpsertResult { + outcome: SourceUpsertOutcome::Created, + node_id: node.id, + }); + }; + + let new_hash = env.content_hash.clone(); + let unchanged = match (&stored_hash, &new_hash) { + // Both present and equal → genuinely unchanged. + (Some(a), Some(b)) => a == b, + // Either side missing a hash → be conservative and treat as changed + // so we never silently skip a real update. + _ => false, + }; + + let env_source_updated_at = env.source_updated_at.map(|dt| dt.to_rfc3339()); + let synced_at = now.to_rfc3339(); + + if unchanged { + // Cheapest path: only advance liveness + the source cursor field. + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + // Un-tombstone fully: a reappearing record clears BOTH bitemporal + // markers (valid_until AND superseded_by), otherwise it would be + // resurrected as currently-valid yet still flagged as superseded, + // which permanently excludes it from merge/consolidation. + "UPDATE knowledge_nodes \ + SET synced_at = ?1, source_updated_at = COALESCE(?2, source_updated_at), \ + source_url = COALESCE(?3, source_url), \ + valid_until = NULL, superseded_by = NULL \ + WHERE id = ?4", + params![synced_at, env_source_updated_at, env.source_url, node_id], + )?; + return Ok(SourceUpsertResult { + outcome: SourceUpsertOutcome::Unchanged, + node_id, + }); + } + + // Content changed upstream → update body + full envelope, clear any + // prior tombstone (`valid_until`), then regenerate the embedding. + { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + // Clear BOTH bitemporal markers on update (see Unchanged branch). + "UPDATE knowledge_nodes SET \ + content = ?1, updated_at = ?2, synced_at = ?3, \ + content_hash = ?4, source_url = ?5, source_updated_at = ?6, \ + source_project = ?7, source_type = ?8, source_author = ?9, \ + valid_until = NULL, superseded_by = NULL \ + WHERE id = ?10", + params![ + input.content, + now.to_rfc3339(), + synced_at, + env.content_hash, + env.source_url, + env_source_updated_at, + env.source_project, + env.source_type, + env.source_author, + node_id, + ], + )?; + } + + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + { + if let Some(index) = self.vector_index.as_ref() + && let Ok(mut index) = index.lock() + { + let _ = index.remove(&node_id); + } + if let Err(e) = self.generate_embedding_for_node(&node_id, &input.content) { + tracing::warn!("Failed to regenerate embedding for {}: {}", node_id, e); + } + } + + Ok(SourceUpsertResult { + outcome: SourceUpsertOutcome::Updated, + node_id, + }) + } + + /// Read the incremental-sync checkpoint for a `(source_system, scope)`. + /// Returns a zeroed cursor (no high-water mark) if none has been saved yet. + pub fn get_connector_cursor(&self, source_system: &str, scope: &str) -> Result { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let row = reader + .query_row( + "SELECT cursor_updated_at, last_synced_at, last_full_reconcile_at, records_seen \ + FROM connector_cursors WHERE source_system = ?1 AND scope = ?2", + params![source_system, scope], + |row| { + Ok(( + row.get::<_, Option>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, Option>(2)?, + row.get::<_, i64>(3)?, + )) + }, + ) + .optional()?; + + let parse = |s: Option| -> Option> { + s.and_then(|s| { + DateTime::parse_from_rfc3339(&s) + .map(|dt| dt.with_timezone(&Utc)) + .ok() + }) + }; + + Ok(match row { + Some((cur, last, recon, seen)) => ConnectorCursor { + source_system: source_system.to_string(), + scope: scope.to_string(), + cursor_updated_at: parse(cur), + last_synced_at: parse(last), + last_full_reconcile_at: parse(recon), + records_seen: seen, + }, + None => ConnectorCursor { + source_system: source_system.to_string(), + scope: scope.to_string(), + ..Default::default() + }, + }) + } + + /// Persist the incremental-sync checkpoint for a `(source_system, scope)`. + pub fn save_connector_cursor(&self, cursor: &ConnectorCursor) -> Result<()> { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "INSERT INTO connector_cursors \ + (source_system, scope, cursor_updated_at, last_synced_at, \ + last_full_reconcile_at, records_seen) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6) \ + ON CONFLICT(source_system, scope) DO UPDATE SET \ + cursor_updated_at = excluded.cursor_updated_at, \ + last_synced_at = excluded.last_synced_at, \ + last_full_reconcile_at = excluded.last_full_reconcile_at, \ + records_seen = excluded.records_seen", + params![ + cursor.source_system, + cursor.scope, + cursor.cursor_updated_at.map(|d| d.to_rfc3339()), + cursor.last_synced_at.map(|d| d.to_rfc3339()), + cursor.last_full_reconcile_at.map(|d| d.to_rfc3339()), + cursor.records_seen, + ], + )?; + Ok(()) + } + + /// Reconcile deletions for a scope: tombstone every local memory in + /// `(source_system, source_project = scope)` whose `source_id` is NOT in the + /// caller-supplied set of currently-live ids (#57). + /// + /// Neither Redmine nor GitHub exposes a deletion feed, so an incremental + /// `updated_at` sync can never see a delete. The connector therefore + /// periodically enumerates the full set of live ids and calls this. We + /// **invalidate, don't purge** (Graphiti-style): the memory keeps its + /// content for audit but gets `valid_until = now`, so it falls out of + /// "currently valid" retrieval without losing history. A record that + /// reappears upstream is un-tombstoned by the next `upsert_by_source` + /// (which clears `valid_until`). + pub fn reconcile_source_tombstones( + &self, + source_system: &str, + scope: &str, + live_ids: &[String], + ) -> Result { + let live: std::collections::HashSet<&str> = live_ids.iter().map(|s| s.as_str()).collect(); + + // All currently-valid local records for this scope. + let local: Vec<(String, String)> = { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare( + "SELECT id, source_id FROM knowledge_nodes \ + WHERE source_system = ?1 AND source_project = ?2 \ + AND source_id IS NOT NULL AND valid_until IS NULL", + )?; + let rows = stmt.query_map(params![source_system, scope], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)) + })?; + rows.filter_map(|r| r.ok()).collect() + }; + + let considered = local.len(); + let now = Utc::now().to_rfc3339(); + let mut tombstoned = Vec::new(); + + { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + for (node_id, source_id) in &local { + if !live.contains(source_id.as_str()) { + writer.execute( + "UPDATE knowledge_nodes SET valid_until = ?1 WHERE id = ?2", + params![now, node_id], + )?; + tombstoned.push(node_id.clone()); + } + } + } + + Ok(ReconcileReport { + tombstoned, + considered, + }) + } +} + // ============================================================================ // TESTS // ============================================================================ @@ -9443,6 +9821,234 @@ mod tests { Storage::new(Some(dir.path().join(name))).unwrap() } + // ===================== Connector sync (#57) ========================= + + /// Build an `IngestInput` carrying a source envelope for a GitHub-ish issue. + fn source_input(id: &str, content: &str, hash: &str) -> IngestInput { + IngestInput { + content: content.to_string(), + node_type: "fact".to_string(), + source_envelope: Some(crate::memory::SourceEnvelope { + source_system: Some("github".to_string()), + source_id: Some(id.to_string()), + source_url: Some(format!("https://github.com/o/r/issues/{id}")), + content_hash: Some(hash.to_string()), + source_project: Some("o/r".to_string()), + source_type: Some("issue".to_string()), + source_author: Some("octocat".to_string()), + ..Default::default() + }), + ..Default::default() + } + } + + fn node_count(store: &Storage) -> i64 { + // Count rows for our test source so embeddings/other tests don't bleed in. + let reader = store.reader.lock().unwrap(); + reader + .query_row( + "SELECT COUNT(*) FROM knowledge_nodes WHERE source_system = 'github'", + [], + |r| r.get(0), + ) + .unwrap() + } + + #[test] + fn upsert_by_source_is_idempotent_across_reruns() { + let store = create_test_storage(); + + // First sync: a brand-new record → Created. + let r1 = store + .upsert_by_source(source_input("1", "Bug: crash on startup", "hash-a")) + .unwrap(); + assert_eq!(r1.outcome, SourceUpsertOutcome::Created); + assert_eq!(node_count(&store), 1); + + // Re-sync the SAME record with the SAME hash twice → Unchanged, no dupes. + for _ in 0..2 { + let r = store + .upsert_by_source(source_input("1", "Bug: crash on startup", "hash-a")) + .unwrap(); + assert_eq!(r.outcome, SourceUpsertOutcome::Unchanged); + assert_eq!(r.node_id, r1.node_id, "must reuse the same memory id"); + } + assert_eq!(node_count(&store), 1, "idempotent: still exactly one memory"); + } + + #[test] + fn upsert_by_source_updates_in_place_when_hash_changes() { + let store = create_test_storage(); + let created = store + .upsert_by_source(source_input("7", "old body", "hash-old")) + .unwrap(); + + // Upstream edit: content + hash change → Updated, same id, new content. + let updated = store + .upsert_by_source(source_input("7", "new edited body", "hash-new")) + .unwrap(); + assert_eq!(updated.outcome, SourceUpsertOutcome::Updated); + assert_eq!(updated.node_id, created.node_id); + assert_eq!(node_count(&store), 1, "update must not duplicate"); + + let node = store.get_node(&created.node_id).unwrap().unwrap(); + assert_eq!(node.content, "new edited body"); + let env = node.source_envelope.expect("envelope persisted"); + assert_eq!(env.content_hash.as_deref(), Some("hash-new")); + assert_eq!(env.source_id.as_deref(), Some("7")); + } + + #[test] + fn upsert_by_source_without_key_falls_back_to_create() { + let store = create_test_storage(); + // Envelope present but missing source_id → not keyed → plain create. + let input = IngestInput { + content: "loose note".to_string(), + node_type: "fact".to_string(), + source_envelope: Some(crate::memory::SourceEnvelope { + source_url: Some("https://example.com/x".to_string()), + ..Default::default() + }), + ..Default::default() + }; + let r = store.upsert_by_source(input).unwrap(); + assert_eq!(r.outcome, SourceUpsertOutcome::Created); + } + + #[test] + fn connector_cursor_round_trips() { + let store = create_test_storage(); + // Unknown scope → zeroed cursor. + let empty = store.get_connector_cursor("github", "o/r").unwrap(); + assert!(empty.cursor_updated_at.is_none()); + assert_eq!(empty.records_seen, 0); + + let ts = Utc::now(); + let cursor = ConnectorCursor { + source_system: "github".to_string(), + scope: "o/r".to_string(), + cursor_updated_at: Some(ts), + last_synced_at: Some(ts), + last_full_reconcile_at: None, + records_seen: 42, + }; + store.save_connector_cursor(&cursor).unwrap(); + + let back = store.get_connector_cursor("github", "o/r").unwrap(); + assert_eq!(back.records_seen, 42); + assert_eq!( + back.cursor_updated_at.map(|d| d.to_rfc3339()), + Some(ts.to_rfc3339()) + ); + + // Upsert semantics: saving again replaces, never duplicates. + let mut c2 = cursor.clone(); + c2.records_seen = 99; + store.save_connector_cursor(&c2).unwrap(); + assert_eq!(store.get_connector_cursor("github", "o/r").unwrap().records_seen, 99); + } + + #[test] + fn reconcile_tombstones_records_absent_from_live_set() { + let store = create_test_storage(); + // Three synced issues in scope o/r. + for id in ["1", "2", "3"] { + store + .upsert_by_source(source_input(id, &format!("issue {id}"), &format!("h{id}"))) + .unwrap(); + } + + // Reconcile: only 1 and 3 are still visible upstream → 2 is tombstoned. + let report = store + .reconcile_source_tombstones("github", "o/r", &["1".to_string(), "3".to_string()]) + .unwrap(); + assert_eq!(report.considered, 3); + assert_eq!(report.tombstoned.len(), 1, "exactly issue 2 tombstoned"); + + // Issue 2's memory is invalidated (valid_until set) but NOT purged — + // content retained for audit, just no longer currently-valid. + let two = { + let reader = store.reader.lock().unwrap(); + reader + .query_row( + "SELECT id, valid_until FROM knowledge_nodes WHERE source_id = '2'", + [], + |r| Ok((r.get::<_, String>(0)?, r.get::<_, Option>(1)?)), + ) + .unwrap() + }; + assert!(two.1.is_some(), "tombstoned record must have valid_until set"); + let node = store.get_node(&two.0).unwrap().unwrap(); + assert!(!node.is_currently_valid(), "tombstoned node is not valid now"); + assert_eq!(node.content, "issue 2", "content retained for audit"); + + // A reappearing record un-tombstones on next upsert (clears valid_until). + store + .upsert_by_source(source_input("2", "issue 2", "h2")) + .unwrap(); + let revived = store.get_node(&two.0).unwrap().unwrap(); + assert!(revived.is_currently_valid(), "re-synced record is valid again"); + } + + #[test] + fn upsert_clears_superseded_by_when_record_reappears() { + // Regression: un-tombstoning must clear BOTH bitemporal markers. A + // connector node that was superseded/merged (valid_until + superseded_by + // both set) and then re-observed upstream must come back fully clean, + // otherwise it is currently-valid yet still flagged superseded and is + // permanently excluded from merge candidacy. + let store = create_test_storage(); + let created = store + .upsert_by_source(source_input("9", "body v1", "h9a")) + .unwrap(); + + // Simulate the node having been superseded (as merge/supersede would). + { + let writer = store.writer.lock().unwrap(); + writer + .execute( + "UPDATE knowledge_nodes SET valid_until = ?1, superseded_by = 'survivor-id' WHERE id = ?2", + params![Utc::now().to_rfc3339(), created.node_id], + ) + .unwrap(); + } + assert!( + store.superseded_node_ids().unwrap().contains(&created.node_id), + "precondition: node is superseded" + ); + + // Re-sync with a content change → Updated branch must clear both markers. + let res = store + .upsert_by_source(source_input("9", "body v2 edited", "h9b")) + .unwrap(); + assert_eq!(res.outcome, SourceUpsertOutcome::Updated); + assert!( + !store.superseded_node_ids().unwrap().contains(&created.node_id), + "superseded_by must be cleared on re-sync (no bitemporal zombie)" + ); + let node = store.get_node(&created.node_id).unwrap().unwrap(); + assert!(node.is_currently_valid()); + + // Also exercise the Unchanged branch: supersede again, re-sync same hash. + { + let writer = store.writer.lock().unwrap(); + writer + .execute( + "UPDATE knowledge_nodes SET valid_until = ?1, superseded_by = 'survivor-id' WHERE id = ?2", + params![Utc::now().to_rfc3339(), created.node_id], + ) + .unwrap(); + } + let res2 = store + .upsert_by_source(source_input("9", "body v2 edited", "h9b")) + .unwrap(); + assert_eq!(res2.outcome, SourceUpsertOutcome::Unchanged); + assert!( + !store.superseded_node_ids().unwrap().contains(&created.node_id), + "Unchanged branch must also clear superseded_by" + ); + } + #[cfg(all(feature = "embeddings", feature = "vector-search"))] fn with_vector_search_disabled(f: impl FnOnce() -> T) -> T { let _guard = ENV_LOCK.lock().unwrap(); diff --git a/crates/vestige-mcp/Cargo.toml b/crates/vestige-mcp/Cargo.toml index bc08a40..fb2a287 100644 --- a/crates/vestige-mcp/Cargo.toml +++ b/crates/vestige-mcp/Cargo.toml @@ -10,9 +10,13 @@ categories = ["command-line-utilities", "database"] repository = "https://github.com/samvallad33/vestige" [features] -default = ["embeddings", "ort-download", "vector-search"] +default = ["embeddings", "ort-download", "vector-search", "connectors"] embeddings = ["vestige-core/embeddings"] vector-search = ["vestige-core/vector-search"] +# External-source connectors (#57): GitHub Issues / Redmine indexing via the +# `source_sync` MCP tool. On by default so the tool works out of the box; turn +# off for a build with no HTTP client. +connectors = ["vestige-core/connectors"] # Default ort backend: downloads prebuilt ONNX Runtime at build time. # Fails on targets without prebuilts (notably x86_64-apple-darwin). ort-download = ["embeddings", "vestige-core/ort-download"] diff --git a/crates/vestige-mcp/src/bin/cli.rs b/crates/vestige-mcp/src/bin/cli.rs index 3daa8e3..b79dd55 100644 --- a/crates/vestige-mcp/src/bin/cli.rs +++ b/crates/vestige-mcp/src/bin/cli.rs @@ -1817,6 +1817,7 @@ fn run_restore(backup_path: PathBuf) -> anyhow::Result<()> { tags: memory.tags.unwrap_or_default(), valid_from: None, valid_until: None, + source_envelope: None, }; match storage.ingest(input) { @@ -2415,6 +2416,7 @@ fn run_ingest( tags: tag_list, valid_from: None, valid_until: None, + source_envelope: None, }; let storage = open_storage()?; diff --git a/crates/vestige-mcp/src/bin/restore.rs b/crates/vestige-mcp/src/bin/restore.rs index 332c329..ace6f3f 100644 --- a/crates/vestige-mcp/src/bin/restore.rs +++ b/crates/vestige-mcp/src/bin/restore.rs @@ -73,6 +73,7 @@ fn main() -> anyhow::Result<()> { tags: memory.tags.unwrap_or_default(), valid_from: None, valid_until: None, + source_envelope: None, }; match storage.ingest(input) { diff --git a/crates/vestige-mcp/src/cognitive.rs b/crates/vestige-mcp/src/cognitive.rs index 3d106ff..fc73f97 100644 --- a/crates/vestige-mcp/src/cognitive.rs +++ b/crates/vestige-mcp/src/cognitive.rs @@ -195,6 +195,7 @@ mod tests { tags: vec!["test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); result.id diff --git a/crates/vestige-mcp/src/server.rs b/crates/vestige-mcp/src/server.rs index 7682441..b8f90e7 100644 --- a/crates/vestige-mcp/src/server.rs +++ b/crates/vestige-mcp/src/server.rs @@ -281,6 +281,15 @@ impl McpServer { ..Default::default() }, // ================================================================ + // EXTERNAL-SOURCE CONNECTORS (#57) + // ================================================================ + ToolDescription { + name: "source_sync".to_string(), + description: Some("Index an external system (GitHub Issues) into Vestige as a durable, offline, semantically-searchable index that cites back to the canonical record. Provide 'repo' as 'owner/name'. Idempotent: re-running updates changed issues without duplicating; set reconcile=true to tombstone issues removed upstream. Auth via the GITHUB_TOKEN env var (optional for public repos).".to_string()), + input_schema: tools::source_sync::schema(), + ..Default::default() + }, + // ================================================================ // TEMPORAL TOOLS (v1.2+) // ================================================================ ToolDescription { @@ -593,6 +602,11 @@ impl McpServer { .await } + // ================================================================ + // External-source connectors (#57) + // ================================================================ + "source_sync" => tools::source_sync::execute(&self.storage, request.arguments).await, + // ================================================================ // DEPRECATED (v1.7): ingest → smart_ingest // ================================================================ @@ -1806,10 +1820,10 @@ mod tests { let result = response.result.unwrap(); let tools = result["tools"].as_array().unwrap(); - // 33 tools: 25 from v2.1.21 + 7 Phase 3 merge/supersede tools: - // merge_candidates, plan_merge, plan_supersede, apply_plan, merge_undo, - // protect, merge_policy, composed_graph) - assert_eq!(tools.len(), 33, "Expected exactly 33 tools"); + // 34 tools: 25 from v2.1.21 + 7 Phase 3 merge/supersede tools + // (merge_candidates, plan_merge, plan_supersede, apply_plan, merge_undo, + // protect, merge_policy, composed_graph) + 1 connector tool (source_sync, #57). + assert_eq!(tools.len(), 34, "Expected exactly 34 tools"); let tool_names: Vec<&str> = tools.iter().map(|t| t["name"].as_str().unwrap()).collect(); @@ -1821,6 +1835,9 @@ mod tests { // Core memory (smart_ingest absorbs ingest + checkpoint in v1.7) assert!(tool_names.contains(&"smart_ingest")); + + // External-source connectors (#57) + assert!(tool_names.contains(&"source_sync")); assert!( !tool_names.contains(&"ingest"), "ingest should be removed in v1.7" diff --git a/crates/vestige-mcp/src/tools/changelog.rs b/crates/vestige-mcp/src/tools/changelog.rs index 8a2e746..eb47139 100644 --- a/crates/vestige-mcp/src/tools/changelog.rs +++ b/crates/vestige-mcp/src/tools/changelog.rs @@ -278,6 +278,7 @@ mod tests { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); node.id diff --git a/crates/vestige-mcp/src/tools/codebase_unified.rs b/crates/vestige-mcp/src/tools/codebase_unified.rs index e24021d..50491cd 100644 --- a/crates/vestige-mcp/src/tools/codebase_unified.rs +++ b/crates/vestige-mcp/src/tools/codebase_unified.rs @@ -154,6 +154,7 @@ async fn execute_remember_pattern( tags, valid_from: None, valid_until: None, + source_envelope: None, }; let node = storage.ingest(input).map_err(|e| e.to_string())?; @@ -250,6 +251,7 @@ async fn execute_remember_decision( tags, valid_from: None, valid_until: None, + source_envelope: None, }; let node = storage.ingest(input).map_err(|e| e.to_string())?; diff --git a/crates/vestige-mcp/src/tools/cross_reference.rs b/crates/vestige-mcp/src/tools/cross_reference.rs index e48b4eb..0e231b1 100644 --- a/crates/vestige-mcp/src/tools/cross_reference.rs +++ b/crates/vestige-mcp/src/tools/cross_reference.rs @@ -1119,6 +1119,7 @@ mod tests { tags: tags.iter().map(|s| s.to_string()).collect(), valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap() .id diff --git a/crates/vestige-mcp/src/tools/dream.rs b/crates/vestige-mcp/src/tools/dream.rs index 65a373b..1456406 100644 --- a/crates/vestige-mcp/src/tools/dream.rs +++ b/crates/vestige-mcp/src/tools/dream.rs @@ -283,6 +283,7 @@ mod tests { tags: vec!["dream-test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } @@ -420,6 +421,7 @@ mod tests { tags: vec!["dream-roundtrip".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } @@ -485,6 +487,7 @@ mod tests { tags: vec!["save-conn-test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); ids.push(result.id); @@ -588,6 +591,7 @@ mod tests { tags: tags.iter().map(|t| t.to_string()).collect(), valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } @@ -713,6 +717,7 @@ mod tests { tags: tags.iter().map(|t| t.to_string()).collect(), valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } diff --git a/crates/vestige-mcp/src/tools/explore.rs b/crates/vestige-mcp/src/tools/explore.rs index 441afd3..cabc7c9 100644 --- a/crates/vestige-mcp/src/tools/explore.rs +++ b/crates/vestige-mcp/src/tools/explore.rs @@ -414,6 +414,7 @@ mod tests { tags: vec!["test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap() .id; @@ -428,6 +429,7 @@ mod tests { tags: vec!["test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap() .id; @@ -478,6 +480,7 @@ mod tests { tags: vec!["test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }; let id_a = storage.ingest(make("Memory A about databases")).unwrap().id; let id_b = storage.ingest(make("Memory B about indexes")).unwrap().id; @@ -529,6 +532,7 @@ mod tests { tags: vec!["test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }; let id_a = storage.ingest(make("Bridge test memory A")).unwrap().id; let id_b = storage.ingest(make("Bridge test memory B")).unwrap().id; diff --git a/crates/vestige-mcp/src/tools/feedback.rs b/crates/vestige-mcp/src/tools/feedback.rs index 438e594..a236bf2 100644 --- a/crates/vestige-mcp/src/tools/feedback.rs +++ b/crates/vestige-mcp/src/tools/feedback.rs @@ -313,6 +313,7 @@ mod tests { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); node.id @@ -556,6 +557,7 @@ mod tests { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); let node_id = node.id.clone(); diff --git a/crates/vestige-mcp/src/tools/graph.rs b/crates/vestige-mcp/src/tools/graph.rs index 13ca746..904e5e4 100644 --- a/crates/vestige-mcp/src/tools/graph.rs +++ b/crates/vestige-mcp/src/tools/graph.rs @@ -328,6 +328,7 @@ mod tests { tags: vec!["test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); @@ -355,6 +356,7 @@ mod tests { tags: vec!["science".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); @@ -378,6 +380,7 @@ mod tests { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); diff --git a/crates/vestige-mcp/src/tools/health.rs b/crates/vestige-mcp/src/tools/health.rs index 362f298..563abb8 100644 --- a/crates/vestige-mcp/src/tools/health.rs +++ b/crates/vestige-mcp/src/tools/health.rs @@ -119,6 +119,7 @@ mod tests { tags: vec!["test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } @@ -144,6 +145,7 @@ mod tests { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); diff --git a/crates/vestige-mcp/src/tools/maintenance.rs b/crates/vestige-mcp/src/tools/maintenance.rs index ef1a927..814f513 100644 --- a/crates/vestige-mcp/src/tools/maintenance.rs +++ b/crates/vestige-mcp/src/tools/maintenance.rs @@ -778,6 +778,7 @@ mod tests { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } @@ -832,6 +833,7 @@ mod tests { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } @@ -904,6 +906,7 @@ mod tests { tags: vec!["schema-test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); @@ -1015,6 +1018,7 @@ mod tests { tags: vec!["portable".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); diff --git a/crates/vestige-mcp/src/tools/memory_unified.rs b/crates/vestige-mcp/src/tools/memory_unified.rs index 8a9ddcf..1b32262 100644 --- a/crates/vestige-mcp/src/tools/memory_unified.rs +++ b/crates/vestige-mcp/src/tools/memory_unified.rs @@ -552,6 +552,7 @@ mod tests { tags: vec!["test-tag".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); node.id diff --git a/crates/vestige-mcp/src/tools/mod.rs b/crates/vestige-mcp/src/tools/mod.rs index 078fab6..f145caf 100644 --- a/crates/vestige-mcp/src/tools/mod.rs +++ b/crates/vestige-mcp/src/tools/mod.rs @@ -12,6 +12,8 @@ pub mod intention_unified; pub mod memory_unified; pub mod search_unified; pub mod smart_ingest; +// #57: external-source connectors (GitHub Issues / Redmine retrieval layer) +pub mod source_sync; // v1.2: Temporal query tools pub mod changelog; diff --git a/crates/vestige-mcp/src/tools/restore.rs b/crates/vestige-mcp/src/tools/restore.rs index ac7184c..f000397 100644 --- a/crates/vestige-mcp/src/tools/restore.rs +++ b/crates/vestige-mcp/src/tools/restore.rs @@ -173,6 +173,7 @@ pub async fn execute(storage: &Arc, args: Option) -> Result Value { + let Some(env) = node.source_envelope.as_ref() else { + return Value::Null; + }; + serde_json::json!({ + "system": env.source_system, + "id": env.source_id, + "url": env.source_url, + "project": env.source_project, + "type": env.source_type, + "author": env.source_author, + "sourceUpdatedAt": env.source_updated_at.map(|dt| dt.to_rfc3339()), + "syncedAt": env.synced_at.map(|dt| dt.to_rfc3339()), + // A tombstoned (no-longer-visible) record has valid_until set in the past. + "tombstoned": !node.is_currently_valid(), + }) +} + fn format_search_result(r: &vestige_core::SearchResult, detail_level: &str) -> Value { match detail_level { "brief" => serde_json::json!({ @@ -898,45 +923,65 @@ fn format_search_result(r: &vestige_core::SearchResult, detail_level: &str) -> V "retentionStrength": r.node.retention_strength, "combinedScore": r.combined_score, }), - "full" => serde_json::json!({ - "id": r.node.id, - "content": r.node.content, - "combinedScore": r.combined_score, - "keywordScore": r.keyword_score, - "semanticScore": r.semantic_score, - "nodeType": r.node.node_type, - "tags": r.node.tags, - "retentionStrength": r.node.retention_strength, - "storageStrength": r.node.storage_strength, - "retrievalStrength": r.node.retrieval_strength, - "source": r.node.source, - "sentimentScore": r.node.sentiment_score, - "sentimentMagnitude": r.node.sentiment_magnitude, - "createdAt": r.node.created_at.to_rfc3339(), - "updatedAt": r.node.updated_at.to_rfc3339(), - "lastAccessed": r.node.last_accessed.to_rfc3339(), - "nextReview": r.node.next_review.map(|dt| dt.to_rfc3339()), - "stability": r.node.stability, - "difficulty": r.node.difficulty, - "reps": r.node.reps, - "lapses": r.node.lapses, - "validFrom": r.node.valid_from.map(|dt| dt.to_rfc3339()), - "validUntil": r.node.valid_until.map(|dt| dt.to_rfc3339()), - "matchType": format!("{:?}", r.match_type), - }), + "full" => { + let mut v = serde_json::json!({ + "id": r.node.id, + "content": r.node.content, + "combinedScore": r.combined_score, + "keywordScore": r.keyword_score, + "semanticScore": r.semantic_score, + "nodeType": r.node.node_type, + "tags": r.node.tags, + "retentionStrength": r.node.retention_strength, + "storageStrength": r.node.storage_strength, + "retrievalStrength": r.node.retrieval_strength, + "source": r.node.source, + "sentimentScore": r.node.sentiment_score, + "sentimentMagnitude": r.node.sentiment_magnitude, + "createdAt": r.node.created_at.to_rfc3339(), + "updatedAt": r.node.updated_at.to_rfc3339(), + "lastAccessed": r.node.last_accessed.to_rfc3339(), + "nextReview": r.node.next_review.map(|dt| dt.to_rfc3339()), + "stability": r.node.stability, + "difficulty": r.node.difficulty, + "reps": r.node.reps, + "lapses": r.node.lapses, + "validFrom": r.node.valid_from.map(|dt| dt.to_rfc3339()), + "validUntil": r.node.valid_until.map(|dt| dt.to_rfc3339()), + "matchType": format!("{:?}", r.match_type), + }); + attach_source_record(&mut v, &r.node); + v + } // "summary" (default) — includes dates so AI never has to guess when a memory is from - _ => serde_json::json!({ - "id": r.node.id, - "content": r.node.content, - "combinedScore": r.combined_score, - "keywordScore": r.keyword_score, - "semanticScore": r.semantic_score, - "nodeType": r.node.node_type, - "tags": r.node.tags, - "retentionStrength": r.node.retention_strength, - "createdAt": r.node.created_at.to_rfc3339(), - "updatedAt": r.node.updated_at.to_rfc3339(), - }), + _ => { + let mut v = serde_json::json!({ + "id": r.node.id, + "content": r.node.content, + "combinedScore": r.combined_score, + "keywordScore": r.keyword_score, + "semanticScore": r.semantic_score, + "nodeType": r.node.node_type, + "tags": r.node.tags, + "retentionStrength": r.node.retention_strength, + "createdAt": r.node.created_at.to_rfc3339(), + "updatedAt": r.node.updated_at.to_rfc3339(), + }); + attach_source_record(&mut v, &r.node); + v + } + } +} + +/// Inject a `sourceRecord` object into a result `Value` ONLY when the memory +/// has external provenance, so legacy (agent/user-authored) memories keep their +/// exact prior result shape. +fn attach_source_record(value: &mut Value, node: &vestige_core::KnowledgeNode) { + let provenance = source_provenance(node); + if !provenance.is_null() + && let Value::Object(map) = value + { + map.insert("sourceRecord".to_string(), provenance); } } @@ -950,36 +995,44 @@ pub fn format_node(node: &vestige_core::KnowledgeNode, detail_level: &str) -> Va "tags": node.tags, "retentionStrength": node.retention_strength, }), - "full" => serde_json::json!({ - "id": node.id, - "content": node.content, - "nodeType": node.node_type, - "tags": node.tags, - "retentionStrength": node.retention_strength, - "storageStrength": node.storage_strength, - "retrievalStrength": node.retrieval_strength, - "source": node.source, - "sentimentScore": node.sentiment_score, - "sentimentMagnitude": node.sentiment_magnitude, - "createdAt": node.created_at.to_rfc3339(), - "updatedAt": node.updated_at.to_rfc3339(), - "lastAccessed": node.last_accessed.to_rfc3339(), - "nextReview": node.next_review.map(|dt| dt.to_rfc3339()), - "stability": node.stability, - "difficulty": node.difficulty, - "reps": node.reps, - "lapses": node.lapses, - "validFrom": node.valid_from.map(|dt| dt.to_rfc3339()), - "validUntil": node.valid_until.map(|dt| dt.to_rfc3339()), - }), + "full" => { + let mut v = serde_json::json!({ + "id": node.id, + "content": node.content, + "nodeType": node.node_type, + "tags": node.tags, + "retentionStrength": node.retention_strength, + "storageStrength": node.storage_strength, + "retrievalStrength": node.retrieval_strength, + "source": node.source, + "sentimentScore": node.sentiment_score, + "sentimentMagnitude": node.sentiment_magnitude, + "createdAt": node.created_at.to_rfc3339(), + "updatedAt": node.updated_at.to_rfc3339(), + "lastAccessed": node.last_accessed.to_rfc3339(), + "nextReview": node.next_review.map(|dt| dt.to_rfc3339()), + "stability": node.stability, + "difficulty": node.difficulty, + "reps": node.reps, + "lapses": node.lapses, + "validFrom": node.valid_from.map(|dt| dt.to_rfc3339()), + "validUntil": node.valid_until.map(|dt| dt.to_rfc3339()), + }); + attach_source_record(&mut v, node); + v + } // "summary" (default) - _ => serde_json::json!({ - "id": node.id, - "content": node.content, - "nodeType": node.node_type, - "tags": node.tags, - "retentionStrength": node.retention_strength, - }), + _ => { + let mut v = serde_json::json!({ + "id": node.id, + "content": node.content, + "nodeType": node.node_type, + "tags": node.tags, + "retentionStrength": node.retention_strength, + }); + attach_source_record(&mut v, node); + v + } } } @@ -1016,6 +1069,7 @@ mod tests { tags: vec![], valid_from: None, valid_until: None, + source_envelope: None, }; let node = storage.ingest(input).unwrap(); node.id @@ -1839,6 +1893,7 @@ mod tests { tags: tags.into_iter().map(String::from).collect(), valid_from: None, valid_until: None, + source_envelope: None, }; let node = storage.ingest(input).unwrap(); node.id diff --git a/crates/vestige-mcp/src/tools/session_context.rs b/crates/vestige-mcp/src/tools/session_context.rs index 97e52ff..3f17994 100644 --- a/crates/vestige-mcp/src/tools/session_context.rs +++ b/crates/vestige-mcp/src/tools/session_context.rs @@ -506,6 +506,7 @@ mod tests { tags: tags.into_iter().map(|s| s.to_string()).collect(), valid_from: None, valid_until: None, + source_envelope: None, }; let node = storage.ingest(input).unwrap(); node.id @@ -712,6 +713,7 @@ mod tests { tags: vec!["pattern".to_string(), "codebase:vestige".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }; storage.ingest(input).unwrap(); diff --git a/crates/vestige-mcp/src/tools/smart_ingest.rs b/crates/vestige-mcp/src/tools/smart_ingest.rs index c0b447d..60828fc 100644 --- a/crates/vestige-mcp/src/tools/smart_ingest.rs +++ b/crates/vestige-mcp/src/tools/smart_ingest.rs @@ -215,6 +215,7 @@ pub async fn execute( tags, valid_from: None, valid_until: None, + source_envelope: None, }; // ==================================================================== @@ -414,6 +415,7 @@ async fn execute_batch( tags, valid_from: None, valid_until: None, + source_envelope: None, }; // ================================================================ diff --git a/crates/vestige-mcp/src/tools/source_sync.rs b/crates/vestige-mcp/src/tools/source_sync.rs new file mode 100644 index 0000000..bfdd4b2 --- /dev/null +++ b/crates/vestige-mcp/src/tools/source_sync.rs @@ -0,0 +1,187 @@ +//! `source_sync` MCP tool (#57) — index an external system into Vestige. +//! +//! Turns Vestige into a durable, offline, provenance-linked retrieval layer +//! over a long-lived external system. The first connector is GitHub Issues: +//! point it at `owner/repo` and Vestige indexes every issue + its comments as +//! source-aware memories you can search semantically and cite back to the +//! canonical issue URL — re-runnable idempotently (no duplicates) and able to +//! tombstone issues that vanish upstream. +//! +//! Unlike the official GitHub MCP server (a stateless live API proxy), this +//! keeps a local index: searchable offline, embedded for semantic recall, +//! joinable with the rest of your memory, and temporally versioned. +//! +//! ## Auth (security) +//! +//! The GitHub token is read from the `GITHUB_TOKEN` (or `VESTIGE_GITHUB_TOKEN`) +//! environment variable, never from tool arguments, so credentials are not +//! logged in the conversation. Public repositories work without a token at a +//! lower rate limit. + +use std::sync::Arc; + +use serde::Deserialize; +use serde_json::{Value, json}; + +use vestige_core::storage::Storage; + +/// JSON schema for the `source_sync` tool. +pub fn schema() -> Value { + json!({ + "type": "object", + "properties": { + "source": { + "type": "string", + "enum": ["github"], + "description": "External system to sync. Currently: 'github' (GitHub Issues).", + "default": "github" + }, + "repo": { + "type": "string", + "description": "GitHub repository as 'owner/name', e.g. 'samvallad33/vestige'." + }, + "reconcile": { + "type": "boolean", + "description": "Also tombstone local memories for issues no longer visible upstream (an extra full enumeration pass). Default false on incremental syncs.", + "default": false + }, + "max_pages": { + "type": "integer", + "description": "Max API pages to fetch this run (each page is up to 100 issues). Lets a first sync of a large repo be resumed across calls. Default 10.", + "default": 10, + "minimum": 1, + "maximum": 1000 + } + }, + "required": ["repo"] + }) +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct SourceSyncArgs { + #[serde(default = "default_source")] + source: String, + repo: String, + #[serde(default)] + reconcile: bool, + #[serde(default, alias = "max_pages")] + max_pages: Option, +} + +fn default_source() -> String { + "github".to_string() +} + +/// Read the GitHub token from the environment (never from tool args). +fn github_token() -> Option { + std::env::var("GITHUB_TOKEN") + .or_else(|_| std::env::var("VESTIGE_GITHUB_TOKEN")) + .ok() + .filter(|s| !s.trim().is_empty()) +} + +pub async fn execute(storage: &Arc, args: Option) -> Result { + let args: SourceSyncArgs = match args { + Some(v) => serde_json::from_value(v).map_err(|e| format!("Invalid arguments: {e}"))?, + None => return Err("Missing arguments".to_string()), + }; + + if args.source != "github" { + return Err(format!( + "Unsupported source '{}'. Currently only 'github' is supported.", + args.source + )); + } + + let (owner, repo) = args + .repo + .split_once('/') + .filter(|(o, r)| !o.is_empty() && !r.is_empty()) + .ok_or_else(|| { + "repo must be in 'owner/name' form, e.g. 'samvallad33/vestige'".to_string() + })?; + + execute_github( + storage, + owner, + repo, + args.reconcile, + args.max_pages.unwrap_or(10), + ) + .await +} + +/// Connectors are feature-gated; surface a clear message when the build omits +/// them rather than failing obscurely. +#[cfg(not(feature = "connectors"))] +async fn execute_github( + _storage: &Arc, + _owner: &str, + _repo: &str, + _reconcile: bool, + _max_pages: usize, +) -> Result { + Err("This Vestige build was compiled without the 'connectors' feature. \ + Rebuild with --features connectors to enable source_sync." + .to_string()) +} + +#[cfg(feature = "connectors")] +async fn execute_github( + storage: &Arc, + owner: &str, + repo: &str, + reconcile: bool, + max_pages: usize, +) -> Result { + use vestige_core::connectors::github::{GithubConfig, GithubConnector}; + use vestige_core::connectors::run_sync; + + let config = GithubConfig::new(owner, repo).with_token(github_token()); + let connector = + GithubConnector::new(config).map_err(|e| format!("connector init failed: {e}"))?; + + let report = run_sync(storage.as_ref(), &connector, reconcile, max_pages) + .await + .map_err(|e| format!("sync failed: {e}"))?; + + let scope = format!("{owner}/{repo}"); + let total = report.created + report.updated + report.unchanged; + let authed = github_token().is_some(); + + let summary = format!( + "Synced {scope}: {} created, {} updated, {} unchanged{} ({total} records seen{}).", + report.created, + report.updated, + report.unchanged, + if report.reconciled { + format!(", {} tombstoned", report.tombstoned) + } else { + String::new() + }, + if authed { "" } else { ", unauthenticated" }, + ); + + Ok(json!({ + "ok": true, + "summary": summary, + "source": "github", + "scope": scope, + "created": report.created, + "updated": report.updated, + "unchanged": report.unchanged, + "tombstoned": report.tombstoned, + "reconciled": report.reconciled, + "cursor": report.new_cursor.map(|d| d.to_rfc3339()), + "authenticated": authed, + "warnings": report.warnings, + "hint": if total == 0 && !authed { + "No records returned. For private repos or higher rate limits, set GITHUB_TOKEN in the server environment." + } else if report.new_cursor.is_some() && total >= 100 { + "More may remain — run source_sync again to continue from the saved cursor." + } else { + "Search these with the normal search tools; results cite the GitHub issue URL." + } + })) +} diff --git a/crates/vestige-mcp/src/tools/suppress.rs b/crates/vestige-mcp/src/tools/suppress.rs index f06debc..342793e 100644 --- a/crates/vestige-mcp/src/tools/suppress.rs +++ b/crates/vestige-mcp/src/tools/suppress.rs @@ -177,6 +177,7 @@ mod tests { tags: vec!["test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap() .id diff --git a/crates/vestige-mcp/src/tools/timeline.rs b/crates/vestige-mcp/src/tools/timeline.rs index 7b3dcbf..937ace7 100644 --- a/crates/vestige-mcp/src/tools/timeline.rs +++ b/crates/vestige-mcp/src/tools/timeline.rs @@ -209,6 +209,7 @@ mod tests { tags: vec!["timeline-test".to_string()], valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } @@ -226,6 +227,7 @@ mod tests { tags: tags.iter().map(|t| t.to_string()).collect(), valid_from: None, valid_until: None, + source_envelope: None, }) .unwrap(); } diff --git a/docs/CONNECTORS.md b/docs/CONNECTORS.md new file mode 100644 index 0000000..8bd561a --- /dev/null +++ b/docs/CONNECTORS.md @@ -0,0 +1,150 @@ +# External-Source Connectors + +> Status: **v2.1.27** — GitHub Issues connector (reference). Redmine and others +> follow the same contract. Tracking issue: +> [#57](https://github.com/samvallad33/vestige/issues/57). + +Connectors let Vestige act as a durable, local **retrieval and reasoning layer** +over a long-lived external system — a ticket tracker, an issue board, a support +queue — **without replacing it**. The external system stays the source of truth. +Vestige indexes its records, embeds them for semantic recall, links them into the +memory graph, and **cites back** to the canonical record. + +## Why this is different from a ticket-system MCP + +The official GitHub / Jira MCP servers are **live API proxies**: every query hits +the upstream API, is rate-limited, keyword-only, online-only, and has no memory +of past state. Vestige instead keeps a **durable local index** of the records, so +you can: + +- search the history **offline** and **semantically** (embeddings, not just + keywords), +- **join** ticket history with the rest of your memory in one search, +- see a **point-in-time** view (records carry temporal validity), +- and re-sync **idempotently** — re-running never duplicates a record. + +## Quick start (GitHub Issues) + +1. (Optional but recommended) export a token so you get the authenticated rate + limit (5,000 req/hr vs 60 for anonymous) and access to private repos: + + ```sh + export GITHUB_TOKEN=ghp_xxx # or VESTIGE_GITHUB_TOKEN + ``` + + The token is read **only** from the environment — never passed as a tool + argument, never logged. + +2. Ask your agent to run the `source_sync` MCP tool: + + ```json + { "repo": "samvallad33/vestige" } + ``` + +3. Search as normal. Connector-sourced results carry a `sourceRecord` object with + the canonical issue URL: + + ```json + { + "content": "[samvallad33/vestige#57] Roadmap: external source connectors …", + "sourceRecord": { + "system": "github", + "id": "57", + "url": "https://github.com/samvallad33/vestige/issues/57", + "project": "samvallad33/vestige", + "type": "issue", + "author": "samvallad33", + "tombstoned": false + } + } + ``` + +## The `source_sync` tool + +| Field | Type | Default | Meaning | +|---|---|---|---| +| `repo` | string | — (required) | `owner/name`, e.g. `samvallad33/vestige`. | +| `source` | string | `github` | External system. Currently only `github`. | +| `reconcile` | bool | `false` | Also tombstone local memories for issues no longer visible upstream (an extra full-enumeration pass). | +| `max_pages` | int | `10` | API pages to fetch this run (≤100 issues each). Lets a first sync of a large repo resume across calls. | + +The tool returns counts (`created` / `updated` / `unchanged` / `tombstoned`), +the saved `cursor`, whether it ran authenticated, and a `hint` for the next step. + +### Idempotent, incremental sync + +Each run: + +1. resumes from the saved cursor (the high-water mark on the record's upstream + update time), minus a small overlap window so same-second / clock-skewed + updates are never missed; +2. pages issues in ascending update order (`state=all`, so closing an issue is + **not** mistaken for a deletion), folding each issue + its comments into one + memory; +3. routes each record through an **idempotent upsert** keyed on + `(source_system, source_id)`: + - unseen record → **insert**, + - changed content (by content hash) → **update in place** + re-embed, + - unchanged content → **no-op** (only the "last seen" time advances); +4. advances and persists the cursor only after the run, so an interruption + re-scans rather than skips. + +Re-running `source_sync` on the same repo is therefore safe and cheap — it picks +up only what changed. + +### Deletions (tombstoning) + +Neither GitHub nor Redmine exposes a deletion feed, so an incremental sync can +never *see* a delete. Pass `reconcile: true` to run a reconciliation pass: Vestige +enumerates the currently-visible issue ids and **invalidates** (does not purge) +any local record no longer present. A tombstoned record keeps its content for +audit but drops out of "currently valid" retrieval (`sourceRecord.tombstoned` is +`true`). If the record reappears upstream, the next sync un-tombstones it. + +## The source envelope + +Every connector-ingested memory carries structured provenance, distinct from the +legacy free-form `source` label: + +| Field | Purpose | +|---|---| +| `source_system` | `github`, `redmine`, … (namespaces ids). | +| `source_id` | Native id (issue number, ticket id). | +| `source_url` | Canonical link back — the citation. | +| `source_updated_at` | Upstream update time (the sync cursor field). | +| `content_hash` | Change detector → idempotency. | +| `synced_at` | When the connector last saw the record live. | +| `source_project` | Repo / project / space. | +| `source_type` | `issue`, `comment`, … | +| `source_author` | Reporter / author upstream. | + +`(source_system, source_id)` is enforced unique, so there is exactly one memory +per external record. Legacy memories (agent- or user-authored) have no envelope +and are completely unaffected. + +## Building + +The connector HTTP client is behind the `connectors` cargo feature, which is +**on by default in the MCP server** (`vestige-mcp`). A build without it still +exposes the `source_sync` tool but returns a clear "rebuild with `--features +connectors`" message. The core library (`vestige-core`) leaves the feature +**off** by default, so library consumers that don't need connectors link no HTTP +client. + +```sh +# default MCP build already includes connectors +cargo build -p vestige-mcp --release + +# explicit, or for the core lib +cargo build -p vestige-core --features connectors +``` + +## Writing a new connector + +Implement the `Connector` trait in `vestige_core::connectors` (fetch a window of +records updated since a cursor, page forward, and optionally enumerate live ids +for reconciliation), produce `NormalizedRecord`s with a filled +`SourceEnvelope`, and hand them to `run_sync`. The GitHub connector +(`crates/vestige-core/src/connectors/github.rs`) is the reference +implementation. The sync driver, idempotent upsert, cursor checkpointing, and +tombstone reconciliation are all reused for free. diff --git a/tests/e2e/src/harness/db_manager.rs b/tests/e2e/src/harness/db_manager.rs index 345a94c..268432c 100644 --- a/tests/e2e/src/harness/db_manager.rs +++ b/tests/e2e/src/harness/db_manager.rs @@ -31,6 +31,7 @@ fn make_ingest_input( source, valid_from, valid_until, + source_envelope: None, } } diff --git a/tests/e2e/src/mocks/fixtures.rs b/tests/e2e/src/mocks/fixtures.rs index 6929e56..87d786b 100644 --- a/tests/e2e/src/mocks/fixtures.rs +++ b/tests/e2e/src/mocks/fixtures.rs @@ -29,6 +29,7 @@ fn make_ingest_input( source, valid_from, valid_until, + source_envelope: None, } }