feat(connectors): external-source connector layer + GitHub Issues (#57)

Make Vestige a durable, local, semantically-searchable retrieval layer over an
external system of record (GitHub Issues first), citing back to the canonical
record. Unlike a live ticket-system MCP proxy, Vestige keeps a durable embedded
index: searchable offline, joinable with the rest of memory, temporally
versioned, and re-syncable idempotently with no duplication.

Phases 1-2 of #57 plus a GitHub reference connector and source-aware search:

- Source envelope on KnowledgeNode/IngestInput (source_system, source_id,
  source_url, source_updated_at, content_hash, synced_at, source_project,
  source_type, source_author). Migration V17: nullable columns (additive),
  partial UNIQUE index on (source_system, source_id), connector_cursors table.
- Idempotent sync primitives in vestige-core: upsert_by_source (content-hash
  change detection), connector cursor checkpoints, reconcile_source_tombstones
  (invalidate-don't-delete via bitemporal valid_until).
- Connector contract + run_sync driver + GitHub Issues connector behind the
  optional `connectors` feature (on by default in vestige-mcp, off in the core
  library default so non-connector consumers link no HTTP client).
- source_sync MCP tool ({"repo": "owner/name"}); token from GITHUB_TOKEN env
  only. Search results gain a sourceRecord citation for connector memories.

Adversarial review fixes: GitHub `since` Z-form (the `+00:00` offset corrupted
the cursor server-side), un-tombstone clears superseded_by too, cursor never
advances past a failing record, Link next-url host-pinned (token-leak guard),
records_seen counts new records only.

Verified: cargo check/test/clippy -D warnings green across the workspace
(default and connectors features); 483 core tests pass. Version bump to 2.1.27
and tag deferred to release.

Refs #57

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sam Valladares 2026-06-19 01:21:59 -05:00
parent 22d0d192eb
commit 50e7f2d0fb
39 changed files with 2538 additions and 85 deletions

View file

@ -5,7 +5,63 @@ All notable changes to Vestige will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [Unreleased] — "External-Source Connectors"
> Bump `version` in the workspace `Cargo.toml`, both crates, `server.json`, and
> `package.json` to `2.1.27` at release/tag time, and date this heading.
Roadmap [#57](https://github.com/samvallad33/vestige/issues/57), **Phase 13**:
Vestige can now act as a durable, local, semantically-searchable retrieval layer
over an external system of record — starting with GitHub Issues — without
replacing it. The external system stays canonical; Vestige **indexes, connects,
retrieves, and cites back** to the source record.
Unlike a live ticket-system MCP proxy (which holds no state and is rate-limited
per query), Vestige keeps a durable embedded index: searchable **offline**,
**semantically**, joinable with the rest of your memory, temporally versioned,
and re-syncable **idempotently** with no duplication. To our knowledge no other
local-first memory layer combines native connectors, external-URL provenance,
content-hash idempotent sync, and tombstoning of vanished records.
### Added
- **`source_sync` MCP tool** — point Vestige at a GitHub repo
(`{"repo": "owner/name"}`) and it indexes every issue + its comments as
source-aware memories. Re-running updates changed issues in place (no
duplicates); `reconcile: true` tombstones issues no longer visible upstream.
Auth via the `GITHUB_TOKEN` (or `VESTIGE_GITHUB_TOKEN`) environment variable;
public repos work without a token at a lower rate limit.
- **Source envelope** on every memory — structured, machine-readable provenance
(`source_system`, `source_id`, `source_url`, `source_updated_at`,
`content_hash`, `synced_at`, `source_project`, `source_type`, `source_author`)
distinct from the legacy free-form `source` label. Search results gain a
`sourceRecord` object (with the canonical `url`) **only** for
connector-ingested memories, so an agent can cite and follow the source.
- **Idempotent sync primitives** (`vestige-core`): `upsert_by_source` (keyed on
`(source_system, source_id)`, content-hash change detection), per-connector
cursor checkpoints (`connector_cursors`), and `reconcile_source_tombstones`
(invalidate-don't-delete via the bitemporal `valid_until`, so a vanished
record is retained for audit but drops out of current retrieval).
- **Connector contract** (`vestige_core::connectors`) — a small source-agnostic
`Connector` trait + `run_sync` driver (cursor overlap window, incremental
paging, optional deletion reconcile) and a GitHub Issues reference connector
behind the optional `connectors` cargo feature (on by default in the MCP
server, off in the core library's default features so non-connector consumers
link no HTTP client).
### Database
- **Migration V17** — nine nullable source-envelope columns on `knowledge_nodes`
(additive; every existing memory is untouched), a partial UNIQUE index on
`(source_system, source_id)` enforcing one memory per external record while
costing nothing for envelope-less legacy rows, and the `connector_cursors`
checkpoint table. Idempotent on replay, following the established
`add_column_if_missing` pattern.
### Notes
- Local-first and optional: with no `source_sync` call, behavior is unchanged.
The default core-library build does not link an HTTP client.
## [2.1.26] - 2026-06-15 — "Configurable Output"

99
Cargo.lock generated
View file

@ -521,6 +521,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "cfg_aliases"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chrono"
version = "0.4.44"
@ -1681,8 +1687,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@ -1692,9 +1700,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi 5.3.0",
"wasip2",
"wasm-bindgen",
]
[[package]]
@ -1932,6 +1942,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tower-service",
"webpki-roots 1.0.6",
]
[[package]]
@ -2519,6 +2530,12 @@ dependencies = [
"hashbrown 0.16.1",
]
[[package]]
name = "lru-slab"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "lzma-rust2"
version = "0.15.7"
@ -3335,6 +3352,61 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
[[package]]
name = "quinn"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20"
dependencies = [
"bytes",
"cfg_aliases",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls",
"socket2",
"thiserror 2.0.18",
"tokio",
"tracing",
"web-time",
]
[[package]]
name = "quinn-proto"
version = "0.11.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
dependencies = [
"bytes",
"getrandom 0.3.4",
"lru-slab",
"rand",
"ring",
"rustc-hash",
"rustls",
"rustls-pki-types",
"slab",
"thiserror 2.0.18",
"tinyvec",
"tracing",
"web-time",
]
[[package]]
name = "quinn-udp"
version = "0.5.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd"
dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2",
"tracing",
"windows-sys 0.52.0",
]
[[package]]
name = "quote"
version = "1.0.45"
@ -3571,6 +3643,8 @@ dependencies = [
"native-tls",
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls",
"rustls-pki-types",
"serde",
"serde_json",
@ -3578,6 +3652,7 @@ dependencies = [
"sync_wrapper",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower",
"tower-http",
@ -3587,6 +3662,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 1.0.6",
]
[[package]]
@ -3636,6 +3712,12 @@ dependencies = [
"sqlite-wasm-rs",
]
[[package]]
name = "rustc-hash"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe"
[[package]]
name = "rustix"
version = "1.1.4"
@ -3670,6 +3752,7 @@ version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd"
dependencies = [
"web-time",
"zeroize",
]
@ -4162,6 +4245,21 @@ dependencies = [
"serde_json",
]
[[package]]
name = "tinyvec"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokenizers"
version = "0.22.2"
@ -4684,6 +4782,7 @@ dependencies = [
"git2",
"lru",
"notify",
"reqwest",
"rusqlite",
"serde",
"serde_json",

View file

@ -57,6 +57,12 @@ qwen3-embeddings = ["embeddings", "fastembed/qwen3", "dep:candle-core"]
# Backwards-compatible feature alias from the original v2.1.0 naming.
qwen3-reranker = ["qwen3-embeddings"]
# External-source connectors (#57). The connector *contract*, normalization,
# and content-hashing are always compiled (pure, no network). This feature adds
# the network-backed reference connectors (GitHub Issues, …) via `reqwest`, so
# the default local-first build never links an HTTP client.
connectors = ["dep:reqwest"]
# Metal GPU acceleration on Apple Silicon (significantly faster inference)
metal = ["fastembed/metal"]
@ -132,6 +138,14 @@ lru = "0.16"
trait-variant = "0.1"
blake3 = "1"
# ============================================================================
# OPTIONAL: External-source connectors (#57)
# ============================================================================
# HTTP client for network-backed reference connectors (GitHub Issues, Redmine).
# rustls so connectors build with no system OpenSSL dependency. Behind the
# `connectors` feature — the default local-first build does not link reqwest.
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"], optional = true }
[dev-dependencies]
tempfile = "3"
criterion = { version = "0.5", features = ["html_reports"] }

View file

@ -0,0 +1,570 @@
//! GitHub Issues connector (#57).
//!
//! Indexes a repository's issues + comments into source-aware Vestige memories
//! so an agent can search and reason over the full issue history **offline**,
//! **semantically**, and **cited back to the canonical issue URL**. Unlike the
//! official GitHub MCP server — a stateless live API proxy — this builds a
//! durable, embedded, temporally-versioned local index.
//!
//! ## Incremental sync (per the connector sync contract)
//!
//! - `state=all` so closing an issue is not mistaken for a deletion.
//! - `sort=updated&direction=asc` so we page forward in cursor order and a
//! mid-run interruption resumes safely.
//! - `since=<cursor overlap>` filters on `updated_at`; the overlap + the
//! `content_hash` no-op makes re-scans safe and cheap.
//! - `Link: rel="next"` drives pagination (never hand-built page urls).
//! - Entries carrying a `pull_request` key are dropped (PRs are not issues).
//! - Per issue we fold the body + comments into one memory; the hash covers
//! the stable fields only (title, body, state, labels, comments) — never the
//! cursor timestamp or volatile counts.
//!
//! GitHub has no deletion feed, so deletions are reconciled out-of-band via
//! [`list_live_ids`](Connector::list_live_ids).
use chrono::{DateTime, Utc};
use serde::Deserialize;
use super::{
Connector, ConnectorError, ConnectorResult, FetchPage, NormalizedRecord, content_hash,
};
use crate::memory::SourceEnvelope;
const API_ROOT: &str = "https://api.github.com";
const USER_AGENT: &str = concat!("vestige-connector/", env!("CARGO_PKG_VERSION"));
const PER_PAGE: u32 = 100;
/// Configuration for a GitHub Issues connector instance.
#[derive(Debug, Clone)]
pub struct GithubConfig {
/// Repository owner (user or org).
pub owner: String,
/// Repository name.
pub repo: String,
/// Personal access token. Optional for public repos (60 req/hr
/// unauthenticated) but strongly recommended (5000 req/hr authenticated).
pub token: Option<String>,
/// Override the API root (for GitHub Enterprise or tests).
pub api_root: Option<String>,
/// Max comments to fold into one issue memory (defense against huge threads).
pub max_comments: usize,
}
impl GithubConfig {
pub fn new(owner: impl Into<String>, repo: impl Into<String>) -> Self {
Self {
owner: owner.into(),
repo: repo.into(),
token: None,
api_root: None,
max_comments: 50,
}
}
pub fn with_token(mut self, token: Option<String>) -> Self {
self.token = token;
self
}
fn scope(&self) -> String {
format!("{}/{}", self.owner, self.repo)
}
fn root(&self) -> &str {
self.api_root.as_deref().unwrap_or(API_ROOT)
}
}
/// A GitHub Issues connector bound to one repository.
pub struct GithubConnector {
config: GithubConfig,
scope: String,
client: reqwest::Client,
}
impl GithubConnector {
pub fn new(config: GithubConfig) -> ConnectorResult<Self> {
if config.owner.is_empty() || config.repo.is_empty() {
return Err(ConnectorError::Config(
"owner and repo are required".to_string(),
));
}
let client = reqwest::Client::builder()
.user_agent(USER_AGENT)
.build()
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
let scope = config.scope();
Ok(Self {
config,
scope,
client,
})
}
fn auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
let req = req
.header("Accept", "application/vnd.github+json")
.header("X-GitHub-Api-Version", "2022-11-28");
match &self.config.token {
Some(t) => req.bearer_auth(t),
None => req,
}
}
/// Map an HTTP response status into a connector error, honoring rate-limit
/// signals so the driver can back off politely.
fn classify_status(resp: &reqwest::Response) -> Option<ConnectorError> {
let status = resp.status();
if status.is_success() {
return None;
}
// Primary rate limit: 403/429 with remaining=0.
if status.as_u16() == 403 || status.as_u16() == 429 {
let remaining = resp
.headers()
.get("x-ratelimit-remaining")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<i64>().ok());
if remaining == Some(0) || status.as_u16() == 429 {
let retry = resp
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.map(std::time::Duration::from_secs);
return Some(ConnectorError::RateLimited(retry));
}
}
Some(ConnectorError::Source {
status: status.as_u16(),
message: status
.canonical_reason()
.unwrap_or("request failed")
.to_string(),
})
}
/// Parse the `Link` header for the `rel="next"` url, if any.
///
/// The `next` url comes from the server response, so we pin it to the
/// configured API host before following it: otherwise a malicious or
/// compromised endpoint could redirect the connector — which attaches the
/// bearer token to every request — to an attacker-controlled URL and
/// exfiltrate the credential (SSRF / token leak). `expected_host` is the
/// host of the connector's API root.
fn next_link(resp: &reqwest::Response, expected_host: Option<&str>) -> Option<String> {
let link = resp.headers().get(reqwest::header::LINK)?.to_str().ok()?;
for part in link.split(',') {
let part = part.trim();
if part.contains("rel=\"next\"")
&& let (Some(start), Some(end)) = (part.find('<'), part.find('>'))
&& start < end
{
let url = &part[start + 1..end];
// Host-pin: only follow a next-url on the same host as the API
// root we were configured with.
if let Some(expected) = expected_host {
match reqwest::Url::parse(url) {
Ok(parsed) if parsed.host_str() == Some(expected) => {
return Some(url.to_string());
}
_ => {
tracing::warn!(
next_url = url,
"dropping cross-host Link next url (host pin)"
);
return None;
}
}
}
return Some(url.to_string());
}
}
None
}
/// Host of the configured API root, used to pin Link `next` urls.
fn api_host(&self) -> Option<String> {
reqwest::Url::parse(self.config.root())
.ok()
.and_then(|u| u.host_str().map(|h| h.to_string()))
}
/// Fetch the comments for one issue (a single page; capped by `max_comments`).
async fn fetch_comments(&self, issue_number: u64) -> ConnectorResult<Vec<RawComment>> {
let url = format!(
"{}/repos/{}/{}/issues/{}/comments?per_page={}",
self.config.root(),
self.config.owner,
self.config.repo,
issue_number,
self.config.max_comments.min(100),
);
let resp = self
.auth(self.client.get(&url))
.send()
.await
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
if let Some(err) = Self::classify_status(&resp) {
return Err(err);
}
resp.json::<Vec<RawComment>>()
.await
.map_err(|e| ConnectorError::Transport(e.to_string()))
}
/// Fold a raw issue + its comments into one normalized memory record.
fn normalize(&self, issue: &RawIssue, comments: &[RawComment]) -> NormalizedRecord {
let author = issue.user.as_ref().map(|u| u.login.clone());
// Human-readable content: header + body + chronological comments.
let mut content = format!(
"[{}#{}] {}\nState: {}\n",
self.scope, issue.number, issue.title, issue.state
);
if let Some(body) = &issue.body
&& !body.trim().is_empty()
{
content.push('\n');
content.push_str(body.trim());
content.push('\n');
}
let mut sorted_comments: Vec<&RawComment> = comments.iter().collect();
sorted_comments.sort_by_key(|c| c.id);
for c in &sorted_comments {
let who = c.user.as_ref().map(|u| u.login.as_str()).unwrap_or("?");
content.push_str(&format!("\n{who}: {}", c.body.trim()));
}
// Labels, sorted for a stable hash.
let mut labels: Vec<String> = issue.labels.iter().map(|l| l.name.clone()).collect();
labels.sort();
// Stable content hash — meaning only, never the cursor timestamp or
// volatile counts. Comments contribute their id+body in id order.
let comments_blob = sorted_comments
.iter()
.map(|c| format!("{}:{}", c.id, c.body.trim()))
.collect::<Vec<_>>()
.join("\u{1f}");
let labels_blob = labels.join(",");
let number_str = issue.number.to_string();
let body_str = issue.body.clone().unwrap_or_default();
let hash = content_hash(&[
("number", &number_str),
("title", &issue.title),
("state", &issue.state),
("body", &body_str),
("labels", &labels_blob),
("comments", &comments_blob),
]);
let mut tags = vec![
"github".to_string(),
"issue".to_string(),
format!("state:{}", issue.state),
];
tags.extend(labels.into_iter().map(|l| format!("label:{l}")));
let envelope = SourceEnvelope {
source_system: Some("github".to_string()),
source_id: Some(issue.number.to_string()),
source_url: Some(issue.html_url.clone()),
source_updated_at: DateTime::parse_from_rfc3339(&issue.updated_at)
.ok()
.map(|d| d.with_timezone(&Utc)),
content_hash: Some(hash),
synced_at: Some(Utc::now()),
source_project: Some(self.scope.clone()),
source_type: Some("issue".to_string()),
source_author: author,
};
NormalizedRecord {
content,
tags,
envelope,
}
}
}
impl Connector for GithubConnector {
fn source_system(&self) -> &str {
"github"
}
fn scope(&self) -> &str {
&self.scope
}
async fn fetch_updated(
&self,
since: Option<DateTime<Utc>>,
cursor: Option<String>,
) -> ConnectorResult<FetchPage> {
// `cursor` is a full next-page url from a previous Link header; on the
// first page we build the url from owner/repo + since.
let url = match cursor {
Some(u) => u,
None => {
let mut u = format!(
"{}/repos/{}/{}/issues?state=all&sort=updated&direction=asc&per_page={}",
self.config.root(),
self.config.owner,
self.config.repo,
PER_PAGE,
);
if let Some(s) = since {
// GitHub documents the `since` format as YYYY-MM-DDTHH:MM:SSZ.
// `to_rfc3339()` emits the `+00:00` offset form, and the `+`
// is a reserved query char that the server decodes as a
// space — corrupting the timestamp and silently re-fetching
// all history every run. Emit the `Z` form (no reserved
// char, exact documented format) instead.
let since_z = s.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
u.push_str(&format!("&since={since_z}"));
}
u
}
};
let resp = self
.auth(self.client.get(&url))
.send()
.await
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
if let Some(err) = Self::classify_status(&resp) {
return Err(err);
}
let next_cursor = Self::next_link(&resp, self.api_host().as_deref());
let issues: Vec<RawIssue> = resp
.json()
.await
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
let mut records = Vec::new();
for issue in &issues {
// Drop pull requests — "every PR is an issue, but not vice versa".
if issue.pull_request.is_some() {
continue;
}
// Fetch comments only when the issue has any.
let comments = if issue.comments > 0 {
self.fetch_comments(issue.number).await.unwrap_or_default()
} else {
Vec::new()
};
records.push(self.normalize(issue, &comments));
}
Ok(FetchPage {
records,
next_cursor,
})
}
async fn list_live_ids(&self) -> ConnectorResult<Option<Vec<String>>> {
// Enumerate all issue numbers (ids only) for the reconcile pass, paging
// via Link. Cheap relative to full sync (no comment fetch, no bodies).
let mut ids = Vec::new();
let mut url = Some(format!(
"{}/repos/{}/{}/issues?state=all&per_page={}",
self.config.root(),
self.config.owner,
self.config.repo,
PER_PAGE,
));
while let Some(u) = url {
let resp = self
.auth(self.client.get(&u))
.send()
.await
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
if let Some(err) = Self::classify_status(&resp) {
return Err(err);
}
let next = Self::next_link(&resp, self.api_host().as_deref());
let issues: Vec<RawIssue> = resp
.json()
.await
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
for issue in issues {
if issue.pull_request.is_none() {
ids.push(issue.number.to_string());
}
}
url = next;
}
Ok(Some(ids))
}
}
// ---------------------------------------------------------------------------
// Raw GitHub API shapes (only the fields we use)
// ---------------------------------------------------------------------------
#[derive(Debug, Deserialize)]
struct RawIssue {
number: u64,
title: String,
#[serde(default)]
body: Option<String>,
state: String,
html_url: String,
updated_at: String,
#[serde(default)]
comments: u64,
#[serde(default)]
labels: Vec<RawLabel>,
#[serde(default)]
user: Option<RawUser>,
/// Present iff this "issue" is actually a pull request.
#[serde(default)]
pull_request: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
struct RawLabel {
name: String,
}
#[derive(Debug, Deserialize)]
struct RawUser {
login: String,
}
#[derive(Debug, Deserialize)]
struct RawComment {
id: u64,
body: String,
#[serde(default)]
user: Option<RawUser>,
}
#[cfg(test)]
mod tests {
use super::*;
fn issue(number: u64, title: &str, body: &str, state: &str) -> RawIssue {
RawIssue {
number,
title: title.to_string(),
body: Some(body.to_string()),
state: state.to_string(),
html_url: format!("https://github.com/o/r/issues/{number}"),
updated_at: "2026-06-19T00:00:00Z".to_string(),
comments: 0,
labels: vec![RawLabel {
name: "bug".to_string(),
}],
user: Some(RawUser {
login: "octocat".to_string(),
}),
pull_request: None,
}
}
fn connector() -> GithubConnector {
GithubConnector::new(GithubConfig::new("o", "r")).unwrap()
}
#[test]
fn normalize_builds_keyed_envelope_with_citation() {
let c = connector();
let rec = c.normalize(&issue(57, "Connectors", "Add Redmine", "open"), &[]);
let env = &rec.envelope;
assert!(env.has_key());
assert_eq!(env.source_system.as_deref(), Some("github"));
assert_eq!(env.source_id.as_deref(), Some("57"));
assert_eq!(
env.source_url.as_deref(),
Some("https://github.com/o/r/issues/57")
);
assert_eq!(env.source_project.as_deref(), Some("o/r"));
assert!(rec.content.contains("Connectors"));
assert!(rec.tags.contains(&"state:open".to_string()));
assert!(rec.tags.contains(&"label:bug".to_string()));
}
#[test]
fn hash_stable_across_label_order_and_changes_on_edit() {
let c = connector();
let mut a = issue(1, "T", "body", "open");
a.labels = vec![
RawLabel { name: "b".into() },
RawLabel { name: "a".into() },
];
let mut b = issue(1, "T", "body", "open");
b.labels = vec![
RawLabel { name: "a".into() },
RawLabel { name: "b".into() },
];
let ha = c.normalize(&a, &[]).envelope.content_hash;
let hb = c.normalize(&b, &[]).envelope.content_hash;
assert_eq!(ha, hb, "label order must not change the hash");
// Editing the body must change the hash.
let edited = c.normalize(&issue(1, "T", "EDITED", "open"), &[]).envelope.content_hash;
assert_ne!(ha, edited);
// Closing the issue changes state → changes the hash (not a no-op).
let closed = c.normalize(&issue(1, "T", "body", "closed"), &[]).envelope.content_hash;
assert_ne!(ha, closed);
}
#[test]
fn comments_fold_in_id_order_and_affect_hash() {
let c = connector();
let comments = vec![
RawComment {
id: 2,
body: "second".into(),
user: Some(RawUser { login: "x".into() }),
},
RawComment {
id: 1,
body: "first".into(),
user: Some(RawUser { login: "y".into() }),
},
];
let rec = c.normalize(&issue(1, "T", "body", "open"), &comments);
// Folded in id order regardless of input order.
let first_pos = rec.content.find("first").unwrap();
let second_pos = rec.content.find("second").unwrap();
assert!(first_pos < second_pos, "comments must fold in id order");
let no_comments = c.normalize(&issue(1, "T", "body", "open"), &[]).envelope.content_hash;
assert_ne!(
rec.envelope.content_hash, no_comments,
"comments must contribute to the hash"
);
}
#[test]
fn rejects_empty_owner_repo() {
assert!(GithubConnector::new(GithubConfig::new("", "r")).is_err());
assert!(GithubConnector::new(GithubConfig::new("o", "")).is_err());
}
#[test]
fn since_uses_z_form_not_plus_offset() {
// Regression: to_rfc3339() emits `+00:00`; the `+` decodes to a space
// server-side and corrupts the cursor. We must emit the `Z` form.
let ts = DateTime::parse_from_rfc3339("2026-06-19T00:00:00Z")
.unwrap()
.with_timezone(&Utc);
let z = ts.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
assert_eq!(z, "2026-06-19T00:00:00Z");
assert!(!z.contains('+'), "since must not contain a reserved '+'");
}
#[test]
fn next_link_host_pin_drops_cross_host_url() {
// The host-pin parsing logic (used to prevent token exfiltration via a
// malicious Link header) must reject a different host.
let same = reqwest::Url::parse("https://api.github.com/x?page=2").unwrap();
let other = reqwest::Url::parse("https://evil.example/x?page=2").unwrap();
assert_eq!(same.host_str(), Some("api.github.com"));
assert_ne!(other.host_str(), Some("api.github.com"));
}
}

View file

@ -0,0 +1,372 @@
//! External-source connectors (#57).
//!
//! A connector turns records in a long-lived external system (a ticket tracker,
//! an issue board, a support queue) into source-aware Vestige memories, so an
//! investigative agent can search and reason over years of history **offline**,
//! **semantically**, and **cited back to the canonical record** — something no
//! live ticket-system MCP proxy can do.
//!
//! ## Layering
//!
//! - The [`Connector`] contract, [`NormalizedRecord`] shape, and the stable
//! [`content_hash`] are pure (no network) and always compiled, so the sync
//! semantics are unit-testable without hitting an API.
//! - Network-backed reference connectors (e.g. [`github`]) live behind the
//! `connectors` cargo feature so the default local-first build links no HTTP
//! client.
//!
//! ## Sync contract (the part that makes re-running safe)
//!
//! Every connector produces [`NormalizedRecord`]s. Each carries a
//! [`SourceEnvelope`](crate::memory::SourceEnvelope) whose
//! `(source_system, source_id)` is the idempotency key and whose `content_hash`
//! is the change detector. The driver routes each record through
//! [`upsert_by_source`](crate::storage::SqliteMemoryStore::upsert_by_source):
//!
//! - unseen record → insert
//! - changed `content_hash` → update in place (+ re-embed)
//! - same `content_hash` → no-op (only liveness advances)
//!
//! Because neither GitHub nor Redmine expose a deletion feed, deletions are
//! handled out-of-band by a periodic reconcile pass
//! ([`reconcile_source_tombstones`](crate::storage::SqliteMemoryStore::reconcile_source_tombstones)).
use chrono::{DateTime, Utc};
use crate::memory::{IngestInput, SourceEnvelope};
use crate::storage::ConnectorCursor;
#[cfg(feature = "connectors")]
pub mod github;
/// A single external record, already normalized into the fields Vestige needs.
///
/// The connector is responsible for flattening a possibly-rich source record
/// (an issue plus its comments / journals / status changes) into a single
/// retrievable `content` blob plus the structured envelope. Keeping one memory
/// per logical record (rather than per comment) keeps retrieval coherent and
/// the idempotency key simple.
#[derive(Debug, Clone)]
pub struct NormalizedRecord {
/// Human-readable content to embed and search over.
pub content: String,
/// Tags for categorization (e.g. `["github", "issue", "state:open"]`).
pub tags: Vec<String>,
/// The provenance envelope. `source_system`, `source_id`, and `content_hash`
/// MUST be set for idempotent upsert.
pub envelope: SourceEnvelope,
}
impl NormalizedRecord {
/// Convert into an [`IngestInput`] ready for `upsert_by_source`.
pub fn into_ingest_input(self) -> IngestInput {
IngestInput {
content: self.content,
node_type: "event".to_string(),
source: self.envelope.source_url.clone(),
tags: self.tags,
source_envelope: Some(self.envelope),
..Default::default()
}
}
}
/// One page of records plus the cursor needed to fetch the next page.
#[derive(Debug, Clone, Default)]
pub struct FetchPage {
pub records: Vec<NormalizedRecord>,
/// Opaque token to resume after this page, or `None` when exhausted.
pub next_cursor: Option<String>,
}
/// Errors a connector can surface.
#[derive(Debug, thiserror::Error)]
pub enum ConnectorError {
#[error("configuration error: {0}")]
Config(String),
#[error("transport error: {0}")]
Transport(String),
#[error("rate limited; retry after {0:?}")]
RateLimited(Option<std::time::Duration>),
#[error("source error ({status}): {message}")]
Source { status: u16, message: String },
}
pub type ConnectorResult<T> = Result<T, ConnectorError>;
/// The contract every external-source connector implements.
///
/// Intentionally minimal: fetch a window of records updated since a cursor,
/// page through them, and (separately) enumerate currently-live ids for the
/// deletion-reconcile pass. The driver owns persistence, embedding, and cursor
/// checkpointing — a connector is just a typed, incremental reader.
#[allow(async_fn_in_trait)]
pub trait Connector {
/// Stable system identifier written into every envelope (`github`, …).
fn source_system(&self) -> &str;
/// The scope this connector instance is bound to (`owner/repo`, project id).
fn scope(&self) -> &str;
/// Fetch one page of records whose source-updated time is `>= since`
/// (inclusive on purpose — see the overlap note below), resuming from
/// `cursor` when provided. Records should be returned in ascending
/// update-time order so a mid-run interruption resumes safely.
///
/// Callers pass `since = checkpoint overlap` (a few minutes) so a record
/// written with a slightly-behind upstream clock, or one sharing the exact
/// boundary second, is never skipped. The `content_hash` short-circuit in
/// `upsert_by_source` makes the resulting re-scan free.
async fn fetch_updated(
&self,
since: Option<DateTime<Utc>>,
cursor: Option<String>,
) -> ConnectorResult<FetchPage>;
/// Enumerate the ids currently visible upstream for this scope, for the
/// deletion-reconcile pass. Cheap (ids only). `None` means the connector
/// cannot enumerate, so the driver must skip reconciliation rather than
/// tombstone everything.
async fn list_live_ids(&self) -> ConnectorResult<Option<Vec<String>>> {
Ok(None)
}
}
/// Recommended overlap subtracted from the saved cursor before the next fetch,
/// to absorb clock skew and same-second boundary updates (the `>=` window).
pub const CURSOR_OVERLAP_SECS: i64 = 120;
/// Summary of one sync run, returned to the caller / surfaced by the MCP tool.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct SyncReport {
pub source_system: String,
pub scope: String,
pub created: usize,
pub updated: usize,
pub unchanged: usize,
pub tombstoned: usize,
/// New high-water mark persisted as the cursor for the next run.
pub new_cursor: Option<DateTime<Utc>>,
/// Whether a deletion-reconcile pass ran this time.
pub reconciled: bool,
/// Non-fatal warnings (e.g. a page that failed and was skipped).
pub warnings: Vec<String>,
}
/// Drive a full incremental sync of one connector into the store (#57).
///
/// This is the orchestration the MCP `source_sync` tool calls. It:
/// 1. loads the saved checkpoint and starts from `cursor overlap` (the `>=`
/// window that prevents missing same-second / clock-skewed updates);
/// 2. pages the connector forward in update order, routing each record through
/// [`upsert_by_source`](crate::storage::SqliteMemoryStore::upsert_by_source)
/// (insert / update-in-place / no-op by content hash);
/// 3. advances the cursor to the max `source_updated_at` actually observed,
/// persisting it only after the run so a crash re-scans rather than skips;
/// 4. optionally reconciles deletions when `reconcile` is set and the connector
/// can enumerate live ids.
///
/// `max_pages` bounds a single run (so a first sync of a 15-year tracker can be
/// resumed across calls rather than blocking on one enormous fetch).
pub async fn run_sync<C: Connector>(
store: &crate::storage::SqliteMemoryStore,
connector: &C,
reconcile: bool,
max_pages: usize,
) -> ConnectorResult<SyncReport> {
use crate::storage::SourceUpsertOutcome;
let source_system = connector.source_system().to_string();
let scope = connector.scope().to_string();
let mut report = SyncReport {
source_system: source_system.clone(),
scope: scope.clone(),
..Default::default()
};
// 1. Load checkpoint, apply the overlap window.
let checkpoint = store
.get_connector_cursor(&source_system, &scope)
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
let since = checkpoint
.cursor_updated_at
.map(|c| c - chrono::Duration::seconds(CURSOR_OVERLAP_SECS));
// 2. Page forward, upserting each record.
let mut cursor: Option<String> = None;
let mut max_seen = checkpoint.cursor_updated_at;
// Oldest source_updated_at among records that FAILED to upsert this run. We
// must not advance the persisted cursor past this, or the failed record —
// fetched in ascending update order — would fall outside the next run's
// `since` window and never be retried (a silent permanent gap).
let mut oldest_failure: Option<DateTime<Utc>> = None;
// Count of genuinely new records (Created). Unchanged re-scans of the
// overlap window must not inflate the running total.
let mut created_this_run = 0i64;
for _ in 0..max_pages.max(1) {
let page = connector.fetch_updated(since, cursor.clone()).await?;
for record in page.records {
let observed = record.envelope.source_updated_at;
match store.upsert_by_source(record.into_ingest_input()) {
Ok(res) => {
match res.outcome {
SourceUpsertOutcome::Created => {
report.created += 1;
created_this_run += 1;
}
SourceUpsertOutcome::Updated => report.updated += 1,
SourceUpsertOutcome::Unchanged => report.unchanged += 1,
}
if let Some(ts) = observed
&& max_seen.map(|m| ts > m).unwrap_or(true)
{
max_seen = Some(ts);
}
}
Err(e) => {
report.warnings.push(format!("upsert failed: {e}"));
if let Some(ts) = observed
&& oldest_failure.map(|f| ts < f).unwrap_or(true)
{
oldest_failure = Some(ts);
}
}
}
}
match page.next_cursor {
Some(next) => cursor = Some(next),
None => break,
}
}
// Clamp the cursor so we never advance past a record that failed this run.
// Subtract one second so the next run's inclusive `since` re-includes it.
if let Some(failed_at) = oldest_failure {
let clamp_to = failed_at - chrono::Duration::seconds(1);
max_seen = Some(match max_seen {
Some(m) if m < clamp_to => m,
_ => clamp_to,
});
}
// 3. Optional deletion reconciliation.
let mut reconciled = false;
if reconcile {
match connector.list_live_ids().await {
Ok(Some(live_ids)) => {
match store.reconcile_source_tombstones(&source_system, &scope, &live_ids) {
Ok(r) => {
report.tombstoned = r.tombstoned.len();
reconciled = true;
}
Err(e) => report.warnings.push(format!("reconcile failed: {e}")),
}
}
Ok(None) => report
.warnings
.push("connector cannot enumerate live ids; skipped reconcile".to_string()),
Err(e) => report.warnings.push(format!("list_live_ids failed: {e}")),
}
}
report.reconciled = reconciled;
report.new_cursor = max_seen;
// 4. Persist the checkpoint (only after the run).
let now = Utc::now();
let new_checkpoint = ConnectorCursor {
source_system: source_system.clone(),
scope: scope.clone(),
cursor_updated_at: max_seen,
last_synced_at: Some(now),
last_full_reconcile_at: if reconciled {
Some(now)
} else {
checkpoint.last_full_reconcile_at
},
// Accumulate only NEW records, so re-scanning the overlap window (which
// reports Unchanged) does not inflate the running total.
records_seen: checkpoint.records_seen + created_this_run,
};
store
.save_connector_cursor(&new_checkpoint)
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
Ok(report)
}
/// Compute a stable content hash over the record's meaning.
///
/// Stability requirements (so re-syncing an unchanged record is a true no-op):
/// - **key order independent** — callers pass `(field, value)` pairs which we
/// sort before hashing, so map/field ordering never changes the digest;
/// - **volatile fields excluded** — the caller must omit the cursor timestamp,
/// view/comment counts, and ephemeral permission flags (hash the meaning,
/// not the metadata);
/// - **collision-resistant** — BLAKE3 (already a Vestige dependency).
///
/// Comment/journal arrays should be flattened into the pairs in a stable order
/// (sorted by their own id) by the caller before hashing.
pub fn content_hash(fields: &[(&str, &str)]) -> String {
let mut pairs: Vec<(&str, &str)> = fields.to_vec();
pairs.sort_by(|a, b| a.0.cmp(b.0).then(a.1.cmp(b.1)));
let mut hasher = blake3::Hasher::new();
for (k, v) in pairs {
// Length-prefix each field so ("ab","c") can't collide with ("a","bc").
hasher.update(&(k.len() as u64).to_le_bytes());
hasher.update(k.as_bytes());
hasher.update(&(v.len() as u64).to_le_bytes());
hasher.update(v.as_bytes());
}
hasher.finalize().to_hex().to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn content_hash_is_order_independent() {
let a = content_hash(&[("title", "Crash"), ("body", "stacktrace"), ("state", "open")]);
let b = content_hash(&[("state", "open"), ("title", "Crash"), ("body", "stacktrace")]);
assert_eq!(a, b, "reordering fields must not change the hash");
}
#[test]
fn content_hash_changes_with_content() {
let a = content_hash(&[("body", "v1")]);
let b = content_hash(&[("body", "v2")]);
assert_ne!(a, b, "different content must hash differently");
}
#[test]
fn content_hash_no_boundary_collision() {
// ("ab","c") vs ("a","bc") must differ thanks to length prefixing.
let a = content_hash(&[("ab", "c")]);
let b = content_hash(&[("a", "bc")]);
assert_ne!(a, b);
}
#[test]
fn normalized_record_carries_envelope_into_input() {
let rec = NormalizedRecord {
content: "issue body".to_string(),
tags: vec!["github".to_string()],
envelope: SourceEnvelope {
source_system: Some("github".to_string()),
source_id: Some("42".to_string()),
source_url: Some("https://example/42".to_string()),
content_hash: Some("h".to_string()),
..Default::default()
},
};
let input = rec.into_ingest_input();
assert_eq!(input.content, "issue body");
assert_eq!(input.source.as_deref(), Some("https://example/42"));
let env = input.source_envelope.unwrap();
assert!(env.has_key());
assert_eq!(env.source_id.as_deref(), Some("42"));
}
}

View file

@ -889,6 +889,7 @@ mod tests {
embedding_model: None,
suppression_count: 0,
suppressed_at: None,
source_envelope: None,
}
}

View file

@ -82,6 +82,7 @@
/// Optional `vestige.toml` configuration (Phase 2: Configurable Output).
pub mod config;
pub mod connectors;
pub mod consolidation;
pub mod embedder;
pub mod fsrs;
@ -134,6 +135,7 @@ pub use memory::{
SearchMode,
SearchResult,
SimilarityResult,
SourceEnvelope,
TableIntrospection,
TemporalRange,
};
@ -166,6 +168,7 @@ pub use storage::{
CompositionNeighborRecord,
CompositionOutcomeRecord,
ConnectionRecord,
ConnectorCursor,
ConsolidationHistoryRecord,
Domain,
DreamHistoryRecord,
@ -184,10 +187,13 @@ pub use storage::{
PortableArchive,
PortableImportMode,
PortableImportReport,
ReconcileReport,
Result,
SchedulingState,
SearchQuery,
SmartIngestResult,
SourceUpsertOutcome,
SourceUpsertResult,
SqliteMemoryStore,
StateTransitionRecord,
Storage,

View file

@ -10,7 +10,7 @@ mod node;
mod strength;
mod temporal;
pub use node::{IngestInput, KnowledgeNode, NodeType, RecallInput, SearchMode};
pub use node::{IngestInput, KnowledgeNode, NodeType, RecallInput, SearchMode, SourceEnvelope};
pub use strength::{DualStrength, StrengthDecay};
pub use temporal::{TemporalRange, TemporalValidity};

View file

@ -79,6 +79,91 @@ impl std::fmt::Display for NodeType {
}
}
// ============================================================================
// SOURCE ENVELOPE (#57)
// ============================================================================
/// Structured provenance for a memory that mirrors a record in an external
/// system of record (a Redmine issue, a GitHub Issue, a Jira ticket, a support
/// thread, …).
///
/// The product boundary (#57): the external system stays canonical. Vestige
/// **indexes, connects, retrieves, and cites back**; it does not replace the
/// ticket tracker. This envelope carries exactly the fields a connector needs
/// to do that without leaking stale data:
///
/// - `(source_system, source_id)` is the **idempotency key**. Re-running a sync
/// upserts the same logical record instead of duplicating it.
/// - `content_hash` is the **change detector**. If a re-fetched record hashes
/// to the stored value, the upsert is a no-op (only `synced_at` advances),
/// so an incremental re-scan never churns the index or the embedding model.
/// - `source_url` is the **citation**. Search results link back to the
/// canonical record so the agent can follow it for authoritative detail.
/// - `source_updated_at` is the **cursor field** the connector checkpoints on.
///
/// Every field is optional at the type level so partial connectors and manual
/// imports can populate only what they have, but a real connector should always
/// set `source_system`, `source_id`, and `content_hash`.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SourceEnvelope {
/// External system this record came from, e.g. `redmine`, `github`, `jira`.
/// Namespaces `source_id` so two systems can share numeric ids.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_system: Option<String>,
/// Stable native id in the source system (Redmine issue id, GitHub issue
/// number/node id, …). Combined with `source_system` it is the upsert key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_id: Option<String>,
/// Canonical URL back to the record so retrieval can cite the source.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_url: Option<String>,
/// When the source record was last updated upstream (the connector cursor
/// field — Redmine `updated_on`, GitHub `updated_at`). RFC 3339.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_updated_at: Option<DateTime<Utc>>,
/// Stable hash of the normalized record content. Idempotency / change key.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub content_hash: Option<String>,
/// When the connector last observed this record live. Drives tombstone
/// reconciliation (a record not seen in a full reconcile pass is gone).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub synced_at: Option<DateTime<Utc>>,
/// Project / repo / space the record belongs to (Redmine project, GitHub
/// `owner/repo`). Used for scoped sync and search filters.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_project: Option<String>,
/// Record type within the source (`issue`, `comment`, `journal`, …).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_type: Option<String>,
/// Author / reporter of the record in the source system.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_author: Option<String>,
}
impl SourceEnvelope {
/// True once the two fields a connector needs for idempotent upsert are
/// present. Manual imports that only set `source_url` are not "keyed".
pub fn has_key(&self) -> bool {
self.source_system.is_some() && self.source_id.is_some()
}
/// True if every field is unset — used to collapse an all-`None` envelope
/// back to `None` on the node so legacy rows stay clean.
pub fn is_empty(&self) -> bool {
self.source_system.is_none()
&& self.source_id.is_none()
&& self.source_url.is_none()
&& self.source_updated_at.is_none()
&& self.content_hash.is_none()
&& self.synced_at.is_none()
&& self.source_project.is_none()
&& self.source_type.is_none()
&& self.source_author.is_none()
}
}
// ============================================================================
// KNOWLEDGE NODE
// ============================================================================
@ -188,6 +273,15 @@ pub struct KnowledgeNode {
/// Timestamp of the most recent suppression (for 24h labile window).
#[serde(skip_serializing_if = "Option::is_none")]
pub suppressed_at: Option<DateTime<Utc>>,
// ========== Source Envelope (#57, external-source connectors) ==========
/// Structured provenance for memories ingested from an external system
/// (Redmine, GitHub Issues, Jira, …). `None` for memories created directly
/// by an agent or the user — the legacy free-form `source` string above
/// remains the human-readable label; this envelope is the machine-readable,
/// idempotency- and citation-bearing record. See [`SourceEnvelope`].
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_envelope: Option<SourceEnvelope>,
}
impl Default for KnowledgeNode {
@ -224,6 +318,7 @@ impl Default for KnowledgeNode {
embedding_model: None,
suppression_count: 0,
suppressed_at: None,
source_envelope: None,
}
}
}
@ -291,6 +386,11 @@ pub struct IngestInput {
/// When this knowledge stops being valid
#[serde(skip_serializing_if = "Option::is_none")]
pub valid_until: Option<DateTime<Utc>>,
/// Structured provenance for connector-ingested records (#57). When set
/// with a `(source_system, source_id)` key, callers should route through
/// `upsert_by_source` for idempotent sync rather than plain `ingest`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source_envelope: Option<SourceEnvelope>,
}
impl Default for IngestInput {
@ -304,6 +404,7 @@ impl Default for IngestInput {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
}
}
}

View file

@ -84,6 +84,11 @@ pub const MIGRATIONS: &[Migration] = &[
description: "ADR 0001 Phase 1: embedding_model registry, domains/domain_scores columns, domains table",
up: MIGRATION_V16_UP,
},
Migration {
version: 17,
description: "#57 Source envelope: provenance columns + connector cursor checkpoints for idempotent external-source sync",
up: MIGRATION_V17_UP,
},
];
/// A database migration
@ -957,6 +962,73 @@ pub const MIGRATION_V16_ALTER_COLUMNS: &[&str] = &[
"ALTER TABLE knowledge_nodes ADD COLUMN domain_scores TEXT NOT NULL DEFAULT '{}'",
];
/// V17: #57 Source envelope — structured provenance for connector-ingested
/// records, plus a per-connector cursor checkpoint table.
///
/// The provenance columns live directly on `knowledge_nodes` (rather than a
/// side table) so search can filter and cite them with no join. They are all
/// nullable and default-NULL, so every existing memory is untouched and the
/// migration is purely additive — legacy rows simply have no envelope.
///
/// The `(source_system, source_id)` pair is the idempotency key for
/// `upsert_by_source`; the unique index enforces one memory per external
/// record. `content_hash` is the change detector. `connector_cursors` holds the
/// incremental-sync high-water mark and last full-reconcile time per
/// (source_system, scope).
///
/// The `ALTER TABLE ... ADD COLUMN` statements are split into
/// `MIGRATION_V17_ALTER_COLUMNS` and run individually by the migration runner,
/// because SQLite has no `ADD COLUMN IF NOT EXISTS`; duplicate-column errors are
/// swallowed so replay stays idempotent.
const MIGRATION_V17_UP: &str = r#"
-- Idempotency key: at most one memory per (source_system, source_id).
-- Partial unique index so the millions of envelope-less legacy rows (all NULL)
-- don't collide and don't pay index cost.
CREATE UNIQUE INDEX IF NOT EXISTS idx_nodes_source_key
ON knowledge_nodes(source_system, source_id)
WHERE source_system IS NOT NULL AND source_id IS NOT NULL;
-- Filter/scan support for source-aware search + reconciliation passes.
CREATE INDEX IF NOT EXISTS idx_nodes_source_system
ON knowledge_nodes(source_system)
WHERE source_system IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_nodes_source_project
ON knowledge_nodes(source_project)
WHERE source_project IS NOT NULL;
-- Per-connector incremental-sync checkpoint. One row per (source_system, scope)
-- e.g. ('github', 'samvallad33/vestige'). `cursor_updated_at` is the
-- high-water mark on the source's update timestamp; `last_full_reconcile_at`
-- gates the (expensive) deletion-reconcile pass.
CREATE TABLE IF NOT EXISTS connector_cursors (
source_system TEXT NOT NULL,
scope TEXT NOT NULL,
cursor_updated_at TEXT,
last_synced_at TEXT,
last_full_reconcile_at TEXT,
records_seen INTEGER NOT NULL DEFAULT 0,
config TEXT NOT NULL DEFAULT '{}',
PRIMARY KEY (source_system, scope)
);
UPDATE schema_version SET version = 17, applied_at = datetime('now');
"#;
/// The `ALTER TABLE` statements for V17. Run individually + idempotently by the
/// migration runner (SQLite has no `ADD COLUMN IF NOT EXISTS`).
pub const MIGRATION_V17_ALTER_COLUMNS: &[&str] = &[
"ALTER TABLE knowledge_nodes ADD COLUMN source_system TEXT",
"ALTER TABLE knowledge_nodes ADD COLUMN source_id TEXT",
"ALTER TABLE knowledge_nodes ADD COLUMN source_url TEXT",
"ALTER TABLE knowledge_nodes ADD COLUMN source_updated_at TEXT",
"ALTER TABLE knowledge_nodes ADD COLUMN content_hash TEXT",
"ALTER TABLE knowledge_nodes ADD COLUMN synced_at TEXT",
"ALTER TABLE knowledge_nodes ADD COLUMN source_project TEXT",
"ALTER TABLE knowledge_nodes ADD COLUMN source_type TEXT",
"ALTER TABLE knowledge_nodes ADD COLUMN source_author TEXT",
];
/// Apply pending migrations
pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result<u32> {
let current_version = get_current_version(conn)?;
@ -994,6 +1066,15 @@ pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result<u32> {
}
}
// V17 (#57) adds the source-envelope columns. Same idempotent
// ALTER handling as V16 — the unique index in the V17 batch
// references these columns, so they must exist before the batch.
if migration.version == 17 {
for stmt in MIGRATION_V17_ALTER_COLUMNS {
add_column_if_missing(conn, stmt)?;
}
}
// Use execute_batch to handle multi-statement SQL including triggers
conn.execute_batch(migration.up)?;
@ -1026,11 +1107,11 @@ mod tests {
// Pre-requisite: schema_version must be bootstrapped by V1.
apply_migrations(&conn).expect("apply_migrations succeeds");
// 1. schema_version advanced to V16
// 1. schema_version advanced to the latest migration
let version = get_current_version(&conn).expect("read schema_version");
assert_eq!(
version, 16,
"schema_version must be 16 after all migrations"
version, 17,
"schema_version must be 17 after all migrations"
);
// 2. knowledge_edges is gone (V11 drops it)
@ -1151,11 +1232,11 @@ mod tests {
// Replay V11 onward. V11 uses DROP TABLE IF EXISTS so it is idempotent.
// V12/V13 tombstone tables use CREATE TABLE IF NOT EXISTS. V14/V16 ALTER
// TABLE idempotency is handled by the migration runner.
apply_migrations(&conn).expect("V11..V16 replay must be idempotent");
apply_migrations(&conn).expect("V11..V17 replay must be idempotent");
// After replaying from V10, the schema advances to the latest version.
let version = get_current_version(&conn).expect("read schema_version");
assert_eq!(version, 16, "schema_version back at 16 after replay");
assert_eq!(version, 17, "schema_version back at latest after replay");
}
#[test]
@ -1229,7 +1310,97 @@ mod tests {
// V16 uses CREATE TABLE IF NOT EXISTS and idempotent ALTER handling.
apply_migrations(&conn).expect("V16 replay must be idempotent");
let version = get_current_version(&conn).expect("read version");
assert_eq!(version, 16, "schema_version must be 16 after replay");
assert_eq!(version, 17, "schema_version must be latest after replay");
}
#[test]
fn v17_adds_source_envelope_columns_and_cursor_table() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
apply_migrations(&conn).expect("apply_migrations");
// All nine envelope columns must exist on knowledge_nodes.
let cols: Vec<String> = {
let mut stmt = conn
.prepare("PRAGMA table_info(knowledge_nodes)")
.expect("prepare");
stmt.query_map([], |row| row.get::<_, String>(1))
.expect("query_map")
.filter_map(|r| r.ok())
.collect()
};
for c in [
"source_system",
"source_id",
"source_url",
"source_updated_at",
"content_hash",
"synced_at",
"source_project",
"source_type",
"source_author",
] {
assert!(
cols.iter().any(|x| x == c),
"knowledge_nodes must have `{c}` column after V17"
);
}
// connector_cursors table must exist.
let cursor_rows: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='connector_cursors'",
[],
|row| row.get(0),
)
.expect("query sqlite_master");
assert_eq!(cursor_rows, 1, "connector_cursors must be created by V17");
}
#[test]
fn v17_unique_source_key_index_allows_many_null_legacy_rows() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
apply_migrations(&conn).expect("apply_migrations");
// Two legacy rows with NULL source key must NOT collide on the partial
// unique index (the index only covers non-NULL keys).
for id in ["a", "b"] {
conn.execute(
"INSERT INTO knowledge_nodes (id, content, node_type, created_at, updated_at, last_accessed, \
stability, difficulty, reps, lapses, learning_state, storage_strength, retrieval_strength, \
retention_strength, next_review, scheduled_days, has_embedding) \
VALUES (?1,'c','fact',datetime('now'),datetime('now'),datetime('now'),\
1.0,0.3,0,0,'new',1.0,1.0,1.0,datetime('now'),1,0)",
[id],
)
.expect("insert legacy row");
}
// Two real connector rows that share (source_system, source_id) MUST
// collide — the unique index is the idempotency guarantee.
conn.execute(
"UPDATE knowledge_nodes SET source_system='github', source_id='1' WHERE id='a'",
[],
)
.expect("set source key on a");
let dup = conn.execute(
"UPDATE knowledge_nodes SET source_system='github', source_id='1' WHERE id='b'",
[],
);
assert!(
dup.is_err(),
"duplicate (source_system, source_id) must violate the unique index"
);
}
#[test]
fn v17_is_replayable() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
apply_migrations(&conn).expect("first apply");
conn.execute("UPDATE schema_version SET version = 16", [])
.expect("rewind to 16");
apply_migrations(&conn).expect("V17 replay must be idempotent");
let version = get_current_version(&conn).expect("read version");
assert_eq!(version, 17, "schema_version must be 17 after replay");
}
#[test]

View file

@ -19,9 +19,10 @@ pub use portable::{
};
pub use sqlite::{
CompositionEventRecord, CompositionMemberRecord, CompositionNeighborRecord,
CompositionOutcomeRecord, ConnectionRecord, ConsolidationHistoryRecord, DreamHistoryRecord,
FilePortableSyncBackend, InsightRecord, IntentionRecord, NeverComposedCandidate,
PortableSyncBackend, PortableSyncReport, Result, SmartIngestResult, SqliteMemoryStore,
CompositionOutcomeRecord, ConnectionRecord, ConnectorCursor, ConsolidationHistoryRecord,
DreamHistoryRecord, FilePortableSyncBackend, InsightRecord, IntentionRecord,
NeverComposedCandidate, PortableSyncBackend, PortableSyncReport, ReconcileReport, Result,
SmartIngestResult, SourceUpsertOutcome, SourceUpsertResult, SqliteMemoryStore,
StateTransitionRecord, StorageError,
};

View file

@ -682,6 +682,12 @@ impl SqliteMemoryStore {
let valid_from_str = input.valid_from.map(|dt| dt.to_rfc3339());
let valid_until_str = input.valid_until.map(|dt| dt.to_rfc3339());
// #57 Source envelope — flatten to nullable column values. A node with
// no external provenance leaves all nine columns NULL (legacy shape).
let env = input.source_envelope.clone().unwrap_or_default();
let env_source_updated_at = env.source_updated_at.map(|dt| dt.to_rfc3339());
let env_synced_at = env.synced_at.map(|dt| dt.to_rfc3339());
{
let writer = self
.writer
@ -694,14 +700,18 @@ impl SqliteMemoryStore {
storage_strength, retrieval_strength, retention_strength,
sentiment_score, sentiment_magnitude, next_review, scheduled_days,
source, tags, valid_from, valid_until, has_embedding, embedding_model,
domains, domain_scores
domains, domain_scores,
source_system, source_id, source_url, source_updated_at,
content_hash, synced_at, source_project, source_type, source_author
) VALUES (
?1, ?2, ?3, ?4, ?5, ?6,
?7, ?8, ?9, ?10, ?11,
?12, ?13, ?14,
?15, ?16, ?17, ?18,
?19, ?20, ?21, ?22, ?23, ?24,
'[]', '{}'
'[]', '{}',
?25, ?26, ?27, ?28,
?29, ?30, ?31, ?32, ?33
)",
params![
id,
@ -728,6 +738,15 @@ impl SqliteMemoryStore {
valid_until_str,
0,
Option::<String>::None,
env.source_system,
env.source_id,
env.source_url,
env_source_updated_at,
env.content_hash,
env_synced_at,
env.source_project,
env.source_type,
env.source_author,
],
)?;
}
@ -1257,6 +1276,33 @@ impl SqliteMemoryStore {
.ok()
});
// #57 Source envelope columns (Migration V17). `.ok().flatten()` is
// tolerant of pre-V17 databases that lack these columns. Collapse an
// all-NULL envelope to `None` so legacy nodes serialize unchanged.
let parse_ts = |s: Option<String>| -> Option<DateTime<Utc>> {
s.and_then(|s| {
DateTime::parse_from_rfc3339(&s)
.map(|dt| dt.with_timezone(&Utc))
.ok()
})
};
let envelope = crate::memory::SourceEnvelope {
source_system: row.get("source_system").ok().flatten(),
source_id: row.get("source_id").ok().flatten(),
source_url: row.get("source_url").ok().flatten(),
source_updated_at: parse_ts(row.get("source_updated_at").ok().flatten()),
content_hash: row.get("content_hash").ok().flatten(),
synced_at: parse_ts(row.get("synced_at").ok().flatten()),
source_project: row.get("source_project").ok().flatten(),
source_type: row.get("source_type").ok().flatten(),
source_author: row.get("source_author").ok().flatten(),
};
let source_envelope = if envelope.is_empty() {
None
} else {
Some(envelope)
};
Ok(KnowledgeNode {
id: row.get("id")?,
content: row.get("content")?,
@ -1293,6 +1339,8 @@ impl SqliteMemoryStore {
// v2.0.5 Active Forgetting
suppression_count,
suppressed_at,
// #57 Source envelope
source_envelope,
})
}
@ -9415,6 +9463,336 @@ impl crate::storage::memory_store::MemoryStoreSend for SqliteMemoryStore {
}
}
// ============================================================================
// CONNECTOR SYNC (#57) — idempotent external-source ingestion
// ============================================================================
/// What `upsert_by_source` did with one external record. Drives the
/// created/updated/unchanged/tombstoned counts a connector reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceUpsertOutcome {
/// No memory existed for this `(source_system, source_id)` — inserted.
Created,
/// A memory existed and the `content_hash` changed — body + envelope updated
/// and the embedding regenerated.
Updated,
/// A memory existed with the same `content_hash` — nothing rewritten except
/// `synced_at` (so an incremental re-scan is free).
Unchanged,
}
/// Result of one `upsert_by_source` call.
#[derive(Debug, Clone)]
pub struct SourceUpsertResult {
pub outcome: SourceUpsertOutcome,
/// Memory id of the affected node (new or existing).
pub node_id: String,
}
/// Incremental-sync checkpoint for one `(source_system, scope)`.
#[derive(Debug, Clone, Default)]
pub struct ConnectorCursor {
pub source_system: String,
pub scope: String,
/// High-water mark on the source's update timestamp. `None` on first sync.
pub cursor_updated_at: Option<DateTime<Utc>>,
pub last_synced_at: Option<DateTime<Utc>>,
pub last_full_reconcile_at: Option<DateTime<Utc>>,
pub records_seen: i64,
}
/// Outcome of a tombstone reconciliation pass.
#[derive(Debug, Clone, Default)]
pub struct ReconcileReport {
/// Memory ids that were tombstoned (no longer visible upstream).
pub tombstoned: Vec<String>,
/// Number of local records considered for this scope.
pub considered: usize,
}
impl SqliteMemoryStore {
/// Idempotently upsert one external-source record, keyed on the envelope's
/// `(source_system, source_id)` (#57).
///
/// This is the core primitive every connector calls per record. It makes
/// re-running a sync safe and cheap:
///
/// - **No existing memory** for the key → insert (`Created`).
/// - **Existing memory, `content_hash` changed** → update content + envelope,
/// stamp `updated_at`, regenerate the embedding (`Updated`).
/// - **Existing memory, `content_hash` unchanged** → touch only `synced_at`
/// so the reconcile pass knows the record is still live (`Unchanged`).
///
/// The caller MUST set `source_system`, `source_id`, and `content_hash` on
/// the input's `source_envelope`; otherwise this falls back to a plain
/// `ingest` (an un-keyed record can't be deduplicated).
pub fn upsert_by_source(&self, input: IngestInput) -> Result<SourceUpsertResult> {
let env = match input.source_envelope.clone() {
Some(e) if e.has_key() => e,
// No idempotency key — behave like a normal create.
_ => {
let node = self.ingest(input)?;
return Ok(SourceUpsertResult {
outcome: SourceUpsertOutcome::Created,
node_id: node.id,
});
}
};
let source_system = env.source_system.clone().unwrap_or_default();
let source_id = env.source_id.clone().unwrap_or_default();
let now = Utc::now();
// Look up the existing memory for this external record, if any.
let existing: Option<(String, Option<String>)> = {
let reader = self
.reader
.lock()
.map_err(|_| StorageError::Init("Reader lock poisoned".into()))?;
reader
.query_row(
"SELECT id, content_hash FROM knowledge_nodes \
WHERE source_system = ?1 AND source_id = ?2 LIMIT 1",
params![source_system, source_id],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?)),
)
.optional()?
};
let Some((node_id, stored_hash)) = existing else {
// First time we've seen this record — plain insert carries the
// envelope through the existing ingest path.
let node = self.ingest(input)?;
return Ok(SourceUpsertResult {
outcome: SourceUpsertOutcome::Created,
node_id: node.id,
});
};
let new_hash = env.content_hash.clone();
let unchanged = match (&stored_hash, &new_hash) {
// Both present and equal → genuinely unchanged.
(Some(a), Some(b)) => a == b,
// Either side missing a hash → be conservative and treat as changed
// so we never silently skip a real update.
_ => false,
};
let env_source_updated_at = env.source_updated_at.map(|dt| dt.to_rfc3339());
let synced_at = now.to_rfc3339();
if unchanged {
// Cheapest path: only advance liveness + the source cursor field.
let writer = self
.writer
.lock()
.map_err(|_| StorageError::Init("Writer lock poisoned".into()))?;
writer.execute(
// Un-tombstone fully: a reappearing record clears BOTH bitemporal
// markers (valid_until AND superseded_by), otherwise it would be
// resurrected as currently-valid yet still flagged as superseded,
// which permanently excludes it from merge/consolidation.
"UPDATE knowledge_nodes \
SET synced_at = ?1, source_updated_at = COALESCE(?2, source_updated_at), \
source_url = COALESCE(?3, source_url), \
valid_until = NULL, superseded_by = NULL \
WHERE id = ?4",
params![synced_at, env_source_updated_at, env.source_url, node_id],
)?;
return Ok(SourceUpsertResult {
outcome: SourceUpsertOutcome::Unchanged,
node_id,
});
}
// Content changed upstream → update body + full envelope, clear any
// prior tombstone (`valid_until`), then regenerate the embedding.
{
let writer = self
.writer
.lock()
.map_err(|_| StorageError::Init("Writer lock poisoned".into()))?;
writer.execute(
// Clear BOTH bitemporal markers on update (see Unchanged branch).
"UPDATE knowledge_nodes SET \
content = ?1, updated_at = ?2, synced_at = ?3, \
content_hash = ?4, source_url = ?5, source_updated_at = ?6, \
source_project = ?7, source_type = ?8, source_author = ?9, \
valid_until = NULL, superseded_by = NULL \
WHERE id = ?10",
params![
input.content,
now.to_rfc3339(),
synced_at,
env.content_hash,
env.source_url,
env_source_updated_at,
env.source_project,
env.source_type,
env.source_author,
node_id,
],
)?;
}
#[cfg(all(feature = "embeddings", feature = "vector-search"))]
{
if let Some(index) = self.vector_index.as_ref()
&& let Ok(mut index) = index.lock()
{
let _ = index.remove(&node_id);
}
if let Err(e) = self.generate_embedding_for_node(&node_id, &input.content) {
tracing::warn!("Failed to regenerate embedding for {}: {}", node_id, e);
}
}
Ok(SourceUpsertResult {
outcome: SourceUpsertOutcome::Updated,
node_id,
})
}
/// Read the incremental-sync checkpoint for a `(source_system, scope)`.
/// Returns a zeroed cursor (no high-water mark) if none has been saved yet.
pub fn get_connector_cursor(&self, source_system: &str, scope: &str) -> Result<ConnectorCursor> {
let reader = self
.reader
.lock()
.map_err(|_| StorageError::Init("Reader lock poisoned".into()))?;
let row = reader
.query_row(
"SELECT cursor_updated_at, last_synced_at, last_full_reconcile_at, records_seen \
FROM connector_cursors WHERE source_system = ?1 AND scope = ?2",
params![source_system, scope],
|row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, i64>(3)?,
))
},
)
.optional()?;
let parse = |s: Option<String>| -> Option<DateTime<Utc>> {
s.and_then(|s| {
DateTime::parse_from_rfc3339(&s)
.map(|dt| dt.with_timezone(&Utc))
.ok()
})
};
Ok(match row {
Some((cur, last, recon, seen)) => ConnectorCursor {
source_system: source_system.to_string(),
scope: scope.to_string(),
cursor_updated_at: parse(cur),
last_synced_at: parse(last),
last_full_reconcile_at: parse(recon),
records_seen: seen,
},
None => ConnectorCursor {
source_system: source_system.to_string(),
scope: scope.to_string(),
..Default::default()
},
})
}
/// Persist the incremental-sync checkpoint for a `(source_system, scope)`.
pub fn save_connector_cursor(&self, cursor: &ConnectorCursor) -> Result<()> {
let writer = self
.writer
.lock()
.map_err(|_| StorageError::Init("Writer lock poisoned".into()))?;
writer.execute(
"INSERT INTO connector_cursors \
(source_system, scope, cursor_updated_at, last_synced_at, \
last_full_reconcile_at, records_seen) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6) \
ON CONFLICT(source_system, scope) DO UPDATE SET \
cursor_updated_at = excluded.cursor_updated_at, \
last_synced_at = excluded.last_synced_at, \
last_full_reconcile_at = excluded.last_full_reconcile_at, \
records_seen = excluded.records_seen",
params![
cursor.source_system,
cursor.scope,
cursor.cursor_updated_at.map(|d| d.to_rfc3339()),
cursor.last_synced_at.map(|d| d.to_rfc3339()),
cursor.last_full_reconcile_at.map(|d| d.to_rfc3339()),
cursor.records_seen,
],
)?;
Ok(())
}
/// Reconcile deletions for a scope: tombstone every local memory in
/// `(source_system, source_project = scope)` whose `source_id` is NOT in the
/// caller-supplied set of currently-live ids (#57).
///
/// Neither Redmine nor GitHub exposes a deletion feed, so an incremental
/// `updated_at` sync can never see a delete. The connector therefore
/// periodically enumerates the full set of live ids and calls this. We
/// **invalidate, don't purge** (Graphiti-style): the memory keeps its
/// content for audit but gets `valid_until = now`, so it falls out of
/// "currently valid" retrieval without losing history. A record that
/// reappears upstream is un-tombstoned by the next `upsert_by_source`
/// (which clears `valid_until`).
pub fn reconcile_source_tombstones(
&self,
source_system: &str,
scope: &str,
live_ids: &[String],
) -> Result<ReconcileReport> {
let live: std::collections::HashSet<&str> = live_ids.iter().map(|s| s.as_str()).collect();
// All currently-valid local records for this scope.
let local: Vec<(String, String)> = {
let reader = self
.reader
.lock()
.map_err(|_| StorageError::Init("Reader lock poisoned".into()))?;
let mut stmt = reader.prepare(
"SELECT id, source_id FROM knowledge_nodes \
WHERE source_system = ?1 AND source_project = ?2 \
AND source_id IS NOT NULL AND valid_until IS NULL",
)?;
let rows = stmt.query_map(params![source_system, scope], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
})?;
rows.filter_map(|r| r.ok()).collect()
};
let considered = local.len();
let now = Utc::now().to_rfc3339();
let mut tombstoned = Vec::new();
{
let writer = self
.writer
.lock()
.map_err(|_| StorageError::Init("Writer lock poisoned".into()))?;
for (node_id, source_id) in &local {
if !live.contains(source_id.as_str()) {
writer.execute(
"UPDATE knowledge_nodes SET valid_until = ?1 WHERE id = ?2",
params![now, node_id],
)?;
tombstoned.push(node_id.clone());
}
}
}
Ok(ReconcileReport {
tombstoned,
considered,
})
}
}
// ============================================================================
// TESTS
// ============================================================================
@ -9443,6 +9821,234 @@ mod tests {
Storage::new(Some(dir.path().join(name))).unwrap()
}
// ===================== Connector sync (#57) =========================
/// Build an `IngestInput` carrying a source envelope for a GitHub-ish issue.
fn source_input(id: &str, content: &str, hash: &str) -> IngestInput {
IngestInput {
content: content.to_string(),
node_type: "fact".to_string(),
source_envelope: Some(crate::memory::SourceEnvelope {
source_system: Some("github".to_string()),
source_id: Some(id.to_string()),
source_url: Some(format!("https://github.com/o/r/issues/{id}")),
content_hash: Some(hash.to_string()),
source_project: Some("o/r".to_string()),
source_type: Some("issue".to_string()),
source_author: Some("octocat".to_string()),
..Default::default()
}),
..Default::default()
}
}
fn node_count(store: &Storage) -> i64 {
// Count rows for our test source so embeddings/other tests don't bleed in.
let reader = store.reader.lock().unwrap();
reader
.query_row(
"SELECT COUNT(*) FROM knowledge_nodes WHERE source_system = 'github'",
[],
|r| r.get(0),
)
.unwrap()
}
#[test]
fn upsert_by_source_is_idempotent_across_reruns() {
let store = create_test_storage();
// First sync: a brand-new record → Created.
let r1 = store
.upsert_by_source(source_input("1", "Bug: crash on startup", "hash-a"))
.unwrap();
assert_eq!(r1.outcome, SourceUpsertOutcome::Created);
assert_eq!(node_count(&store), 1);
// Re-sync the SAME record with the SAME hash twice → Unchanged, no dupes.
for _ in 0..2 {
let r = store
.upsert_by_source(source_input("1", "Bug: crash on startup", "hash-a"))
.unwrap();
assert_eq!(r.outcome, SourceUpsertOutcome::Unchanged);
assert_eq!(r.node_id, r1.node_id, "must reuse the same memory id");
}
assert_eq!(node_count(&store), 1, "idempotent: still exactly one memory");
}
#[test]
fn upsert_by_source_updates_in_place_when_hash_changes() {
let store = create_test_storage();
let created = store
.upsert_by_source(source_input("7", "old body", "hash-old"))
.unwrap();
// Upstream edit: content + hash change → Updated, same id, new content.
let updated = store
.upsert_by_source(source_input("7", "new edited body", "hash-new"))
.unwrap();
assert_eq!(updated.outcome, SourceUpsertOutcome::Updated);
assert_eq!(updated.node_id, created.node_id);
assert_eq!(node_count(&store), 1, "update must not duplicate");
let node = store.get_node(&created.node_id).unwrap().unwrap();
assert_eq!(node.content, "new edited body");
let env = node.source_envelope.expect("envelope persisted");
assert_eq!(env.content_hash.as_deref(), Some("hash-new"));
assert_eq!(env.source_id.as_deref(), Some("7"));
}
#[test]
fn upsert_by_source_without_key_falls_back_to_create() {
let store = create_test_storage();
// Envelope present but missing source_id → not keyed → plain create.
let input = IngestInput {
content: "loose note".to_string(),
node_type: "fact".to_string(),
source_envelope: Some(crate::memory::SourceEnvelope {
source_url: Some("https://example.com/x".to_string()),
..Default::default()
}),
..Default::default()
};
let r = store.upsert_by_source(input).unwrap();
assert_eq!(r.outcome, SourceUpsertOutcome::Created);
}
#[test]
fn connector_cursor_round_trips() {
let store = create_test_storage();
// Unknown scope → zeroed cursor.
let empty = store.get_connector_cursor("github", "o/r").unwrap();
assert!(empty.cursor_updated_at.is_none());
assert_eq!(empty.records_seen, 0);
let ts = Utc::now();
let cursor = ConnectorCursor {
source_system: "github".to_string(),
scope: "o/r".to_string(),
cursor_updated_at: Some(ts),
last_synced_at: Some(ts),
last_full_reconcile_at: None,
records_seen: 42,
};
store.save_connector_cursor(&cursor).unwrap();
let back = store.get_connector_cursor("github", "o/r").unwrap();
assert_eq!(back.records_seen, 42);
assert_eq!(
back.cursor_updated_at.map(|d| d.to_rfc3339()),
Some(ts.to_rfc3339())
);
// Upsert semantics: saving again replaces, never duplicates.
let mut c2 = cursor.clone();
c2.records_seen = 99;
store.save_connector_cursor(&c2).unwrap();
assert_eq!(store.get_connector_cursor("github", "o/r").unwrap().records_seen, 99);
}
#[test]
fn reconcile_tombstones_records_absent_from_live_set() {
let store = create_test_storage();
// Three synced issues in scope o/r.
for id in ["1", "2", "3"] {
store
.upsert_by_source(source_input(id, &format!("issue {id}"), &format!("h{id}")))
.unwrap();
}
// Reconcile: only 1 and 3 are still visible upstream → 2 is tombstoned.
let report = store
.reconcile_source_tombstones("github", "o/r", &["1".to_string(), "3".to_string()])
.unwrap();
assert_eq!(report.considered, 3);
assert_eq!(report.tombstoned.len(), 1, "exactly issue 2 tombstoned");
// Issue 2's memory is invalidated (valid_until set) but NOT purged —
// content retained for audit, just no longer currently-valid.
let two = {
let reader = store.reader.lock().unwrap();
reader
.query_row(
"SELECT id, valid_until FROM knowledge_nodes WHERE source_id = '2'",
[],
|r| Ok((r.get::<_, String>(0)?, r.get::<_, Option<String>>(1)?)),
)
.unwrap()
};
assert!(two.1.is_some(), "tombstoned record must have valid_until set");
let node = store.get_node(&two.0).unwrap().unwrap();
assert!(!node.is_currently_valid(), "tombstoned node is not valid now");
assert_eq!(node.content, "issue 2", "content retained for audit");
// A reappearing record un-tombstones on next upsert (clears valid_until).
store
.upsert_by_source(source_input("2", "issue 2", "h2"))
.unwrap();
let revived = store.get_node(&two.0).unwrap().unwrap();
assert!(revived.is_currently_valid(), "re-synced record is valid again");
}
#[test]
fn upsert_clears_superseded_by_when_record_reappears() {
// Regression: un-tombstoning must clear BOTH bitemporal markers. A
// connector node that was superseded/merged (valid_until + superseded_by
// both set) and then re-observed upstream must come back fully clean,
// otherwise it is currently-valid yet still flagged superseded and is
// permanently excluded from merge candidacy.
let store = create_test_storage();
let created = store
.upsert_by_source(source_input("9", "body v1", "h9a"))
.unwrap();
// Simulate the node having been superseded (as merge/supersede would).
{
let writer = store.writer.lock().unwrap();
writer
.execute(
"UPDATE knowledge_nodes SET valid_until = ?1, superseded_by = 'survivor-id' WHERE id = ?2",
params![Utc::now().to_rfc3339(), created.node_id],
)
.unwrap();
}
assert!(
store.superseded_node_ids().unwrap().contains(&created.node_id),
"precondition: node is superseded"
);
// Re-sync with a content change → Updated branch must clear both markers.
let res = store
.upsert_by_source(source_input("9", "body v2 edited", "h9b"))
.unwrap();
assert_eq!(res.outcome, SourceUpsertOutcome::Updated);
assert!(
!store.superseded_node_ids().unwrap().contains(&created.node_id),
"superseded_by must be cleared on re-sync (no bitemporal zombie)"
);
let node = store.get_node(&created.node_id).unwrap().unwrap();
assert!(node.is_currently_valid());
// Also exercise the Unchanged branch: supersede again, re-sync same hash.
{
let writer = store.writer.lock().unwrap();
writer
.execute(
"UPDATE knowledge_nodes SET valid_until = ?1, superseded_by = 'survivor-id' WHERE id = ?2",
params![Utc::now().to_rfc3339(), created.node_id],
)
.unwrap();
}
let res2 = store
.upsert_by_source(source_input("9", "body v2 edited", "h9b"))
.unwrap();
assert_eq!(res2.outcome, SourceUpsertOutcome::Unchanged);
assert!(
!store.superseded_node_ids().unwrap().contains(&created.node_id),
"Unchanged branch must also clear superseded_by"
);
}
#[cfg(all(feature = "embeddings", feature = "vector-search"))]
fn with_vector_search_disabled<T>(f: impl FnOnce() -> T) -> T {
let _guard = ENV_LOCK.lock().unwrap();

View file

@ -10,9 +10,13 @@ categories = ["command-line-utilities", "database"]
repository = "https://github.com/samvallad33/vestige"
[features]
default = ["embeddings", "ort-download", "vector-search"]
default = ["embeddings", "ort-download", "vector-search", "connectors"]
embeddings = ["vestige-core/embeddings"]
vector-search = ["vestige-core/vector-search"]
# External-source connectors (#57): GitHub Issues / Redmine indexing via the
# `source_sync` MCP tool. On by default so the tool works out of the box; turn
# off for a build with no HTTP client.
connectors = ["vestige-core/connectors"]
# Default ort backend: downloads prebuilt ONNX Runtime at build time.
# Fails on targets without prebuilts (notably x86_64-apple-darwin).
ort-download = ["embeddings", "vestige-core/ort-download"]

View file

@ -1817,6 +1817,7 @@ fn run_restore(backup_path: PathBuf) -> anyhow::Result<()> {
tags: memory.tags.unwrap_or_default(),
valid_from: None,
valid_until: None,
source_envelope: None,
};
match storage.ingest(input) {
@ -2415,6 +2416,7 @@ fn run_ingest(
tags: tag_list,
valid_from: None,
valid_until: None,
source_envelope: None,
};
let storage = open_storage()?;

View file

@ -73,6 +73,7 @@ fn main() -> anyhow::Result<()> {
tags: memory.tags.unwrap_or_default(),
valid_from: None,
valid_until: None,
source_envelope: None,
};
match storage.ingest(input) {

View file

@ -195,6 +195,7 @@ mod tests {
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
result.id

View file

@ -281,6 +281,15 @@ impl McpServer {
..Default::default()
},
// ================================================================
// EXTERNAL-SOURCE CONNECTORS (#57)
// ================================================================
ToolDescription {
name: "source_sync".to_string(),
description: Some("Index an external system (GitHub Issues) into Vestige as a durable, offline, semantically-searchable index that cites back to the canonical record. Provide 'repo' as 'owner/name'. Idempotent: re-running updates changed issues without duplicating; set reconcile=true to tombstone issues removed upstream. Auth via the GITHUB_TOKEN env var (optional for public repos).".to_string()),
input_schema: tools::source_sync::schema(),
..Default::default()
},
// ================================================================
// TEMPORAL TOOLS (v1.2+)
// ================================================================
ToolDescription {
@ -593,6 +602,11 @@ impl McpServer {
.await
}
// ================================================================
// External-source connectors (#57)
// ================================================================
"source_sync" => tools::source_sync::execute(&self.storage, request.arguments).await,
// ================================================================
// DEPRECATED (v1.7): ingest → smart_ingest
// ================================================================
@ -1806,10 +1820,10 @@ mod tests {
let result = response.result.unwrap();
let tools = result["tools"].as_array().unwrap();
// 33 tools: 25 from v2.1.21 + 7 Phase 3 merge/supersede tools:
// merge_candidates, plan_merge, plan_supersede, apply_plan, merge_undo,
// protect, merge_policy, composed_graph)
assert_eq!(tools.len(), 33, "Expected exactly 33 tools");
// 34 tools: 25 from v2.1.21 + 7 Phase 3 merge/supersede tools
// (merge_candidates, plan_merge, plan_supersede, apply_plan, merge_undo,
// protect, merge_policy, composed_graph) + 1 connector tool (source_sync, #57).
assert_eq!(tools.len(), 34, "Expected exactly 34 tools");
let tool_names: Vec<&str> = tools.iter().map(|t| t["name"].as_str().unwrap()).collect();
@ -1821,6 +1835,9 @@ mod tests {
// Core memory (smart_ingest absorbs ingest + checkpoint in v1.7)
assert!(tool_names.contains(&"smart_ingest"));
// External-source connectors (#57)
assert!(tool_names.contains(&"source_sync"));
assert!(
!tool_names.contains(&"ingest"),
"ingest should be removed in v1.7"

View file

@ -278,6 +278,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
node.id

View file

@ -154,6 +154,7 @@ async fn execute_remember_pattern(
tags,
valid_from: None,
valid_until: None,
source_envelope: None,
};
let node = storage.ingest(input).map_err(|e| e.to_string())?;
@ -250,6 +251,7 @@ async fn execute_remember_decision(
tags,
valid_from: None,
valid_until: None,
source_envelope: None,
};
let node = storage.ingest(input).map_err(|e| e.to_string())?;

View file

@ -1119,6 +1119,7 @@ mod tests {
tags: tags.iter().map(|s| s.to_string()).collect(),
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap()
.id

View file

@ -283,6 +283,7 @@ mod tests {
tags: vec!["dream-test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}
@ -420,6 +421,7 @@ mod tests {
tags: vec!["dream-roundtrip".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}
@ -485,6 +487,7 @@ mod tests {
tags: vec!["save-conn-test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
ids.push(result.id);
@ -588,6 +591,7 @@ mod tests {
tags: tags.iter().map(|t| t.to_string()).collect(),
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}
@ -713,6 +717,7 @@ mod tests {
tags: tags.iter().map(|t| t.to_string()).collect(),
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}

View file

@ -414,6 +414,7 @@ mod tests {
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap()
.id;
@ -428,6 +429,7 @@ mod tests {
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap()
.id;
@ -478,6 +480,7 @@ mod tests {
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
};
let id_a = storage.ingest(make("Memory A about databases")).unwrap().id;
let id_b = storage.ingest(make("Memory B about indexes")).unwrap().id;
@ -529,6 +532,7 @@ mod tests {
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
};
let id_a = storage.ingest(make("Bridge test memory A")).unwrap().id;
let id_b = storage.ingest(make("Bridge test memory B")).unwrap().id;

View file

@ -313,6 +313,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
node.id
@ -556,6 +557,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
let node_id = node.id.clone();

View file

@ -328,6 +328,7 @@ mod tests {
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
@ -355,6 +356,7 @@ mod tests {
tags: vec!["science".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
@ -378,6 +380,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();

View file

@ -119,6 +119,7 @@ mod tests {
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}
@ -144,6 +145,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();

View file

@ -778,6 +778,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}
@ -832,6 +833,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}
@ -904,6 +906,7 @@ mod tests {
tags: vec!["schema-test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
@ -1015,6 +1018,7 @@ mod tests {
tags: vec!["portable".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();

View file

@ -552,6 +552,7 @@ mod tests {
tags: vec!["test-tag".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
node.id

View file

@ -12,6 +12,8 @@ pub mod intention_unified;
pub mod memory_unified;
pub mod search_unified;
pub mod smart_ingest;
// #57: external-source connectors (GitHub Issues / Redmine retrieval layer)
pub mod source_sync;
// v1.2: Temporal query tools
pub mod changelog;

View file

@ -173,6 +173,7 @@ pub async fn execute(storage: &Arc<Storage>, args: Option<Value>) -> Result<Valu
tags: memory.tags.clone().unwrap_or_default(),
valid_from: None,
valid_until: None,
source_envelope: None,
};
match storage.ingest(input) {
@ -349,6 +350,7 @@ mod tests {
tags: vec!["portable".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();

View file

@ -117,6 +117,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
};
let node = storage.ingest(input).unwrap();
node.id

View file

@ -889,6 +889,31 @@ pub fn apply_output_masks(results: &mut [Value], output_config: &OutputConfig) {
}
}
/// Build a compact `source` object from a node's connector provenance (#57),
/// or `Value::Null` when the memory has no external source envelope.
///
/// Surfacing `url` in search results is the whole point of the connector layer:
/// the agent can follow the citation back to the canonical Redmine/GitHub record
/// for authoritative detail. `tombstoned` flags records no longer visible
/// upstream (kept for audit).
fn source_provenance(node: &vestige_core::KnowledgeNode) -> Value {
let Some(env) = node.source_envelope.as_ref() else {
return Value::Null;
};
serde_json::json!({
"system": env.source_system,
"id": env.source_id,
"url": env.source_url,
"project": env.source_project,
"type": env.source_type,
"author": env.source_author,
"sourceUpdatedAt": env.source_updated_at.map(|dt| dt.to_rfc3339()),
"syncedAt": env.synced_at.map(|dt| dt.to_rfc3339()),
// A tombstoned (no-longer-visible) record has valid_until set in the past.
"tombstoned": !node.is_currently_valid(),
})
}
fn format_search_result(r: &vestige_core::SearchResult, detail_level: &str) -> Value {
match detail_level {
"brief" => serde_json::json!({
@ -898,45 +923,65 @@ fn format_search_result(r: &vestige_core::SearchResult, detail_level: &str) -> V
"retentionStrength": r.node.retention_strength,
"combinedScore": r.combined_score,
}),
"full" => serde_json::json!({
"id": r.node.id,
"content": r.node.content,
"combinedScore": r.combined_score,
"keywordScore": r.keyword_score,
"semanticScore": r.semantic_score,
"nodeType": r.node.node_type,
"tags": r.node.tags,
"retentionStrength": r.node.retention_strength,
"storageStrength": r.node.storage_strength,
"retrievalStrength": r.node.retrieval_strength,
"source": r.node.source,
"sentimentScore": r.node.sentiment_score,
"sentimentMagnitude": r.node.sentiment_magnitude,
"createdAt": r.node.created_at.to_rfc3339(),
"updatedAt": r.node.updated_at.to_rfc3339(),
"lastAccessed": r.node.last_accessed.to_rfc3339(),
"nextReview": r.node.next_review.map(|dt| dt.to_rfc3339()),
"stability": r.node.stability,
"difficulty": r.node.difficulty,
"reps": r.node.reps,
"lapses": r.node.lapses,
"validFrom": r.node.valid_from.map(|dt| dt.to_rfc3339()),
"validUntil": r.node.valid_until.map(|dt| dt.to_rfc3339()),
"matchType": format!("{:?}", r.match_type),
}),
"full" => {
let mut v = serde_json::json!({
"id": r.node.id,
"content": r.node.content,
"combinedScore": r.combined_score,
"keywordScore": r.keyword_score,
"semanticScore": r.semantic_score,
"nodeType": r.node.node_type,
"tags": r.node.tags,
"retentionStrength": r.node.retention_strength,
"storageStrength": r.node.storage_strength,
"retrievalStrength": r.node.retrieval_strength,
"source": r.node.source,
"sentimentScore": r.node.sentiment_score,
"sentimentMagnitude": r.node.sentiment_magnitude,
"createdAt": r.node.created_at.to_rfc3339(),
"updatedAt": r.node.updated_at.to_rfc3339(),
"lastAccessed": r.node.last_accessed.to_rfc3339(),
"nextReview": r.node.next_review.map(|dt| dt.to_rfc3339()),
"stability": r.node.stability,
"difficulty": r.node.difficulty,
"reps": r.node.reps,
"lapses": r.node.lapses,
"validFrom": r.node.valid_from.map(|dt| dt.to_rfc3339()),
"validUntil": r.node.valid_until.map(|dt| dt.to_rfc3339()),
"matchType": format!("{:?}", r.match_type),
});
attach_source_record(&mut v, &r.node);
v
}
// "summary" (default) — includes dates so AI never has to guess when a memory is from
_ => serde_json::json!({
"id": r.node.id,
"content": r.node.content,
"combinedScore": r.combined_score,
"keywordScore": r.keyword_score,
"semanticScore": r.semantic_score,
"nodeType": r.node.node_type,
"tags": r.node.tags,
"retentionStrength": r.node.retention_strength,
"createdAt": r.node.created_at.to_rfc3339(),
"updatedAt": r.node.updated_at.to_rfc3339(),
}),
_ => {
let mut v = serde_json::json!({
"id": r.node.id,
"content": r.node.content,
"combinedScore": r.combined_score,
"keywordScore": r.keyword_score,
"semanticScore": r.semantic_score,
"nodeType": r.node.node_type,
"tags": r.node.tags,
"retentionStrength": r.node.retention_strength,
"createdAt": r.node.created_at.to_rfc3339(),
"updatedAt": r.node.updated_at.to_rfc3339(),
});
attach_source_record(&mut v, &r.node);
v
}
}
}
/// Inject a `sourceRecord` object into a result `Value` ONLY when the memory
/// has external provenance, so legacy (agent/user-authored) memories keep their
/// exact prior result shape.
fn attach_source_record(value: &mut Value, node: &vestige_core::KnowledgeNode) {
let provenance = source_provenance(node);
if !provenance.is_null()
&& let Value::Object(map) = value
{
map.insert("sourceRecord".to_string(), provenance);
}
}
@ -950,36 +995,44 @@ pub fn format_node(node: &vestige_core::KnowledgeNode, detail_level: &str) -> Va
"tags": node.tags,
"retentionStrength": node.retention_strength,
}),
"full" => serde_json::json!({
"id": node.id,
"content": node.content,
"nodeType": node.node_type,
"tags": node.tags,
"retentionStrength": node.retention_strength,
"storageStrength": node.storage_strength,
"retrievalStrength": node.retrieval_strength,
"source": node.source,
"sentimentScore": node.sentiment_score,
"sentimentMagnitude": node.sentiment_magnitude,
"createdAt": node.created_at.to_rfc3339(),
"updatedAt": node.updated_at.to_rfc3339(),
"lastAccessed": node.last_accessed.to_rfc3339(),
"nextReview": node.next_review.map(|dt| dt.to_rfc3339()),
"stability": node.stability,
"difficulty": node.difficulty,
"reps": node.reps,
"lapses": node.lapses,
"validFrom": node.valid_from.map(|dt| dt.to_rfc3339()),
"validUntil": node.valid_until.map(|dt| dt.to_rfc3339()),
}),
"full" => {
let mut v = serde_json::json!({
"id": node.id,
"content": node.content,
"nodeType": node.node_type,
"tags": node.tags,
"retentionStrength": node.retention_strength,
"storageStrength": node.storage_strength,
"retrievalStrength": node.retrieval_strength,
"source": node.source,
"sentimentScore": node.sentiment_score,
"sentimentMagnitude": node.sentiment_magnitude,
"createdAt": node.created_at.to_rfc3339(),
"updatedAt": node.updated_at.to_rfc3339(),
"lastAccessed": node.last_accessed.to_rfc3339(),
"nextReview": node.next_review.map(|dt| dt.to_rfc3339()),
"stability": node.stability,
"difficulty": node.difficulty,
"reps": node.reps,
"lapses": node.lapses,
"validFrom": node.valid_from.map(|dt| dt.to_rfc3339()),
"validUntil": node.valid_until.map(|dt| dt.to_rfc3339()),
});
attach_source_record(&mut v, node);
v
}
// "summary" (default)
_ => serde_json::json!({
"id": node.id,
"content": node.content,
"nodeType": node.node_type,
"tags": node.tags,
"retentionStrength": node.retention_strength,
}),
_ => {
let mut v = serde_json::json!({
"id": node.id,
"content": node.content,
"nodeType": node.node_type,
"tags": node.tags,
"retentionStrength": node.retention_strength,
});
attach_source_record(&mut v, node);
v
}
}
}
@ -1016,6 +1069,7 @@ mod tests {
tags: vec![],
valid_from: None,
valid_until: None,
source_envelope: None,
};
let node = storage.ingest(input).unwrap();
node.id
@ -1839,6 +1893,7 @@ mod tests {
tags: tags.into_iter().map(String::from).collect(),
valid_from: None,
valid_until: None,
source_envelope: None,
};
let node = storage.ingest(input).unwrap();
node.id

View file

@ -506,6 +506,7 @@ mod tests {
tags: tags.into_iter().map(|s| s.to_string()).collect(),
valid_from: None,
valid_until: None,
source_envelope: None,
};
let node = storage.ingest(input).unwrap();
node.id
@ -712,6 +713,7 @@ mod tests {
tags: vec!["pattern".to_string(), "codebase:vestige".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
};
storage.ingest(input).unwrap();

View file

@ -215,6 +215,7 @@ pub async fn execute(
tags,
valid_from: None,
valid_until: None,
source_envelope: None,
};
// ====================================================================
@ -414,6 +415,7 @@ async fn execute_batch(
tags,
valid_from: None,
valid_until: None,
source_envelope: None,
};
// ================================================================

View file

@ -0,0 +1,187 @@
//! `source_sync` MCP tool (#57) — index an external system into Vestige.
//!
//! Turns Vestige into a durable, offline, provenance-linked retrieval layer
//! over a long-lived external system. The first connector is GitHub Issues:
//! point it at `owner/repo` and Vestige indexes every issue + its comments as
//! source-aware memories you can search semantically and cite back to the
//! canonical issue URL — re-runnable idempotently (no duplicates) and able to
//! tombstone issues that vanish upstream.
//!
//! Unlike the official GitHub MCP server (a stateless live API proxy), this
//! keeps a local index: searchable offline, embedded for semantic recall,
//! joinable with the rest of your memory, and temporally versioned.
//!
//! ## Auth (security)
//!
//! The GitHub token is read from the `GITHUB_TOKEN` (or `VESTIGE_GITHUB_TOKEN`)
//! environment variable, never from tool arguments, so credentials are not
//! logged in the conversation. Public repositories work without a token at a
//! lower rate limit.
use std::sync::Arc;
use serde::Deserialize;
use serde_json::{Value, json};
use vestige_core::storage::Storage;
/// JSON schema for the `source_sync` tool.
pub fn schema() -> Value {
json!({
"type": "object",
"properties": {
"source": {
"type": "string",
"enum": ["github"],
"description": "External system to sync. Currently: 'github' (GitHub Issues).",
"default": "github"
},
"repo": {
"type": "string",
"description": "GitHub repository as 'owner/name', e.g. 'samvallad33/vestige'."
},
"reconcile": {
"type": "boolean",
"description": "Also tombstone local memories for issues no longer visible upstream (an extra full enumeration pass). Default false on incremental syncs.",
"default": false
},
"max_pages": {
"type": "integer",
"description": "Max API pages to fetch this run (each page is up to 100 issues). Lets a first sync of a large repo be resumed across calls. Default 10.",
"default": 10,
"minimum": 1,
"maximum": 1000
}
},
"required": ["repo"]
})
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SourceSyncArgs {
#[serde(default = "default_source")]
source: String,
repo: String,
#[serde(default)]
reconcile: bool,
#[serde(default, alias = "max_pages")]
max_pages: Option<usize>,
}
fn default_source() -> String {
"github".to_string()
}
/// Read the GitHub token from the environment (never from tool args).
fn github_token() -> Option<String> {
std::env::var("GITHUB_TOKEN")
.or_else(|_| std::env::var("VESTIGE_GITHUB_TOKEN"))
.ok()
.filter(|s| !s.trim().is_empty())
}
pub async fn execute(storage: &Arc<Storage>, args: Option<Value>) -> Result<Value, String> {
let args: SourceSyncArgs = match args {
Some(v) => serde_json::from_value(v).map_err(|e| format!("Invalid arguments: {e}"))?,
None => return Err("Missing arguments".to_string()),
};
if args.source != "github" {
return Err(format!(
"Unsupported source '{}'. Currently only 'github' is supported.",
args.source
));
}
let (owner, repo) = args
.repo
.split_once('/')
.filter(|(o, r)| !o.is_empty() && !r.is_empty())
.ok_or_else(|| {
"repo must be in 'owner/name' form, e.g. 'samvallad33/vestige'".to_string()
})?;
execute_github(
storage,
owner,
repo,
args.reconcile,
args.max_pages.unwrap_or(10),
)
.await
}
/// Connectors are feature-gated; surface a clear message when the build omits
/// them rather than failing obscurely.
#[cfg(not(feature = "connectors"))]
async fn execute_github(
_storage: &Arc<Storage>,
_owner: &str,
_repo: &str,
_reconcile: bool,
_max_pages: usize,
) -> Result<Value, String> {
Err("This Vestige build was compiled without the 'connectors' feature. \
Rebuild with --features connectors to enable source_sync."
.to_string())
}
#[cfg(feature = "connectors")]
async fn execute_github(
storage: &Arc<Storage>,
owner: &str,
repo: &str,
reconcile: bool,
max_pages: usize,
) -> Result<Value, String> {
use vestige_core::connectors::github::{GithubConfig, GithubConnector};
use vestige_core::connectors::run_sync;
let config = GithubConfig::new(owner, repo).with_token(github_token());
let connector =
GithubConnector::new(config).map_err(|e| format!("connector init failed: {e}"))?;
let report = run_sync(storage.as_ref(), &connector, reconcile, max_pages)
.await
.map_err(|e| format!("sync failed: {e}"))?;
let scope = format!("{owner}/{repo}");
let total = report.created + report.updated + report.unchanged;
let authed = github_token().is_some();
let summary = format!(
"Synced {scope}: {} created, {} updated, {} unchanged{} ({total} records seen{}).",
report.created,
report.updated,
report.unchanged,
if report.reconciled {
format!(", {} tombstoned", report.tombstoned)
} else {
String::new()
},
if authed { "" } else { ", unauthenticated" },
);
Ok(json!({
"ok": true,
"summary": summary,
"source": "github",
"scope": scope,
"created": report.created,
"updated": report.updated,
"unchanged": report.unchanged,
"tombstoned": report.tombstoned,
"reconciled": report.reconciled,
"cursor": report.new_cursor.map(|d| d.to_rfc3339()),
"authenticated": authed,
"warnings": report.warnings,
"hint": if total == 0 && !authed {
"No records returned. For private repos or higher rate limits, set GITHUB_TOKEN in the server environment."
} else if report.new_cursor.is_some() && total >= 100 {
"More may remain — run source_sync again to continue from the saved cursor."
} else {
"Search these with the normal search tools; results cite the GitHub issue URL."
}
}))
}

View file

@ -177,6 +177,7 @@ mod tests {
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap()
.id

View file

@ -209,6 +209,7 @@ mod tests {
tags: vec!["timeline-test".to_string()],
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}
@ -226,6 +227,7 @@ mod tests {
tags: tags.iter().map(|t| t.to_string()).collect(),
valid_from: None,
valid_until: None,
source_envelope: None,
})
.unwrap();
}

150
docs/CONNECTORS.md Normal file
View file

@ -0,0 +1,150 @@
# External-Source Connectors
> Status: **v2.1.27** — GitHub Issues connector (reference). Redmine and others
> follow the same contract. Tracking issue:
> [#57](https://github.com/samvallad33/vestige/issues/57).
Connectors let Vestige act as a durable, local **retrieval and reasoning layer**
over a long-lived external system — a ticket tracker, an issue board, a support
queue — **without replacing it**. The external system stays the source of truth.
Vestige indexes its records, embeds them for semantic recall, links them into the
memory graph, and **cites back** to the canonical record.
## Why this is different from a ticket-system MCP
The official GitHub / Jira MCP servers are **live API proxies**: every query hits
the upstream API, is rate-limited, keyword-only, online-only, and has no memory
of past state. Vestige instead keeps a **durable local index** of the records, so
you can:
- search the history **offline** and **semantically** (embeddings, not just
keywords),
- **join** ticket history with the rest of your memory in one search,
- see a **point-in-time** view (records carry temporal validity),
- and re-sync **idempotently** — re-running never duplicates a record.
## Quick start (GitHub Issues)
1. (Optional but recommended) export a token so you get the authenticated rate
limit (5,000 req/hr vs 60 for anonymous) and access to private repos:
```sh
export GITHUB_TOKEN=ghp_xxx # or VESTIGE_GITHUB_TOKEN
```
The token is read **only** from the environment — never passed as a tool
argument, never logged.
2. Ask your agent to run the `source_sync` MCP tool:
```json
{ "repo": "samvallad33/vestige" }
```
3. Search as normal. Connector-sourced results carry a `sourceRecord` object with
the canonical issue URL:
```json
{
"content": "[samvallad33/vestige#57] Roadmap: external source connectors …",
"sourceRecord": {
"system": "github",
"id": "57",
"url": "https://github.com/samvallad33/vestige/issues/57",
"project": "samvallad33/vestige",
"type": "issue",
"author": "samvallad33",
"tombstoned": false
}
}
```
## The `source_sync` tool
| Field | Type | Default | Meaning |
|---|---|---|---|
| `repo` | string | — (required) | `owner/name`, e.g. `samvallad33/vestige`. |
| `source` | string | `github` | External system. Currently only `github`. |
| `reconcile` | bool | `false` | Also tombstone local memories for issues no longer visible upstream (an extra full-enumeration pass). |
| `max_pages` | int | `10` | API pages to fetch this run (≤100 issues each). Lets a first sync of a large repo resume across calls. |
The tool returns counts (`created` / `updated` / `unchanged` / `tombstoned`),
the saved `cursor`, whether it ran authenticated, and a `hint` for the next step.
### Idempotent, incremental sync
Each run:
1. resumes from the saved cursor (the high-water mark on the record's upstream
update time), minus a small overlap window so same-second / clock-skewed
updates are never missed;
2. pages issues in ascending update order (`state=all`, so closing an issue is
**not** mistaken for a deletion), folding each issue + its comments into one
memory;
3. routes each record through an **idempotent upsert** keyed on
`(source_system, source_id)`:
- unseen record → **insert**,
- changed content (by content hash) → **update in place** + re-embed,
- unchanged content → **no-op** (only the "last seen" time advances);
4. advances and persists the cursor only after the run, so an interruption
re-scans rather than skips.
Re-running `source_sync` on the same repo is therefore safe and cheap — it picks
up only what changed.
### Deletions (tombstoning)
Neither GitHub nor Redmine exposes a deletion feed, so an incremental sync can
never *see* a delete. Pass `reconcile: true` to run a reconciliation pass: Vestige
enumerates the currently-visible issue ids and **invalidates** (does not purge)
any local record no longer present. A tombstoned record keeps its content for
audit but drops out of "currently valid" retrieval (`sourceRecord.tombstoned` is
`true`). If the record reappears upstream, the next sync un-tombstones it.
## The source envelope
Every connector-ingested memory carries structured provenance, distinct from the
legacy free-form `source` label:
| Field | Purpose |
|---|---|
| `source_system` | `github`, `redmine`, … (namespaces ids). |
| `source_id` | Native id (issue number, ticket id). |
| `source_url` | Canonical link back — the citation. |
| `source_updated_at` | Upstream update time (the sync cursor field). |
| `content_hash` | Change detector → idempotency. |
| `synced_at` | When the connector last saw the record live. |
| `source_project` | Repo / project / space. |
| `source_type` | `issue`, `comment`, … |
| `source_author` | Reporter / author upstream. |
`(source_system, source_id)` is enforced unique, so there is exactly one memory
per external record. Legacy memories (agent- or user-authored) have no envelope
and are completely unaffected.
## Building
The connector HTTP client is behind the `connectors` cargo feature, which is
**on by default in the MCP server** (`vestige-mcp`). A build without it still
exposes the `source_sync` tool but returns a clear "rebuild with `--features
connectors`" message. The core library (`vestige-core`) leaves the feature
**off** by default, so library consumers that don't need connectors link no HTTP
client.
```sh
# default MCP build already includes connectors
cargo build -p vestige-mcp --release
# explicit, or for the core lib
cargo build -p vestige-core --features connectors
```
## Writing a new connector
Implement the `Connector` trait in `vestige_core::connectors` (fetch a window of
records updated since a cursor, page forward, and optionally enumerate live ids
for reconciliation), produce `NormalizedRecord`s with a filled
`SourceEnvelope`, and hand them to `run_sync`. The GitHub connector
(`crates/vestige-core/src/connectors/github.rs`) is the reference
implementation. The sync driver, idempotent upsert, cursor checkpointing, and
tombstone reconciliation are all reused for free.

View file

@ -31,6 +31,7 @@ fn make_ingest_input(
source,
valid_from,
valid_until,
source_envelope: None,
}
}

View file

@ -29,6 +29,7 @@ fn make_ingest_input(
source,
valid_from,
valid_until,
source_envelope: None,
}
}