Add ANN search support for vec0 virtual table

Add approximate nearest neighbor infrastructure to vec0: shared distance
dispatch (vec0_distance_full), flat index type with parser, NEON-optimized
cosine/Hamming for float32/int8, amalgamation script, and benchmark suite
(benchmarks-ann/) with ground-truth generation and profiling tools. Remove
unused vec_npy_each/vec_static_blobs code and add a missing stdint.h include.
Alex Garcia 2026-03-29 19:44:44 -07:00
parent dfd8dc5290
commit bf2455f2ba
27 changed files with 2177 additions and 2116 deletions

benchmarks-ann/.gitignore (new file)

@@ -0,0 +1,2 @@
*.db
runs/

benchmarks-ann/Makefile (new file)

@@ -0,0 +1,61 @@
BENCH = python bench.py
BASE_DB = seed/base.db
EXT = ../dist/vec0
# --- Baseline (brute-force) configs ---
BASELINES = \
"brute-float:type=baseline,variant=float" \
"brute-int8:type=baseline,variant=int8" \
"brute-bit:type=baseline,variant=bit"
# --- Index-specific configs ---
# Each index branch should add its own configs here. Example:
#
# DISKANN_CONFIGS = \
# "diskann-R48-binary:type=diskann,R=48,L=128,quantizer=binary" \
# "diskann-R72-int8:type=diskann,R=72,L=128,quantizer=int8"
#
# IVF_CONFIGS = \
# "ivf-n128-p16:type=ivf,nlist=128,nprobe=16"
#
# ANNOY_CONFIGS = \
# "annoy-t50:type=annoy,n_trees=50"
ALL_CONFIGS = $(BASELINES)
.PHONY: seed ground-truth bench-smoke bench-10k bench-50k bench-100k bench-all \
report clean
# --- Data preparation ---
seed:
$(MAKE) -C seed
ground-truth: seed
python ground_truth.py --subset-size 10000
python ground_truth.py --subset-size 50000
python ground_truth.py --subset-size 100000
# --- Quick smoke test ---
bench-smoke: seed
$(BENCH) --subset-size 5000 -k 10 -n 20 -o runs/smoke \
$(BASELINES)
# --- Standard sizes ---
bench-10k: seed
$(BENCH) --subset-size 10000 -k 10 -o runs/10k $(ALL_CONFIGS)
bench-50k: seed
$(BENCH) --subset-size 50000 -k 10 -o runs/50k $(ALL_CONFIGS)
bench-100k: seed
$(BENCH) --subset-size 100000 -k 10 -o runs/100k $(ALL_CONFIGS)
bench-all: bench-10k bench-50k bench-100k
# --- Report ---
report:
@echo "Use: sqlite3 runs/<dir>/results.db 'SELECT * FROM bench_results ORDER BY recall DESC'"
# --- Cleanup ---
clean:
rm -rf runs/

benchmarks-ann/README.md (new file)

@@ -0,0 +1,81 @@
# KNN Benchmarks for sqlite-vec
Benchmarking infrastructure for vec0 KNN configurations. Includes brute-force
baselines (float, int8, bit); index-specific branches add their own types
via the `INDEX_REGISTRY` in `bench.py`.
## Prerequisites
- Built `dist/vec0` extension (run `make` from repo root)
- Python 3.10+
- `uv` (for seed data prep): `pip install uv`
## Quick start
```bash
# 1. Download dataset and build seed DB (~3 GB download, ~5 min)
make seed
# 2. Run a quick smoke test (5k vectors, ~1 min)
make bench-smoke
# 3. Run full benchmark at 10k
make bench-10k
```
## Usage
### Direct invocation
```bash
python bench.py --subset-size 10000 \
"brute-float:type=baseline,variant=float" \
"brute-int8:type=baseline,variant=int8" \
"brute-bit:type=baseline,variant=bit"
```
### Config format
`name:type=<index_type>,key=val,key=val`
| Index type | Keys | Branch |
|-----------|------|--------|
| `baseline` | `variant` (float/int8/bit), `oversample` | this branch |
Index branches register additional types in `INDEX_REGISTRY`. See the
docstring in `bench.py` for the extension API.
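As a quick illustration, here is how `parse_config` in `bench.py` reads a spec
(keys listed in `INT_KEYS` are coerced to `int`; everything else stays a string):

```python
from bench import parse_config

name, params = parse_config("brute-int8:type=baseline,variant=int8,oversample=4")
# name   == "brute-int8"
# params == {"variant": "int8", "oversample": 4, "index_type": "baseline"}
```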
### Make targets
| Target | Description |
|--------|-------------|
| `make seed` | Download the Cohere 1M dataset and build the seed DB |
| `make ground-truth` | Pre-compute ground truth for 10k/50k/100k |
| `make bench-smoke` | Quick 5k baseline test |
| `make bench-10k` | All configs at 10k vectors |
| `make bench-50k` | All configs at 50k vectors |
| `make bench-100k` | All configs at 100k vectors |
| `make bench-all` | 10k + 50k + 100k |
## Adding an index type
In your index branch, add an entry to `INDEX_REGISTRY` in `bench.py` and
append your configs to `ALL_CONFIGS` in the `Makefile`. See the existing
`baseline` entry and the comments in both files for the pattern.
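For orientation, a minimal sketch of a registry entry, following the contract
documented at the top of `bench.py`. The `ivf` type and its parameters here are
hypothetical placeholders, not a committed API:

```python
# In bench.py, on your index branch. All names below are illustrative.
INDEX_REGISTRY["ivf"] = {
    "defaults": {"nlist": 128, "nprobe": 16},
    "create_table_sql": lambda p: (
        "CREATE VIRTUAL TABLE vec_items USING vec0("
        " id integer primary key,"
        " embedding float[768] distance_metric=cosine)"
        # ...plus whatever index options your branch adds
    ),
    "insert_sql": None,        # None -> bench.py uses DEFAULT_INSERT_SQL
    "post_insert_hook": None,  # or fn(conn, params) -> train_time_s
    "run_query": None,         # None -> default MATCH ... AND k = :k query
    "describe": lambda p: f"ivf nlist={p['nlist']} nprobe={p['nprobe']}",
}
```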
## Results
Results are stored in `runs/<dir>/results.db` using the schema in `schema.sql`.
```bash
sqlite3 runs/10k/results.db "
SELECT config_name, recall, mean_ms, qps
FROM bench_results
ORDER BY recall DESC
"
```
## Dataset
[Zilliz COHERE Medium 1M](https://zilliz.com/learn/datasets-for-vector-database-benchmarks):
768 dimensions, cosine distance, 1M train vectors + 10k query vectors with precomputed neighbors.
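
A quick sanity check of the seed DB after `make seed` (table names follow
`seed/build_base_db.py`):

```python
import sqlite3

conn = sqlite3.connect("seed/base.db")
for table in ("train", "query_vectors", "neighbors"):
    print(table, conn.execute(f"SELECT count(*) FROM {table}").fetchone()[0])

# Vectors are stored as little-endian float32 blobs: 768 dims * 4 bytes = 3072
blob = conn.execute("SELECT vector FROM train LIMIT 1").fetchone()[0]
assert len(blob) == 768 * 4
```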

benchmarks-ann/bench.py (new file)

@@ -0,0 +1,488 @@
#!/usr/bin/env python3
"""Benchmark runner for sqlite-vec KNN configurations.
Measures insert time, build/train time, DB size, KNN latency, and recall
across different vec0 configurations.
Config format: name:type=<index_type>,key=val,key=val
Baseline (brute-force) keys:
type=baseline, variant=float|int8|bit, oversample=8
Index-specific types can be registered via INDEX_REGISTRY (see below).
Usage:
python bench.py --subset-size 10000 \
"brute-float:type=baseline,variant=float" \
"brute-int8:type=baseline,variant=int8" \
"brute-bit:type=baseline,variant=bit"
"""
import argparse
import os
import sqlite3
import statistics
import time
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
EXT_PATH = os.path.join(_SCRIPT_DIR, "..", "dist", "vec0")
BASE_DB = os.path.join(_SCRIPT_DIR, "seed", "base.db")
INSERT_BATCH_SIZE = 1000
# ============================================================================
# Index registry — extension point for ANN index branches
# ============================================================================
#
# Each index type provides a dict with:
# "defaults": dict of default params
# "create_table_sql": fn(params) -> SQL string
# "insert_sql": fn(params) -> SQL string (or None for default)
# "post_insert_hook": fn(conn, params) -> train_time_s (or None)
# "run_query": fn(conn, params, query, k) -> [(id, distance), ...] (or None for default MATCH)
# "describe": fn(params) -> str (one-line description)
#
# To add a new index type, add an entry here. Example (in your branch):
#
# INDEX_REGISTRY["diskann"] = {
# "defaults": {"R": 72, "L": 128, "quantizer": "binary", "buffer_threshold": 0},
# "create_table_sql": lambda p: f"CREATE VIRTUAL TABLE vec_items USING vec0(...)",
# "insert_sql": None,
# "post_insert_hook": None,
# "run_query": None,
# "describe": lambda p: f"diskann q={p['quantizer']} R={p['R']} L={p['L']}",
# }
INDEX_REGISTRY = {}
# ============================================================================
# Baseline implementation
# ============================================================================
def _baseline_create_table_sql(params):
variant = params["variant"]
extra = ""
if variant == "int8":
extra = ", embedding_int8 int8[768]"
elif variant == "bit":
extra = ", embedding_bq bit[768]"
return (
f"CREATE VIRTUAL TABLE vec_items USING vec0("
f" chunk_size=256,"
f" id integer primary key,"
f" embedding float[768] distance_metric=cosine"
f" {extra})"
)
def _baseline_insert_sql(params):
variant = params["variant"]
if variant == "int8":
return (
"INSERT INTO vec_items(id, embedding, embedding_int8) "
"SELECT id, vector, vec_quantize_int8(vector, 'unit') "
"FROM base.train WHERE id >= :lo AND id < :hi"
)
elif variant == "bit":
return (
"INSERT INTO vec_items(id, embedding, embedding_bq) "
"SELECT id, vector, vec_quantize_binary(vector) "
"FROM base.train WHERE id >= :lo AND id < :hi"
)
return None # use default
def _baseline_run_query(conn, params, query, k):
variant = params["variant"]
oversample = params.get("oversample", 8)
if variant == "int8":
return conn.execute(
"WITH coarse AS ("
" SELECT id, embedding FROM vec_items"
" WHERE embedding_int8 MATCH vec_quantize_int8(:query, 'unit')"
" LIMIT :oversample_k"
") "
"SELECT id, vec_distance_cosine(embedding, :query) as distance "
"FROM coarse ORDER BY 2 LIMIT :k",
{"query": query, "k": k, "oversample_k": k * oversample},
).fetchall()
elif variant == "bit":
return conn.execute(
"WITH coarse AS ("
" SELECT id, embedding FROM vec_items"
" WHERE embedding_bq MATCH vec_quantize_binary(:query)"
" LIMIT :oversample_k"
") "
"SELECT id, vec_distance_cosine(embedding, :query) as distance "
"FROM coarse ORDER BY 2 LIMIT :k",
{"query": query, "k": k, "oversample_k": k * oversample},
).fetchall()
return None # use default MATCH
def _baseline_describe(params):
v = params["variant"]
if v in ("int8", "bit"):
return f"baseline {v} (os={params['oversample']})"
return f"baseline {v}"
INDEX_REGISTRY["baseline"] = {
"defaults": {"variant": "float", "oversample": 8},
"create_table_sql": _baseline_create_table_sql,
"insert_sql": _baseline_insert_sql,
"post_insert_hook": None,
"run_query": _baseline_run_query,
"describe": _baseline_describe,
}
# ============================================================================
# Config parsing
# ============================================================================
INT_KEYS = {
"R", "L", "buffer_threshold", "nlist", "nprobe", "oversample",
"n_trees", "search_k",
}
def parse_config(spec):
"""Parse 'name:type=baseline,key=val,...' into (name, params_dict)."""
if ":" in spec:
name, opts_str = spec.split(":", 1)
else:
name, opts_str = spec, ""
raw = {}
if opts_str:
for kv in opts_str.split(","):
k, v = kv.split("=", 1)
raw[k.strip()] = v.strip()
index_type = raw.pop("type", "baseline")
if index_type not in INDEX_REGISTRY:
raise ValueError(
f"Unknown index type: {index_type}. "
f"Available: {', '.join(sorted(INDEX_REGISTRY.keys()))}"
)
reg = INDEX_REGISTRY[index_type]
params = dict(reg["defaults"])
for k, v in raw.items():
if k in INT_KEYS:
params[k] = int(v)
else:
params[k] = v
params["index_type"] = index_type
return name, params
# ============================================================================
# Shared helpers
# ============================================================================
def load_query_vectors(base_db_path, n):
conn = sqlite3.connect(base_db_path)
rows = conn.execute(
"SELECT id, vector FROM query_vectors ORDER BY id LIMIT :n", {"n": n}
).fetchall()
conn.close()
return [(r[0], r[1]) for r in rows]
def insert_loop(conn, sql, subset_size, label=""):
t0 = time.perf_counter()
for lo in range(0, subset_size, INSERT_BATCH_SIZE):
hi = min(lo + INSERT_BATCH_SIZE, subset_size)
conn.execute(sql, {"lo": lo, "hi": hi})
conn.commit()
done = hi
if done % 5000 == 0 or done == subset_size:
elapsed = time.perf_counter() - t0
rate = done / elapsed if elapsed > 0 else 0
print(
f" [{label}] {done:>8}/{subset_size} "
f"{elapsed:.1f}s {rate:.0f} rows/s",
flush=True,
)
return time.perf_counter() - t0
def open_bench_db(db_path, ext_path, base_db):
if os.path.exists(db_path):
os.remove(db_path)
conn = sqlite3.connect(db_path)
conn.enable_load_extension(True)
conn.load_extension(ext_path)
conn.execute("PRAGMA page_size=8192")
conn.execute(f"ATTACH DATABASE '{base_db}' AS base")
return conn
DEFAULT_INSERT_SQL = (
"INSERT INTO vec_items(id, embedding) "
"SELECT id, vector FROM base.train WHERE id >= :lo AND id < :hi"
)
# ============================================================================
# Build
# ============================================================================
def build_index(base_db, ext_path, name, params, subset_size, out_dir):
db_path = os.path.join(out_dir, f"{name}.{subset_size}.db")
conn = open_bench_db(db_path, ext_path, base_db)
reg = INDEX_REGISTRY[params["index_type"]]
conn.execute(reg["create_table_sql"](params))
label = params["index_type"]
print(f" Inserting {subset_size} vectors...")
sql_fn = reg.get("insert_sql")
sql = sql_fn(params) if sql_fn else None
if sql is None:
sql = DEFAULT_INSERT_SQL
insert_time = insert_loop(conn, sql, subset_size, label)
train_time = 0.0
hook = reg.get("post_insert_hook")
if hook:
train_time = hook(conn, params)
row_count = conn.execute("SELECT count(*) FROM vec_items").fetchone()[0]
conn.close()
file_size_mb = os.path.getsize(db_path) / (1024 * 1024)
return {
"db_path": db_path,
"insert_time_s": round(insert_time, 3),
"train_time_s": round(train_time, 3),
"total_time_s": round(insert_time + train_time, 3),
"insert_per_vec_ms": round((insert_time / row_count) * 1000, 2)
if row_count
else 0,
"rows": row_count,
"file_size_mb": round(file_size_mb, 2),
}
# ============================================================================
# KNN measurement
# ============================================================================
def _default_match_query(conn, query, k):
return conn.execute(
"SELECT id, distance FROM vec_items "
"WHERE embedding MATCH :query AND k = :k",
{"query": query, "k": k},
).fetchall()
def measure_knn(db_path, ext_path, base_db, params, subset_size, k=10, n=50):
conn = sqlite3.connect(db_path)
conn.enable_load_extension(True)
conn.load_extension(ext_path)
conn.execute(f"ATTACH DATABASE '{base_db}' AS base")
query_vectors = load_query_vectors(base_db, n)
reg = INDEX_REGISTRY[params["index_type"]]
query_fn = reg.get("run_query")
times_ms = []
recalls = []
for qid, query in query_vectors:
t0 = time.perf_counter()
results = None
if query_fn:
results = query_fn(conn, params, query, k)
if results is None:
results = _default_match_query(conn, query, k)
elapsed_ms = (time.perf_counter() - t0) * 1000
times_ms.append(elapsed_ms)
result_ids = set(r[0] for r in results)
# Ground truth: use pre-computed neighbors table for full dataset,
# otherwise brute-force over the subset
if subset_size >= 1000000:
gt_rows = conn.execute(
"SELECT CAST(neighbors_id AS INTEGER) FROM base.neighbors "
"WHERE query_vector_id = :qid AND rank < :k",
{"qid": qid, "k": k},
).fetchall()
else:
gt_rows = conn.execute(
"SELECT id FROM ("
" SELECT id, vec_distance_cosine(vector, :query) as dist "
" FROM base.train WHERE id < :n ORDER BY dist LIMIT :k"
")",
{"query": query, "k": k, "n": subset_size},
).fetchall()
gt_ids = set(r[0] for r in gt_rows)
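        # recall@k = |returned ids ∩ true top-k ids| / |top-k|; e.g. finding
        # 8 of the 10 true nearest neighbors scores 0.8 for this query.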
if gt_ids:
recalls.append(len(result_ids & gt_ids) / len(gt_ids))
else:
recalls.append(0.0)
conn.close()
return {
"mean_ms": round(statistics.mean(times_ms), 2),
"median_ms": round(statistics.median(times_ms), 2),
"p99_ms": round(sorted(times_ms)[int(len(times_ms) * 0.99)], 2)
if len(times_ms) > 1
else round(times_ms[0], 2),
"total_ms": round(sum(times_ms), 2),
"recall": round(statistics.mean(recalls), 4),
}
# ============================================================================
# Results persistence
# ============================================================================
def save_results(results_path, rows):
db = sqlite3.connect(results_path)
    with open(os.path.join(_SCRIPT_DIR, "schema.sql")) as f:
        db.executescript(f.read())
for r in rows:
db.execute(
"INSERT OR REPLACE INTO build_results "
"(config_name, index_type, subset_size, db_path, "
" insert_time_s, train_time_s, total_time_s, rows, file_size_mb) "
"VALUES (?,?,?,?,?,?,?,?,?)",
(
r["name"], r["index_type"], r["n_vectors"], r["db_path"],
r["insert_time_s"], r["train_time_s"], r["total_time_s"],
r["rows"], r["file_size_mb"],
),
)
db.execute(
"INSERT OR REPLACE INTO bench_results "
"(config_name, index_type, subset_size, k, n, "
" mean_ms, median_ms, p99_ms, total_ms, qps, recall, db_path) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
(
r["name"], r["index_type"], r["n_vectors"], r["k"], r["n_queries"],
r["mean_ms"], r["median_ms"], r["p99_ms"], r["total_ms"],
round(r["n_queries"] / (r["total_ms"] / 1000), 1)
if r["total_ms"] > 0 else 0,
r["recall"], r["db_path"],
),
)
db.commit()
db.close()
# ============================================================================
# Reporting
# ============================================================================
def print_report(all_results):
print(
f"\n{'name':>20} {'N':>7} {'type':>10} {'config':>28} "
f"{'ins(s)':>7} {'train':>6} {'MB':>7} "
f"{'qry(ms)':>8} {'recall':>7}"
)
print("-" * 115)
for r in all_results:
train = f"{r['train_time_s']:.1f}" if r["train_time_s"] > 0 else "-"
print(
f"{r['name']:>20} {r['n_vectors']:>7} {r['index_type']:>10} "
f"{r['config_desc']:>28} "
f"{r['insert_time_s']:>7.1f} {train:>6} {r['file_size_mb']:>7.1f} "
f"{r['mean_ms']:>8.2f} {r['recall']:>7.4f}"
)
# ============================================================================
# Main
# ============================================================================
def main():
parser = argparse.ArgumentParser(
description="Benchmark runner for sqlite-vec KNN configurations",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument("configs", nargs="+", help="config specs (name:type=X,key=val,...)")
parser.add_argument("--subset-size", type=int, required=True)
parser.add_argument("-k", type=int, default=10, help="KNN k (default 10)")
parser.add_argument("-n", type=int, default=50, help="number of queries (default 50)")
parser.add_argument("--base-db", default=BASE_DB)
parser.add_argument("--ext", default=EXT_PATH)
parser.add_argument("-o", "--out-dir", default="runs")
parser.add_argument("--results-db", default=None,
help="path to results DB (default: <out-dir>/results.db)")
args = parser.parse_args()
os.makedirs(args.out_dir, exist_ok=True)
results_db = args.results_db or os.path.join(args.out_dir, "results.db")
configs = [parse_config(c) for c in args.configs]
all_results = []
for i, (name, params) in enumerate(configs, 1):
reg = INDEX_REGISTRY[params["index_type"]]
desc = reg["describe"](params)
print(f"\n[{i}/{len(configs)}] {name} ({desc.strip()})")
build = build_index(
args.base_db, args.ext, name, params, args.subset_size, args.out_dir
)
train_str = f" + {build['train_time_s']}s train" if build["train_time_s"] > 0 else ""
print(
f" Build: {build['insert_time_s']}s insert{train_str} "
f"{build['file_size_mb']} MB"
)
print(f" Measuring KNN (k={args.k}, n={args.n})...")
knn = measure_knn(
build["db_path"], args.ext, args.base_db,
params, args.subset_size, k=args.k, n=args.n,
)
print(f" KNN: mean={knn['mean_ms']}ms recall@{args.k}={knn['recall']}")
all_results.append({
"name": name,
"n_vectors": args.subset_size,
"index_type": params["index_type"],
"config_desc": desc,
"db_path": build["db_path"],
"insert_time_s": build["insert_time_s"],
"train_time_s": build["train_time_s"],
"total_time_s": build["total_time_s"],
"insert_per_vec_ms": build["insert_per_vec_ms"],
"rows": build["rows"],
"file_size_mb": build["file_size_mb"],
"k": args.k,
"n_queries": args.n,
"mean_ms": knn["mean_ms"],
"median_ms": knn["median_ms"],
"p99_ms": knn["p99_ms"],
"total_ms": knn["total_ms"],
"recall": knn["recall"],
})
print_report(all_results)
save_results(results_db, all_results)
print(f"\nResults saved to {results_db}")
if __name__ == "__main__":
main()

benchmarks-ann/ground_truth.py (new file)

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""Compute per-subset ground truth for ANN benchmarks.
For subset sizes < 1M, builds a temporary vec0 float table with the first N
vectors and runs brute-force KNN to get correct ground truth per subset.
For 1M (the full dataset), converts the existing `neighbors` table.
Output: ground_truth.{subset_size}.db with table:
ground_truth(query_vector_id, rank, neighbor_id, distance)
Usage:
python ground_truth.py --subset-size 50000
python ground_truth.py --subset-size 1000000
"""
import argparse
import os
import sqlite3
import time
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
EXT_PATH = os.path.join(_SCRIPT_DIR, "..", "dist", "vec0")
BASE_DB = os.path.join(_SCRIPT_DIR, "seed", "base.db")
FULL_DATASET_SIZE = 1_000_000
def gen_ground_truth_subset(base_db, ext_path, subset_size, n_queries, k, out_path):
"""Build ground truth by brute-force KNN over the first `subset_size` vectors."""
if os.path.exists(out_path):
os.remove(out_path)
conn = sqlite3.connect(out_path)
conn.enable_load_extension(True)
conn.load_extension(ext_path)
conn.execute(
"CREATE TABLE ground_truth ("
" query_vector_id INTEGER NOT NULL,"
" rank INTEGER NOT NULL,"
" neighbor_id INTEGER NOT NULL,"
" distance REAL NOT NULL,"
" PRIMARY KEY (query_vector_id, rank)"
")"
)
conn.execute(f"ATTACH DATABASE '{base_db}' AS base")
print(f" Building temp vec0 table with {subset_size} vectors...")
conn.execute(
"CREATE VIRTUAL TABLE tmp_vec USING vec0("
" id integer primary key,"
" embedding float[768] distance_metric=cosine"
")"
)
t0 = time.perf_counter()
conn.execute(
"INSERT INTO tmp_vec(id, embedding) "
"SELECT id, vector FROM base.train WHERE id < :n",
{"n": subset_size},
)
conn.commit()
build_time = time.perf_counter() - t0
print(f" Temp table built in {build_time:.1f}s")
query_vectors = conn.execute(
"SELECT id, vector FROM base.query_vectors ORDER BY id LIMIT :n",
{"n": n_queries},
).fetchall()
print(f" Running brute-force KNN for {len(query_vectors)} queries, k={k}...")
t0 = time.perf_counter()
for i, (qid, qvec) in enumerate(query_vectors):
results = conn.execute(
"SELECT id, distance FROM tmp_vec "
"WHERE embedding MATCH :query AND k = :k",
{"query": qvec, "k": k},
).fetchall()
for rank, (nid, dist) in enumerate(results):
conn.execute(
"INSERT INTO ground_truth(query_vector_id, rank, neighbor_id, distance) "
"VALUES (?, ?, ?, ?)",
(qid, rank, nid, dist),
)
if (i + 1) % 10 == 0 or i == 0:
elapsed = time.perf_counter() - t0
eta = (elapsed / (i + 1)) * (len(query_vectors) - i - 1)
print(
f" {i+1}/{len(query_vectors)} queries "
f"elapsed={elapsed:.1f}s eta={eta:.1f}s",
flush=True,
)
conn.commit()
conn.execute("DROP TABLE tmp_vec")
conn.execute("DETACH DATABASE base")
conn.commit()
elapsed = time.perf_counter() - t0
total_rows = conn.execute("SELECT count(*) FROM ground_truth").fetchone()[0]
conn.close()
print(f" Ground truth: {total_rows} rows in {elapsed:.1f}s -> {out_path}")
def gen_ground_truth_full(base_db, n_queries, k, out_path):
"""Convert the existing neighbors table for the full 1M dataset."""
if os.path.exists(out_path):
os.remove(out_path)
conn = sqlite3.connect(out_path)
conn.execute(f"ATTACH DATABASE '{base_db}' AS base")
conn.execute(
"CREATE TABLE ground_truth ("
" query_vector_id INTEGER NOT NULL,"
" rank INTEGER NOT NULL,"
" neighbor_id INTEGER NOT NULL,"
" distance REAL,"
" PRIMARY KEY (query_vector_id, rank)"
")"
)
conn.execute(
"INSERT INTO ground_truth(query_vector_id, rank, neighbor_id) "
"SELECT query_vector_id, rank, CAST(neighbors_id AS INTEGER) "
"FROM base.neighbors "
"WHERE query_vector_id < :n AND rank < :k",
{"n": n_queries, "k": k},
)
conn.commit()
total_rows = conn.execute("SELECT count(*) FROM ground_truth").fetchone()[0]
conn.execute("DETACH DATABASE base")
conn.close()
print(f" Ground truth (full): {total_rows} rows -> {out_path}")
def main():
parser = argparse.ArgumentParser(description="Generate per-subset ground truth")
parser.add_argument(
"--subset-size", type=int, required=True, help="number of vectors in subset"
)
parser.add_argument("-n", type=int, default=100, help="number of query vectors")
parser.add_argument("-k", type=int, default=100, help="max k for ground truth")
parser.add_argument("--base-db", default=BASE_DB)
parser.add_argument("--ext", default=EXT_PATH)
parser.add_argument(
"-o", "--out-dir", default=os.path.join(_SCRIPT_DIR, "seed"),
help="output directory for ground_truth.{N}.db",
)
args = parser.parse_args()
os.makedirs(args.out_dir, exist_ok=True)
out_path = os.path.join(args.out_dir, f"ground_truth.{args.subset_size}.db")
if args.subset_size >= FULL_DATASET_SIZE:
gen_ground_truth_full(args.base_db, args.n, args.k, out_path)
else:
gen_ground_truth_subset(
args.base_db, args.ext, args.subset_size, args.n, args.k, out_path
)
if __name__ == "__main__":
main()

benchmarks-ann/profile.py (new file)

@@ -0,0 +1,440 @@
#!/usr/bin/env python3
"""CPU profiling for sqlite-vec KNN configurations using macOS `sample` tool.
Builds dist/sqlite3 (with -g3), generates a SQL workload (inserts + repeated
KNN queries) for each config, profiles the sqlite3 process with `sample`, and
prints the top-N hottest functions by self (exclusive) CPU samples.
Usage:
cd benchmarks-ann
uv run profile.py --subset-size 50000 -n 50 \\
"baseline-int8:type=baseline,variant=int8,oversample=8" \\
"rescore-int8:type=rescore,quantizer=int8,oversample=8"
"""
import argparse
import os
import re
import shutil
import subprocess
import sys
import tempfile
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.join(_SCRIPT_DIR, "..")
sys.path.insert(0, _SCRIPT_DIR)
from bench import (
BASE_DB,
DEFAULT_INSERT_SQL,
INDEX_REGISTRY,
INSERT_BATCH_SIZE,
parse_config,
)
SQLITE3_PATH = os.path.join(_PROJECT_ROOT, "dist", "sqlite3")
EXT_PATH = os.path.join(_PROJECT_ROOT, "dist", "vec0")
# ============================================================================
# SQL generation
# ============================================================================
def _query_sql_for_config(params, query_id, k):
"""Return a SQL query string for a single KNN query by query_vector id."""
index_type = params["index_type"]
qvec = f"(SELECT vector FROM base.query_vectors WHERE id = {query_id})"
if index_type == "baseline":
variant = params.get("variant", "float")
oversample = params.get("oversample", 8)
oversample_k = k * oversample
if variant == "int8":
return (
f"WITH coarse AS ("
f" SELECT id, embedding FROM vec_items"
f" WHERE embedding_int8 MATCH vec_quantize_int8({qvec}, 'unit')"
f" LIMIT {oversample_k}"
f") "
f"SELECT id, vec_distance_cosine(embedding, {qvec}) as distance "
f"FROM coarse ORDER BY 2 LIMIT {k};"
)
elif variant == "bit":
return (
f"WITH coarse AS ("
f" SELECT id, embedding FROM vec_items"
f" WHERE embedding_bq MATCH vec_quantize_binary({qvec})"
f" LIMIT {oversample_k}"
f") "
f"SELECT id, vec_distance_cosine(embedding, {qvec}) as distance "
f"FROM coarse ORDER BY 2 LIMIT {k};"
)
# Default MATCH query (baseline-float, rescore, and others)
return (
f"SELECT id, distance FROM vec_items"
f" WHERE embedding MATCH {qvec} AND k = {k};"
)
def generate_sql(db_path, params, subset_size, n_queries, k, repeats):
"""Generate a complete SQL workload: load ext, create table, insert, query."""
lines = []
lines.append(".bail on")
lines.append(f".load {EXT_PATH}")
lines.append(f"ATTACH DATABASE '{os.path.abspath(BASE_DB)}' AS base;")
lines.append("PRAGMA page_size=8192;")
# Create table
reg = INDEX_REGISTRY[params["index_type"]]
lines.append(reg["create_table_sql"](params) + ";")
# Inserts
sql_fn = reg.get("insert_sql")
insert_sql = sql_fn(params) if sql_fn else None
if insert_sql is None:
insert_sql = DEFAULT_INSERT_SQL
for lo in range(0, subset_size, INSERT_BATCH_SIZE):
hi = min(lo + INSERT_BATCH_SIZE, subset_size)
stmt = insert_sql.replace(":lo", str(lo)).replace(":hi", str(hi))
lines.append(stmt + ";")
if hi % 10000 == 0 or hi == subset_size:
lines.append("-- progress: inserted %d/%d" % (hi, subset_size))
# Queries (repeated)
lines.append("-- BEGIN QUERIES")
for _rep in range(repeats):
for qid in range(n_queries):
lines.append(_query_sql_for_config(params, qid, k))
return "\n".join(lines)
# ============================================================================
# Profiling with macOS `sample`
# ============================================================================
def run_profile(sqlite3_path, db_path, sql_file, sample_output, duration=120):
"""Run sqlite3 under macOS `sample` profiler.
Starts sqlite3 directly with stdin from the SQL file, then immediately
attaches `sample` to its PID with -mayDie (tolerates process exit).
The workload must be long enough for sample to attach and capture useful data.
"""
sql_fd = open(sql_file, "r")
proc = subprocess.Popen(
[sqlite3_path, db_path],
stdin=sql_fd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
pid = proc.pid
print(f" sqlite3 PID: {pid}")
# Attach sample immediately (1ms interval, -mayDie tolerates process exit)
sample_proc = subprocess.Popen(
["sample", str(pid), str(duration), "1", "-mayDie", "-file", sample_output],
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
# Wait for sqlite3 to finish
_, stderr = proc.communicate()
sql_fd.close()
rc = proc.returncode
if rc != 0:
print(f" sqlite3 failed (rc={rc}):", file=sys.stderr)
print(f" {stderr.decode().strip()}", file=sys.stderr)
sample_proc.kill()
return False
# Wait for sample to finish
sample_proc.wait()
return True
# ============================================================================
# Parse `sample` output
# ============================================================================
# Tree-drawing characters used by macOS `sample` to represent hierarchy.
# We replace them with spaces so indentation depth reflects tree depth.
_TREE_CHARS_RE = re.compile(r"[+!:|]")
# After tree chars are replaced with spaces, each call-graph line looks like:
# " 800 rescore_knn (in vec0.dylib) + 3808,3640,... [0x1a,0x2b,...] file.c:123"
# We extract just (indent, count, symbol, module) — everything after "(in ...)"
# is decoration we don't need.
_LEADING_RE = re.compile(r"^(\s+)(\d+)\s+(.+)")
def _extract_symbol_and_module(rest):
"""Given the text after 'count ', extract (symbol, module).
Handles patterns like:
'rescore_knn (in vec0.dylib) + 3808,3640,... [0x...]'
'pread (in libsystem_kernel.dylib) + 8 [0x...]'
'??? (in <unknown binary>) [0x...]'
'start (in dyld) + 2840 [0x198650274]'
'Thread_26759239 DispatchQueue_1: ...'
"""
# Try to find "(in ...)" to split symbol from module
m = re.match(r"^(.+?)\s+\(in\s+(.+?)\)", rest)
if m:
return m.group(1).strip(), m.group(2).strip()
# No module — return whole thing as symbol, strip trailing junk
sym = re.sub(r"\s+\[0x[0-9a-f].*", "", rest).strip()
return sym, ""
def _parse_call_graph_lines(text):
"""Parse call-graph section into list of (depth, count, symbol, module)."""
entries = []
for raw_line in text.split("\n"):
# Strip tree-drawing characters, replace with spaces to preserve depth
line = _TREE_CHARS_RE.sub(" ", raw_line)
m = _LEADING_RE.match(line)
if not m:
continue
depth = len(m.group(1))
count = int(m.group(2))
rest = m.group(3)
symbol, module = _extract_symbol_and_module(rest)
entries.append((depth, count, symbol, module))
return entries
def parse_sample_output(filepath):
"""Parse `sample` call-graph output, compute exclusive (self) samples per function.
Returns dict of {display_name: self_sample_count}.
"""
with open(filepath, "r") as f:
text = f.read()
# Find "Call graph:" section
cg_start = text.find("Call graph:")
if cg_start == -1:
print(" Warning: no 'Call graph:' section found in sample output")
return {}
# End at "Total number in stack" or EOF
cg_end = text.find("\nTotal number in stack", cg_start)
if cg_end == -1:
cg_end = len(text)
entries = _parse_call_graph_lines(text[cg_start:cg_end])
if not entries:
print(" Warning: no call graph entries parsed")
return {}
# Compute self (exclusive) samples per function:
# self = count - sum(direct_children_counts)
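    # Hypothetical worked example: entries [(4, 100, knn), (6, 70, dist),
    # (6, 20, pread)] give knn direct children 70 + 20 = 90, so knn's
    # self (exclusive) count is 100 - 90 = 10.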
self_samples = {}
for i, (depth, count, sym, mod) in enumerate(entries):
children_sum = 0
child_depth = None
for j in range(i + 1, len(entries)):
j_depth = entries[j][0]
if j_depth <= depth:
break
if child_depth is None:
child_depth = j_depth
if j_depth == child_depth:
children_sum += entries[j][1]
self_count = count - children_sum
if self_count > 0:
key = f"{sym} ({mod})" if mod else sym
self_samples[key] = self_samples.get(key, 0) + self_count
return self_samples
# ============================================================================
# Display
# ============================================================================
def print_profile(title, self_samples, top_n=20):
total = sum(self_samples.values())
if total == 0:
print(f"\n=== {title} (no samples) ===")
return
sorted_syms = sorted(self_samples.items(), key=lambda x: -x[1])
print(f"\n=== {title} (top {top_n}, {total} total self-samples) ===")
for sym, count in sorted_syms[:top_n]:
pct = 100.0 * count / total
print(f" {pct:5.1f}% {count:>6} {sym}")
# ============================================================================
# Main
# ============================================================================
def main():
parser = argparse.ArgumentParser(
description="CPU profiling for sqlite-vec KNN configurations",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument(
"configs", nargs="+", help="config specs (name:type=X,key=val,...)"
)
parser.add_argument("--subset-size", type=int, required=True)
parser.add_argument("-k", type=int, default=10, help="KNN k (default 10)")
parser.add_argument(
"-n", type=int, default=50, help="number of distinct queries (default 50)"
)
parser.add_argument(
"--repeats",
type=int,
default=10,
help="repeat query set N times for more samples (default 10)",
)
parser.add_argument(
"--top", type=int, default=20, help="show top N functions (default 20)"
)
parser.add_argument("--base-db", default=BASE_DB)
parser.add_argument("--sqlite3", default=SQLITE3_PATH)
parser.add_argument(
"--keep-temp",
action="store_true",
help="keep temp directory with DBs, SQL, and sample output",
)
args = parser.parse_args()
# Check prerequisites
if not os.path.exists(args.base_db):
print(f"Error: base DB not found at {args.base_db}", file=sys.stderr)
print("Run 'make seed' in benchmarks-ann/ first.", file=sys.stderr)
sys.exit(1)
if not shutil.which("sample"):
print("Error: macOS 'sample' tool not found.", file=sys.stderr)
sys.exit(1)
# Build CLI
print("Building dist/sqlite3...")
result = subprocess.run(
["make", "cli"], cwd=_PROJECT_ROOT, capture_output=True, text=True
)
if result.returncode != 0:
print(f"Error: make cli failed:\n{result.stderr}", file=sys.stderr)
sys.exit(1)
print(" done.")
if not os.path.exists(args.sqlite3):
print(f"Error: sqlite3 not found at {args.sqlite3}", file=sys.stderr)
sys.exit(1)
configs = [parse_config(c) for c in args.configs]
tmpdir = tempfile.mkdtemp(prefix="sqlite-vec-profile-")
print(f"Working directory: {tmpdir}")
all_profiles = []
for i, (name, params) in enumerate(configs, 1):
reg = INDEX_REGISTRY[params["index_type"]]
desc = reg["describe"](params)
print(f"\n[{i}/{len(configs)}] {name} ({desc})")
# Generate SQL workload
db_path = os.path.join(tmpdir, f"{name}.db")
sql_text = generate_sql(
db_path, params, args.subset_size, args.n, args.k, args.repeats
)
sql_file = os.path.join(tmpdir, f"{name}.sql")
with open(sql_file, "w") as f:
f.write(sql_text)
total_queries = args.n * args.repeats
print(
f" SQL workload: {args.subset_size} inserts + "
f"{total_queries} queries ({args.n} x {args.repeats} repeats)"
)
# Profile
sample_file = os.path.join(tmpdir, f"{name}.sample.txt")
print(f" Profiling...")
ok = run_profile(args.sqlite3, db_path, sql_file, sample_file)
if not ok:
print(f" FAILED — skipping {name}")
all_profiles.append((name, desc, {}))
continue
if not os.path.exists(sample_file):
print(f" Warning: sample output not created")
all_profiles.append((name, desc, {}))
continue
# Parse
self_samples = parse_sample_output(sample_file)
all_profiles.append((name, desc, self_samples))
# Show individual profile
print_profile(f"{name} ({desc})", self_samples, args.top)
# Side-by-side comparison if multiple configs
if len(all_profiles) > 1:
print("\n" + "=" * 80)
print("COMPARISON")
print("=" * 80)
# Collect all symbols that appear in top-N of any config
all_syms = set()
for _name, _desc, prof in all_profiles:
sorted_syms = sorted(prof.items(), key=lambda x: -x[1])
for sym, _count in sorted_syms[: args.top]:
all_syms.add(sym)
# Build comparison table
rows = []
for sym in all_syms:
row = [sym]
for _name, _desc, prof in all_profiles:
total = sum(prof.values())
count = prof.get(sym, 0)
pct = 100.0 * count / total if total > 0 else 0.0
row.append((pct, count))
max_pct = max(r[0] for r in row[1:])
rows.append((max_pct, row))
rows.sort(key=lambda x: -x[0])
# Header
header = f"{'function':>40}"
for name, desc, _ in all_profiles:
header += f" {name:>14}"
print(header)
print("-" * len(header))
for _sort_key, row in rows[: args.top * 2]:
sym = row[0]
display_sym = sym if len(sym) <= 40 else sym[:37] + "..."
line = f"{display_sym:>40}"
for pct, count in row[1:]:
if count > 0:
line += f" {pct:>13.1f}%"
else:
line += f" {'-':>14}"
print(line)
if args.keep_temp:
print(f"\nTemp files kept at: {tmpdir}")
else:
shutil.rmtree(tmpdir)
print(f"\nTemp files cleaned up. Use --keep-temp to preserve.")
if __name__ == "__main__":
main()

benchmarks-ann/schema.sql (new file)

@@ -0,0 +1,35 @@
-- Canonical results schema for vec0 KNN benchmark comparisons.
-- The index_type column is a free-form TEXT field. Baseline configs use
-- "baseline"; index-specific branches add their own types (registered
-- via INDEX_REGISTRY in bench.py).
CREATE TABLE IF NOT EXISTS build_results (
config_name TEXT NOT NULL,
index_type TEXT NOT NULL,
subset_size INTEGER NOT NULL,
db_path TEXT NOT NULL,
insert_time_s REAL NOT NULL,
    train_time_s REAL, -- 0 or NULL when no training/build step is needed
total_time_s REAL NOT NULL,
rows INTEGER NOT NULL,
file_size_mb REAL NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (config_name, subset_size)
);
CREATE TABLE IF NOT EXISTS bench_results (
config_name TEXT NOT NULL,
index_type TEXT NOT NULL,
subset_size INTEGER NOT NULL,
k INTEGER NOT NULL,
n INTEGER NOT NULL,
mean_ms REAL NOT NULL,
median_ms REAL NOT NULL,
p99_ms REAL NOT NULL,
total_ms REAL NOT NULL,
qps REAL NOT NULL,
recall REAL NOT NULL,
db_path TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (config_name, subset_size, k)
);
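-- Example (illustrative) query against a results DB produced by bench.py:
--   SELECT config_name, subset_size, recall, mean_ms, qps
--   FROM bench_results ORDER BY subset_size, recall DESC;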

benchmarks-ann/seed/.gitignore (new file)

@@ -0,0 +1,2 @@
*.parquet
base.db

benchmarks-ann/seed/Makefile (new file)

@@ -0,0 +1,24 @@
BASE_URL = https://assets.zilliz.com/benchmark/cohere_medium_1m
PARQUETS = train.parquet test.parquet neighbors.parquet
.PHONY: all download clean
all: base.db
download: $(PARQUETS)
train.parquet:
curl -L -o $@ $(BASE_URL)/train.parquet
test.parquet:
curl -L -o $@ $(BASE_URL)/test.parquet
neighbors.parquet:
curl -L -o $@ $(BASE_URL)/neighbors.parquet
base.db: $(PARQUETS) build_base_db.py
uv run --with pandas --with pyarrow python build_base_db.py
clean:
rm -f base.db

benchmarks-ann/seed/build_base_db.py (new file)

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""Build base.db from downloaded parquet files.
Reads train.parquet, test.parquet, neighbors.parquet and creates a SQLite
database with tables: train, query_vectors, neighbors.
Usage:
uv run --with pandas --with pyarrow python build_base_db.py
"""
import json
import os
import sqlite3
import struct
import sys
import time
import pandas as pd
def float_list_to_blob(floats):
"""Pack a list of floats into a little-endian f32 blob."""
return struct.pack(f"<{len(floats)}f", *floats)
def main():
seed_dir = os.path.dirname(os.path.abspath(__file__))
db_path = os.path.join(seed_dir, "base.db")
train_path = os.path.join(seed_dir, "train.parquet")
test_path = os.path.join(seed_dir, "test.parquet")
neighbors_path = os.path.join(seed_dir, "neighbors.parquet")
for p in (train_path, test_path, neighbors_path):
if not os.path.exists(p):
print(f"ERROR: {p} not found. Run 'make download' first.")
sys.exit(1)
if os.path.exists(db_path):
os.remove(db_path)
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA page_size=4096")
# --- query_vectors (from test.parquet) ---
print("Loading test.parquet (query vectors)...")
t0 = time.perf_counter()
df_test = pd.read_parquet(test_path)
conn.execute(
"CREATE TABLE query_vectors (id INTEGER PRIMARY KEY, vector BLOB)"
)
rows = []
for _, row in df_test.iterrows():
rows.append((int(row["id"]), float_list_to_blob(row["emb"])))
conn.executemany("INSERT INTO query_vectors (id, vector) VALUES (?, ?)", rows)
conn.commit()
print(f" {len(rows)} query vectors in {time.perf_counter() - t0:.1f}s")
# --- neighbors (from neighbors.parquet) ---
print("Loading neighbors.parquet...")
t0 = time.perf_counter()
df_neighbors = pd.read_parquet(neighbors_path)
conn.execute(
"CREATE TABLE neighbors ("
" query_vector_id INTEGER, rank INTEGER, neighbors_id TEXT,"
" UNIQUE(query_vector_id, rank))"
)
rows = []
for _, row in df_neighbors.iterrows():
qid = int(row["id"])
# neighbors_id may be a numpy array or JSON string
nids = row["neighbors_id"]
if isinstance(nids, str):
nids = json.loads(nids)
for rank, nid in enumerate(nids):
rows.append((qid, rank, str(int(nid))))
conn.executemany(
"INSERT INTO neighbors (query_vector_id, rank, neighbors_id) VALUES (?, ?, ?)",
rows,
)
conn.commit()
print(f" {len(rows)} neighbor rows in {time.perf_counter() - t0:.1f}s")
# --- train (from train.parquet) ---
print("Loading train.parquet (1M vectors, this takes a few minutes)...")
t0 = time.perf_counter()
conn.execute(
"CREATE TABLE train (id INTEGER PRIMARY KEY, vector BLOB)"
)
batch_size = 10000
df_iter = pd.read_parquet(train_path)
total = len(df_iter)
for start in range(0, total, batch_size):
chunk = df_iter.iloc[start : start + batch_size]
rows = []
for _, row in chunk.iterrows():
rows.append((int(row["id"]), float_list_to_blob(row["emb"])))
conn.executemany("INSERT INTO train (id, vector) VALUES (?, ?)", rows)
conn.commit()
done = min(start + batch_size, total)
elapsed = time.perf_counter() - t0
rate = done / elapsed if elapsed > 0 else 0
eta = (total - done) / rate if rate > 0 else 0
print(
f" {done:>8}/{total} {elapsed:.0f}s {rate:.0f} rows/s eta {eta:.0f}s",
flush=True,
)
elapsed = time.perf_counter() - t0
print(f" {total} train vectors in {elapsed:.1f}s")
conn.close()
size_mb = os.path.getsize(db_path) / (1024 * 1024)
print(f"\nDone: {db_path} ({size_mb:.0f} MB)")
if __name__ == "__main__":
main()