feat(cloud-sync): HTTP managed-sync backend + vestige sync --cloud

Vestige Cloud MVP client side. Implements HttpPortableSyncBackend, an HTTP
impl of the existing PortableSyncBackend trait, reusing the production
sync_portable_archive pull-merge-push engine unchanged — only the transport
is new. Per-user isolation via opaque bearer sync key (namespace derived
server-side). Optimistic concurrency via ETag/If-Match to prevent lost
updates across devices; 412 surfaces a re-run-to-merge message.

- new cloud-sync cargo feature (vestige-core + vestige-mcp), gates reqwest
  blocking; default local-first build stays network-free
- sync_portable_archive_cloud wrapper mirrors sync_portable_archive_file
- CLI: vestige sync --cloud [--endpoint], VESTIGE_CLOUD_ENDPOINT/SYNC_KEY env
- 8 unit tests (dependency-free TcpListener mock): 404/200/401 reads,
  If-Match present/absent writes, 412 conflict, ETag capture

485 core tests green, clippy -D warnings clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sam Valladares 2026-06-19 20:35:01 -05:00
parent 5c2db045f6
commit fdd6b98180
7 changed files with 450 additions and 8 deletions

View file

@ -63,6 +63,14 @@ qwen3-reranker = ["qwen3-embeddings"]
# the default local-first build never links an HTTP client.
connectors = ["dep:reqwest"]
# Hosted managed-sync backend (Vestige Cloud). Adds the HTTP `PortableSyncBackend`
# (`HttpPortableSyncBackend`) that pull-merge-pushes the portable archive to a
# hosted blob endpoint over HTTPS with a per-user sync key. Like `connectors`,
# this is opt-in: an HTTP client is linked only when one of these two features
# is enabled, so the default local-first build stays network-free. Uses
# `reqwest`'s blocking client because the `PortableSyncBackend` trait methods
# are synchronous.
cloud-sync = ["dep:reqwest", "reqwest/blocking"]
# Metal GPU acceleration on Apple Silicon (significantly faster inference)
metal = ["fastembed/metal"]

View file

@ -187,6 +187,7 @@ pub use storage::{
PortableArchive,
PortableImportMode,
PortableImportReport,
PortableSyncReport,
ReconcileReport,
Result,
SchedulingState,

View file

@ -0,0 +1,337 @@
//! Hosted managed-sync backend (Vestige Cloud).
//!
//! This module is only compiled with the `cloud-sync` feature. It provides
//! [`HttpPortableSyncBackend`], an HTTP implementation of the
//! [`PortableSyncBackend`](super::sqlite::PortableSyncBackend) trait that
//! pull-merge-pushes the portable archive to a hosted blob endpoint.
//!
//! The merge/conflict engine is unchanged: this backend only moves bytes. The
//! authoritative `key -> namespace` mapping and per-user isolation live in the
//! hosted service; the client just presents an opaque sync key as a bearer
//! token. The default local-first build never links an HTTP client.
//!
//! ## Concurrency
//!
//! Two devices can each pull → merge → push. To avoid a lost update in the
//! GET↔PUT window, the backend uses optimistic concurrency: it captures the
//! object `ETag` on read and sends it as `If-Match` on write. The generic
//! [`sync_portable_archive`](super::sqlite::SqliteMemoryStore::sync_portable_archive)
//! driver calls `read_archive` then `write_archive` exactly once, so the ETag
//! captured during the pull is the precondition for the push. A
//! `412 Precondition Failed` means another device wrote in between; the caller
//! re-runs sync (the merge is idempotent and converges by `updated_at`).
use std::cell::RefCell;
use std::time::Duration;
use reqwest::blocking::Client;
use reqwest::header::{AUTHORIZATION, ETAG, IF_MATCH};
use reqwest::StatusCode;
use super::portable::PortableArchive;
use super::sqlite::{PortableSyncBackend, Result, StorageError};
/// Default request timeout for cloud sync HTTP calls.
///
/// Passed to the `Client` builder's `timeout` in `HttpPortableSyncBackend::new`,
/// so it applies to both the archive read and the archive write.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
/// Blob path on the hosted service. One opaque blob per sync key (the service
/// derives the namespace from the key), so the client uses a fixed path.
///
/// Appended verbatim to the endpoint by `blob_url`; the leading slash pairs with
/// the endpoint having its trailing slash trimmed.
const BLOB_PATH: &str = "/v1/blob";
/// HTTP-backed portable sync backend for Vestige Cloud.
///
/// Mirrors the shape of
/// [`FilePortableSyncBackend`](super::sqlite::FilePortableSyncBackend) but reads
/// and writes the archive over HTTPS with a per-user bearer key.
///
/// `Debug` is implemented by hand instead of derived: the sync key is a bearer
/// credential and must never appear in logs, panic messages or `{:?}` output.
pub struct HttpPortableSyncBackend {
    /// Base endpoint, e.g. `https://sync.vestige.dev`. No trailing slash.
    endpoint: String,
    /// Per-user sync key, presented as `Authorization: Bearer <key>`.
    sync_key: String,
    /// Blocking HTTP client (the trait is synchronous).
    client: Client,
    /// ETag captured on the most recent successful read, used as the `If-Match`
    /// precondition on the next write. `None` until the first read, when the
    /// remote had no archive yet, when the response carried no usable `ETag`
    /// header, and again after a write has consumed it.
    ///
    /// `RefCell` provides the interior mutability needed because the trait
    /// methods take `&self`.
    last_etag: RefCell<Option<String>>,
}

impl std::fmt::Debug for HttpPortableSyncBackend {
    /// Formats every field except the sync key, which is replaced by a
    /// placeholder so the credential cannot leak through debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HttpPortableSyncBackend")
            .field("endpoint", &self.endpoint)
            .field("sync_key", &"<redacted>")
            .field("client", &self.client)
            .field("last_etag", &self.last_etag)
            .finish()
    }
}
impl HttpPortableSyncBackend {
    /// Build a cloud sync backend for `endpoint` authenticated with `sync_key`.
    ///
    /// Surrounding whitespace is stripped from both values (environment
    /// variables and pasted keys often carry a trailing newline), and trailing
    /// slashes on `endpoint` are trimmed so URL joining is predictable.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Init`] when the endpoint or the key is empty
    /// after trimming, or when the HTTP client cannot be constructed.
    pub fn new(endpoint: impl Into<String>, sync_key: impl Into<String>) -> Result<Self> {
        let endpoint = endpoint.into();
        // Strip leading whitespace, then any mix of trailing slashes and
        // whitespace, so `" https://host/ \n"` normalizes to `https://host`.
        let endpoint = endpoint
            .trim_start()
            .trim_end_matches(|c: char| c == '/' || c.is_whitespace())
            .to_string();
        let sync_key = sync_key.into().trim().to_string();
        if endpoint.is_empty() {
            return Err(StorageError::Init(
                "cloud sync endpoint is empty (set VESTIGE_CLOUD_ENDPOINT)".to_string(),
            ));
        }
        if sync_key.is_empty() {
            return Err(StorageError::Init(
                "cloud sync key is empty (set VESTIGE_CLOUD_SYNC_KEY)".to_string(),
            ));
        }
        let client = Client::builder()
            .timeout(REQUEST_TIMEOUT)
            .user_agent(concat!("vestige-cloud-sync/", env!("CARGO_PKG_VERSION")))
            .build()
            .map_err(|e| StorageError::Init(format!("failed to build HTTP client: {e}")))?;
        Ok(Self {
            endpoint,
            sync_key,
            client,
            last_etag: RefCell::new(None),
        })
    }

    /// Full blob URL for this backend: the normalized endpoint followed by the
    /// fixed blob path.
    fn blob_url(&self) -> String {
        format!("{}{}", self.endpoint, BLOB_PATH)
    }
}
impl PortableSyncBackend for HttpPortableSyncBackend {
fn label(&self) -> String {
format!("cloud:{}", self.endpoint)
}
fn read_archive(&self) -> Result<Option<PortableArchive>> {
let resp = self
.client
.get(self.blob_url())
.header(AUTHORIZATION, format!("Bearer {}", self.sync_key))
.send()
.map_err(|e| StorageError::Init(format!("cloud sync read failed: {e}")))?;
match resp.status() {
StatusCode::NOT_FOUND => {
// No remote archive yet — first sync for this key.
*self.last_etag.borrow_mut() = None;
Ok(None)
}
StatusCode::OK => {
// Capture the ETag for the matching If-Match write.
let etag = resp
.headers()
.get(ETAG)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
*self.last_etag.borrow_mut() = etag;
let bytes = resp
.bytes()
.map_err(|e| StorageError::Init(format!("cloud sync read body failed: {e}")))?;
let archive: PortableArchive = serde_json::from_slice(&bytes).map_err(|e| {
StorageError::Init(format!("failed to parse cloud sync archive: {e}"))
})?;
Ok(Some(archive))
}
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => Err(StorageError::Init(
"cloud sync rejected the sync key (401/403). Check your subscription and \
VESTIGE_CLOUD_SYNC_KEY."
.to_string(),
)),
other => Err(StorageError::Init(format!(
"cloud sync read returned unexpected status {other}"
))),
}
}
fn write_archive(&self, archive: &PortableArchive) -> Result<()> {
let body = serde_json::to_vec(archive)
.map_err(|e| StorageError::Init(format!("failed to serialize archive: {e}")))?;
let mut req = self
.client
.put(self.blob_url())
.header(AUTHORIZATION, format!("Bearer {}", self.sync_key))
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(body);
// Optimistic concurrency: only overwrite the object we pulled. If the
// remote had no archive, require that it still doesn't exist (`If-Match: *`
// would require existence, so we omit the header to allow first create).
if let Some(etag) = self.last_etag.borrow_mut().take() {
req = req.header(IF_MATCH, etag);
}
let resp = req
.send()
.map_err(|e| StorageError::Init(format!("cloud sync write failed: {e}")))?;
match resp.status() {
StatusCode::OK | StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
StatusCode::PRECONDITION_FAILED => Err(StorageError::Init(
"cloud sync conflict: another device updated your memory in between. \
Run `vestige sync --cloud` again to merge and retry."
.to_string(),
)),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => Err(StorageError::Init(
"cloud sync rejected the sync key (401/403). Check your subscription and \
VESTIGE_CLOUD_SYNC_KEY."
.to_string(),
)),
StatusCode::PAYLOAD_TOO_LARGE => Err(StorageError::Init(
"cloud sync archive too large for the hosted plan limit".to_string(),
)),
other => Err(StorageError::Init(format!(
"cloud sync write returned unexpected status {other}"
))),
}
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the HTTP sync backend, driven by a dependency-free
    //! one-shot `TcpListener` mock instead of a real service.
    use super::*;
    use super::super::portable::{PortableArchive, PortableTable, PORTABLE_ARCHIVE_FORMAT};
    use std::io::{Read, Write};
    use std::net::TcpListener;
    use std::sync::mpsc;
    use std::thread;

    /// Smallest archive that serializes and parses: one empty table.
    fn sample_archive() -> PortableArchive {
        PortableArchive {
            archive_format: PORTABLE_ARCHIVE_FORMAT.to_string(),
            vestige_version: "test".to_string(),
            schema_version: 1,
            exported_at: chrono::Utc::now(),
            mode: "exact".to_string(),
            tables: vec![PortableTable {
                name: "knowledge_nodes".to_string(),
                columns: vec!["id".to_string()],
                rows: vec![],
            }],
        }
    }

    /// A captured request the mock observed, surfaced to the test thread.
    #[derive(Debug, Default, Clone)]
    struct CapturedRequest {
        method: String,
        authorization: Option<String>,
        if_match: Option<String>,
    }

    /// Minimal one-shot HTTP mock. `responder` builds the raw HTTP response
    /// string for the request line + headers it parsed. Returns the bound base
    /// URL and a receiver for the captured request.
    ///
    /// Header names are matched case-insensitively and only up to the blank
    /// line that ends the header section. This keeps negative assertions such
    /// as "no `If-Match` was sent" meaningful regardless of how the client
    /// capitalizes header names, and stops body bytes being read as headers.
    fn spawn_mock<F>(responder: F) -> (String, mpsc::Receiver<CapturedRequest>)
    where
        F: Fn(&CapturedRequest) -> String + Send + 'static,
    {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock");
        let addr = listener.local_addr().expect("addr");
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                let mut buf = [0u8; 8192];
                let n = stream.read(&mut buf).unwrap_or(0);
                let text = String::from_utf8_lossy(&buf[..n]);
                let mut cap = CapturedRequest::default();
                let mut lines = text.lines();
                if let Some(request_line) = lines.next() {
                    cap.method = request_line
                        .split_whitespace()
                        .next()
                        .unwrap_or("")
                        .to_string();
                }
                // `lines()` strips the trailing `\r`, so the header terminator
                // shows up as an empty line.
                for line in lines.take_while(|l| !l.is_empty()) {
                    if let Some((name, value)) = line.split_once(':') {
                        let name = name.trim();
                        if name.eq_ignore_ascii_case("authorization") {
                            cap.authorization = Some(value.trim().to_string());
                        } else if name.eq_ignore_ascii_case("if-match") {
                            cap.if_match = Some(value.trim().to_string());
                        }
                    }
                }
                let response = responder(&cap);
                let _ = stream.write_all(response.as_bytes());
                let _ = stream.flush();
                let _ = tx.send(cap);
            }
        });
        (format!("http://{addr}"), rx)
    }

    /// Render a raw HTTP/1.1 response. `extra_headers` must be empty or end
    /// with `\r\n`; `Content-Length` is the body's byte length.
    fn http_response(status: &str, extra_headers: &str, body: &str) -> String {
        format!(
            "HTTP/1.1 {status}\r\nContent-Length: {}\r\n{extra_headers}Connection: close\r\n\r\n{body}",
            body.len()
        )
    }

    #[test]
    fn new_rejects_empty_endpoint_and_key() {
        assert!(HttpPortableSyncBackend::new("", "key").is_err());
        assert!(HttpPortableSyncBackend::new("https://x", "").is_err());
        assert!(HttpPortableSyncBackend::new("https://x", "key").is_ok());
    }

    #[test]
    fn endpoint_trailing_slash_trimmed() {
        let be = HttpPortableSyncBackend::new("https://sync.example/", "k").unwrap();
        assert_eq!(be.blob_url(), "https://sync.example/v1/blob");
    }

    #[test]
    fn read_404_returns_none() {
        let (base, rx) = spawn_mock(|_| http_response("404 Not Found", "", ""));
        let be = HttpPortableSyncBackend::new(base, "secret").unwrap();
        let got = be.read_archive().expect("read ok");
        assert!(got.is_none());
        let cap = rx.recv().unwrap();
        assert_eq!(cap.method, "GET");
        assert_eq!(cap.authorization.as_deref(), Some("Bearer secret"));
    }

    #[test]
    fn read_200_parses_and_captures_etag() {
        let archive = sample_archive();
        let body = serde_json::to_string(&archive).unwrap();
        let (base, _rx) = spawn_mock(move |_| {
            http_response("200 OK", "ETag: \"v1-abc\"\r\n", &body)
        });
        let be = HttpPortableSyncBackend::new(base, "secret").unwrap();
        let got = be.read_archive().expect("read ok").expect("some archive");
        assert_eq!(got.archive_format, PORTABLE_ARCHIVE_FORMAT);
        // ETag captured for the next If-Match write.
        assert_eq!(be.last_etag.borrow().as_deref(), Some("\"v1-abc\""));
    }

    #[test]
    fn read_401_is_error() {
        let (base, _rx) = spawn_mock(|_| http_response("401 Unauthorized", "", ""));
        let be = HttpPortableSyncBackend::new(base, "bad").unwrap();
        assert!(be.read_archive().is_err());
    }

    #[test]
    fn write_sends_if_match_when_etag_present() {
        // Seed an etag as if a prior read happened.
        let (base, rx) = spawn_mock(|_| http_response("200 OK", "", ""));
        let be = HttpPortableSyncBackend::new(base, "secret").unwrap();
        *be.last_etag.borrow_mut() = Some("\"v1-abc\"".to_string());
        be.write_archive(&sample_archive()).expect("write ok");
        let cap = rx.recv().unwrap();
        assert_eq!(cap.method, "PUT");
        assert_eq!(cap.authorization.as_deref(), Some("Bearer secret"));
        assert_eq!(cap.if_match.as_deref(), Some("\"v1-abc\""));
    }

    #[test]
    fn write_omits_if_match_for_first_create() {
        let (base, rx) = spawn_mock(|_| http_response("201 Created", "", ""));
        let be = HttpPortableSyncBackend::new(base, "secret").unwrap();
        // No prior read → no etag → no If-Match (allow create).
        be.write_archive(&sample_archive()).expect("write ok");
        let cap = rx.recv().unwrap();
        assert_eq!(cap.method, "PUT");
        assert!(cap.if_match.is_none());
    }

    #[test]
    fn write_412_is_conflict_error() {
        let (base, _rx) = spawn_mock(|_| http_response("412 Precondition Failed", "", ""));
        let be = HttpPortableSyncBackend::new(base, "secret").unwrap();
        *be.last_etag.borrow_mut() = Some("\"stale\"".to_string());
        let err = be.write_archive(&sample_archive()).unwrap_err();
        assert!(err.to_string().contains("conflict"));
    }
}

View file

@ -2,11 +2,16 @@
//!
//! Backend-agnostic memory store abstraction plus SQLite reference impl.
#[cfg(feature = "cloud-sync")]
mod cloud_sync;
mod memory_store;
mod migrations;
mod portable;
mod sqlite;
#[cfg(feature = "cloud-sync")]
pub use cloud_sync::HttpPortableSyncBackend;
pub use memory_store::{
ClassificationResult, Domain, HealthStatus, LocalMemoryStore, MemoryEdge, MemoryRecord,
MemoryStore, MemoryStoreError, MemoryStoreResult, MemoryStoreSend, ModelSignature,

View file

@ -6332,6 +6332,20 @@ impl SqliteMemoryStore {
self.sync_portable_archive(&backend)
}
/// Run a pull-merge-push sync of this database against the hosted Vestige
/// Cloud managed-sync service.
///
/// `endpoint` is the service base URL (e.g. `https://sync.vestige.dev`);
/// `sync_key` is the per-user key issued at purchase. The sync itself goes
/// through the same `sync_portable_archive` driver as file sync — only the
/// backend, and therefore the transport, differs.
#[cfg(feature = "cloud-sync")]
pub fn sync_portable_archive_cloud(
    &self,
    endpoint: &str,
    sync_key: &str,
) -> Result<PortableSyncReport> {
    use super::cloud_sync::HttpPortableSyncBackend;

    // Backend construction validates the endpoint and key before any I/O.
    let cloud_backend = HttpPortableSyncBackend::new(endpoint, sync_key)?;
    self.sync_portable_archive(&cloud_backend)
}
fn merge_portable_table(
tx: &rusqlite::Transaction<'_>,
table_name: &str,

View file

@ -10,13 +10,18 @@ categories = ["command-line-utilities", "database"]
repository = "https://github.com/samvallad33/vestige"
[features]
default = ["embeddings", "ort-download", "vector-search", "connectors"]
default = ["embeddings", "ort-download", "vector-search", "connectors", "cloud-sync"]
embeddings = ["vestige-core/embeddings"]
vector-search = ["vestige-core/vector-search"]
# External-source connectors (#57): GitHub Issues / Redmine indexing via the
# `source_sync` MCP tool. On by default so the tool works out of the box; turn
# off for a build with no HTTP client.
connectors = ["vestige-core/connectors"]
# Hosted managed-sync (Vestige Cloud): `vestige sync --cloud` pushes/pulls the
# portable archive to the hosted blob service. On by default so the command
# works out of the box; the binary already links `reqwest` via `connectors`,
# so this adds no new crate — only reqwest's `blocking` feature.
cloud-sync = ["vestige-core/cloud-sync"]
# Default ort backend: downloads prebuilt ONNX Runtime at build time.
# Fails on targets without prebuilts (notably x86_64-apple-darwin).
ort-download = ["embeddings", "vestige-core/ort-download"]

View file

@ -182,10 +182,20 @@ enum Commands {
merge: bool,
},
/// Two-way sync with a file-backed portable archive
/// Two-way sync with a file-backed portable archive, or Vestige Cloud
Sync {
/// Sync archive path, often in Dropbox/iCloud/Syncthing/Git
archive: PathBuf,
/// Sync archive path, often in Dropbox/iCloud/Syncthing/Git.
/// Omit when using --cloud.
archive: Option<PathBuf>,
/// Sync with the hosted Vestige Cloud managed-sync service instead of a
/// file. Requires a sync key (VESTIGE_CLOUD_SYNC_KEY) and endpoint
/// (--endpoint or VESTIGE_CLOUD_ENDPOINT).
#[arg(long)]
cloud: bool,
/// Vestige Cloud base endpoint (e.g. https://sync.vestige.dev).
/// Defaults to the VESTIGE_CLOUD_ENDPOINT env var.
#[arg(long)]
endpoint: Option<String>,
},
/// Garbage collect stale memories below retention threshold
@ -287,7 +297,11 @@ fn main() -> anyhow::Result<()> {
} => run_export(output, format, tags, since),
Commands::PortableExport { output } => run_portable_export(output),
Commands::PortableImport { input, merge } => run_portable_import(input, merge),
Commands::Sync { archive } => run_sync(archive),
Commands::Sync {
archive,
cloud,
endpoint,
} => run_sync(archive, cloud, endpoint),
Commands::Gc {
min_retention,
max_age_days,
@ -2193,14 +2207,74 @@ fn run_portable_import(input: PathBuf, merge: bool) -> anyhow::Result<()> {
}
/// Run file-backed two-way sync.
fn run_sync(archive: PathBuf) -> anyhow::Result<()> {
/// Dispatch `vestige sync` to the cloud backend (`--cloud`) or to file sync
/// (an archive path).
///
/// Contradictory arguments are rejected rather than silently ignored: an
/// archive path given together with `--cloud`, and `--endpoint` given without
/// `--cloud`, both produce an error before any sync work starts.
///
/// # Errors
///
/// Fails on conflicting or missing targets, and propagates any error from the
/// selected sync routine.
fn run_sync(
    archive: Option<PathBuf>,
    cloud: bool,
    endpoint: Option<String>,
) -> anyhow::Result<()> {
    match (cloud, archive) {
        (true, Some(path)) => anyhow::bail!(
            "conflicting sync targets: archive path `{}` was given together with --cloud; \
             pass one or the other",
            path.display()
        ),
        (true, None) => run_sync_cloud(endpoint),
        // `--endpoint` has no meaning for file sync; flag the likely missing --cloud.
        (false, _) if endpoint.is_some() => anyhow::bail!(
            "--endpoint only applies to cloud sync: add --cloud, or drop --endpoint for file sync"
        ),
        (false, Some(path)) => run_sync_file(path),
        (false, None) => anyhow::bail!(
            "no sync target: pass an archive path for file sync, or --cloud for Vestige Cloud"
        ),
    }
}
/// Two-way sync against a file-backed portable archive at `archive`.
///
/// Prints a banner and the archive path, opens the local store, runs the
/// file sync and prints the resulting report.
fn run_sync_file(archive: PathBuf) -> anyhow::Result<()> {
    let banner = "=== Vestige File Sync ===".cyan().bold();
    println!("{}", banner);
    println!();
    let path_label = "Archive".white().bold();
    println!("{}: {}", path_label, archive.display());

    let store = open_storage()?;
    let sync_report = store.sync_portable_archive_file(&archive)?;
    print_sync_report(&sync_report);
    Ok(())
}
/// Two-way sync against the hosted Vestige Cloud service.
///
/// The endpoint comes from `--endpoint`, falling back to
/// `VESTIGE_CLOUD_ENDPOINT`; the sync key comes from `VESTIGE_CLOUD_SYNC_KEY`.
/// Values are trimmed before use, and a blank value counts as "not provided",
/// so a blank `--endpoint` still falls back to the environment variable.
///
/// # Errors
///
/// Fails when no endpoint or no sync key is available, when the local store
/// cannot be opened, or when the sync itself fails.
#[cfg(feature = "cloud-sync")]
fn run_sync_cloud(endpoint: Option<String>) -> anyhow::Result<()> {
    /// Trim `raw` and return it only if something is left.
    fn non_blank(raw: String) -> Option<String> {
        let trimmed = raw.trim();
        (!trimmed.is_empty()).then(|| trimmed.to_string())
    }

    let endpoint = endpoint
        .and_then(non_blank)
        .or_else(|| {
            std::env::var("VESTIGE_CLOUD_ENDPOINT")
                .ok()
                .and_then(non_blank)
        })
        .ok_or_else(|| {
            anyhow::anyhow!(
                "no cloud endpoint: pass --endpoint or set VESTIGE_CLOUD_ENDPOINT \
                 (e.g. https://sync.vestige.dev)"
            )
        })?;
    let sync_key = std::env::var("VESTIGE_CLOUD_SYNC_KEY")
        .ok()
        .and_then(non_blank)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "no sync key: set VESTIGE_CLOUD_SYNC_KEY (issued when you subscribe to \
                 Vestige Cloud)"
            )
        })?;
    println!("{}", "=== Vestige Cloud Sync ===".cyan().bold());
    println!();
    // Only the endpoint is echoed; the sync key is a credential and is never printed.
    println!("{}: {}", "Endpoint".white().bold(), endpoint);
    let storage = open_storage()?;
    let report = storage.sync_portable_archive_cloud(&endpoint, &sync_key)?;
    print_sync_report(&report);
    Ok(())
}
/// Stand-in for cloud sync in builds compiled without the `cloud-sync`
/// feature: always fails with a message explaining how to enable it.
#[cfg(not(feature = "cloud-sync"))]
fn run_sync_cloud(_endpoint: Option<String>) -> anyhow::Result<()> {
    let feature_missing = anyhow::anyhow!(
        "this build was compiled without the `cloud-sync` feature; rebuild with \
         --features cloud-sync to use Vestige Cloud"
    );
    Err(feature_missing)
}
fn print_sync_report(report: &vestige_core::PortableSyncReport) {
if let Some(pull) = &report.pull {
println!("{}", "Pull: merged remote archive".yellow());
println!(
@ -2228,8 +2302,6 @@ fn run_sync(archive: PathBuf) -> anyhow::Result<()> {
.green()
.bold()
);
Ok(())
}
/// Run garbage collection command