refactor(filesystem): remove pre-release store migrations

This commit is contained in:
BukeLy 2026-05-26 17:31:03 +08:00
parent de1992def1
commit 30830fc19e
2 changed files with 11 additions and 354 deletions

View file

@ -9,7 +9,7 @@ from typing import Any, Iterable, Optional
from .types import FileEntry, MetadataField
SCHEMA_VERSION = 5
SCHEMA_VERSION = 1
class SQLiteFileSystemStore:
@ -22,7 +22,7 @@ class SQLiteFileSystemStore:
self.pageindex_client_dir = self.workspace / "artifacts" / "pageindex_client"
for path in (self.text_dir, self.raw_dir, self.pageindex_client_dir):
path.mkdir(parents=True, exist_ok=True)
self.migrate()
self.initialize_schema()
def connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
@ -30,31 +30,13 @@ class SQLiteFileSystemStore:
conn.execute("PRAGMA foreign_keys = ON")
return conn
def migrate(self) -> None:
def initialize_schema(self) -> None:
with self.connect() as conn:
version = conn.execute("PRAGMA user_version").fetchone()[0]
if version < 1:
self._migrate_to_v1(conn)
conn.execute("PRAGMA user_version = 1")
version = 1
if version < 2:
self._migrate_to_v2(conn)
conn.execute("PRAGMA user_version = 2")
version = 2
if version < 3:
self._migrate_to_v3(conn)
conn.execute("PRAGMA user_version = 3")
version = 3
if version < 4:
self._migrate_to_v4(conn)
conn.execute("PRAGMA user_version = 4")
version = 4
if version < 5:
self._migrate_to_v5(conn)
conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}")
self._create_current_schema(conn)
self.ensure_folder(conn, "/")
conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}")
def _migrate_to_v1(self, conn: sqlite3.Connection) -> None:
self._migrate_legacy_tables(conn)
def _create_current_schema(self, conn: sqlite3.Connection) -> None:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS files (
@ -158,183 +140,6 @@ class SQLiteFileSystemStore:
VALUES ('default', NULL, 1, 'active')
"""
)
self.ensure_folder(conn, "/")
self._backfill_legacy_memberships(conn)
self._backfill_metadata_values(conn)
def _migrate_to_v2(self, conn: sqlite3.Connection) -> None:
if "folders" in self._tables(conn):
columns = self._columns(conn, "folders")
if "description" not in columns:
conn.execute("ALTER TABLE folders ADD COLUMN description TEXT NOT NULL DEFAULT ''")
if "metadata_fields" in self._tables(conn):
conn.execute(
"""
UPDATE metadata_fields
SET type = 'string'
WHERE type NOT IN ('string', 'number', 'boolean')
"""
)
def _migrate_to_v3(self, conn: sqlite3.Connection) -> None:
if "folders" in self._tables(conn):
columns = self._columns(conn, "folders")
if "metadata_json" not in columns:
conn.execute("ALTER TABLE folders ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '{}'")
if "file_folders" in self._tables(conn):
columns = self._columns(conn, "file_folders")
if "membership_kind" in columns or "metadata_json" not in columns:
conn.execute("DROP INDEX IF EXISTS idx_file_folders_kind")
conn.execute("DROP INDEX IF EXISTS idx_file_folders_folder")
conn.execute(
"""
CREATE TABLE file_folders_v3 (
file_ref TEXT NOT NULL,
folder_id TEXT NOT NULL,
metadata_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (file_ref, folder_id),
FOREIGN KEY(file_ref) REFERENCES files(file_ref) ON DELETE CASCADE,
FOREIGN KEY(folder_id) REFERENCES folders(folder_id) ON DELETE CASCADE
)
"""
)
conn.execute(
"""
INSERT OR IGNORE INTO file_folders_v3(file_ref, folder_id, metadata_json, created_at)
SELECT file_ref, folder_id, '{}', MIN(created_at)
FROM file_folders
GROUP BY file_ref, folder_id
"""
)
conn.execute("DROP TABLE file_folders")
conn.execute("ALTER TABLE file_folders_v3 RENAME TO file_folders")
elif "metadata_json" not in columns:
conn.execute("ALTER TABLE file_folders ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '{}'")
else:
conn.execute(
"""
CREATE TABLE file_folders (
file_ref TEXT NOT NULL,
folder_id TEXT NOT NULL,
metadata_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (file_ref, folder_id),
FOREIGN KEY(file_ref) REFERENCES files(file_ref) ON DELETE CASCADE,
FOREIGN KEY(folder_id) REFERENCES folders(folder_id) ON DELETE CASCADE
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_file_folders_folder ON file_folders(folder_id)")
def _migrate_to_v4(self, conn: sqlite3.Connection) -> None:
if "files" not in self._tables(conn):
return
columns = self._columns(conn, "files")
if "metadata_status_json" not in columns:
conn.execute("ALTER TABLE files ADD COLUMN metadata_status_json TEXT NOT NULL DEFAULT '{}'")
self._backfill_metadata_values(conn)
def _migrate_to_v5(self, conn: sqlite3.Connection) -> None:
if "files" not in self._tables(conn):
return
columns = self._columns(conn, "files")
if "metadata_status_json" not in columns:
conn.execute("ALTER TABLE files ADD COLUMN metadata_status_json TEXT NOT NULL DEFAULT '{}'")
columns = self._columns(conn, "files")
derived_select = (
"derived_metadata_json"
if "derived_metadata_json" in columns
else "'{}' AS derived_metadata_json"
)
legacy_status_select = (
"metadata_generation_json"
if "metadata_generation_json" in columns
else "'{}' AS metadata_generation_json"
)
rows = conn.execute(
f"""
SELECT file_ref, metadata_json, metadata_status_json, {derived_select}, {legacy_status_select}
FROM files
WHERE deleted_at IS NULL
"""
).fetchall()
for row in rows:
metadata = self._json_object(row["metadata_json"])
legacy_generated_values = self._json_object(row["derived_metadata_json"])
metadata_status = self._json_object(row["metadata_status_json"])
legacy_status = self._json_object(row["metadata_generation_json"])
if not metadata_status and legacy_status:
metadata_status = legacy_status
metadata_status = self._normalize_metadata_status(metadata_status)
if legacy_generated_values:
metadata.update(legacy_generated_values)
conn.execute(
"""
UPDATE files
SET metadata_json = ?,
metadata_status_json = ?,
updated_at = CURRENT_TIMESTAMP
WHERE file_ref = ? AND deleted_at IS NULL
""",
(
json.dumps(metadata, ensure_ascii=False),
json.dumps(metadata_status, ensure_ascii=False),
row["file_ref"],
),
)
self._backfill_metadata_values(conn)
def _migrate_legacy_tables(self, conn: sqlite3.Connection) -> None:
tables = self._tables(conn)
if "folders" in tables and "folder_id" not in self._columns(conn, "folders"):
conn.execute("ALTER TABLE folders RENAME TO folders_legacy_v0")
if "files" in tables:
columns = self._columns(conn, "files")
for name, ddl in {
"raw_artifact_path": "ALTER TABLE files ADD COLUMN raw_artifact_path TEXT",
"pageindex_doc_id": "ALTER TABLE files ADD COLUMN pageindex_doc_id TEXT",
"pageindex_tree_status": (
"ALTER TABLE files ADD COLUMN pageindex_tree_status TEXT "
"NOT NULL DEFAULT 'not_built'"
),
"deleted_at": "ALTER TABLE files ADD COLUMN deleted_at TEXT",
}.items():
if name not in columns:
conn.execute(ddl)
def _backfill_legacy_memberships(self, conn: sqlite3.Connection) -> None:
if "files" not in self._tables(conn) or "folder_path" not in self._columns(conn, "files"):
return
rows = conn.execute(
"SELECT file_ref, folder_path FROM files WHERE deleted_at IS NULL"
).fetchall()
for row in rows:
folder_id = self.ensure_folder(conn, row["folder_path"] or "/")
conn.execute(
"""
INSERT OR IGNORE INTO file_folders(file_ref, folder_id, metadata_json)
VALUES (?, ?, '{}')
""",
(row["file_ref"], folder_id),
)
def _backfill_metadata_values(self, conn: sqlite3.Connection) -> None:
if "files" not in self._tables(conn):
return
rows = conn.execute(
"""
SELECT file_ref, metadata_json
FROM files
WHERE deleted_at IS NULL
"""
).fetchall()
for row in rows:
self.replace_metadata_values(
conn,
row["file_ref"],
self.indexed_metadata_values(self._json_object(row["metadata_json"])),
)
@staticmethod
def _json_object(value: Any) -> dict[str, Any]:
@ -344,44 +149,6 @@ class SQLiteFileSystemStore:
return {}
return parsed if isinstance(parsed, dict) else {}
@staticmethod
def _normalize_metadata_status(metadata_status: dict[str, Any]) -> dict[str, Any]:
normalized = dict(metadata_status)
fields = normalized.get("fields")
if isinstance(fields, dict):
normalized["fields"] = {
name: (
{
**state,
"owner": state.get("owner", "pifs"),
"source": state.get("source", "llm"),
}
if isinstance(state, dict)
else state
)
for name, state in fields.items()
}
projection_indexes = normalized.get("projection_indexes")
if isinstance(projection_indexes, dict):
normalized["projection_indexes"] = {
name: (
{
**state,
"owner": state.get("owner", "pifs"),
"source": state.get("source", "index"),
}
if isinstance(state, dict)
else state
)
for name, state in projection_indexes.items()
}
return normalized
@staticmethod
def _tables(conn: sqlite3.Connection) -> set[str]:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type IN ('table', 'virtual table')").fetchall()
return {row["name"] for row in rows}
@staticmethod
def _columns(conn: sqlite3.Connection, table: str) -> set[str]:
return {row["name"] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()}
@ -408,7 +175,6 @@ class SQLiteFileSystemStore:
"SELECT name, field_id FROM metadata_fields WHERE schema_id = 'default'"
).fetchall()
}
include_folder_path = "folder_path" in self._columns(conn, "files")
for record in records:
folder_cache_key = (record["folder_path"], record.get("folder_kind", "manual"))
folder_id = folder_cache.get(folder_cache_key)
@ -419,7 +185,7 @@ class SQLiteFileSystemStore:
kind=record.get("folder_kind", "manual"),
)
folder_cache[folder_cache_key] = folder_id
file_rows.append(self._file_insert_values(record, include_folder_path=include_folder_path))
file_rows.append(self._file_insert_values(record))
membership_rows.append(
(
record["file_ref"],
@ -445,7 +211,7 @@ class SQLiteFileSystemStore:
metadata_field_ids,
)
)
conn.executemany(self._file_insert_sql(include_folder_path=include_folder_path), file_rows)
conn.executemany(self._file_insert_sql(), file_rows)
conn.executemany(
"""
INSERT OR REPLACE INTO file_folders(file_ref, folder_id, metadata_json)
@ -474,7 +240,7 @@ class SQLiteFileSystemStore:
)
@staticmethod
def _file_insert_sql(*, include_folder_path: bool) -> str:
def _file_insert_sql() -> str:
columns = [
"file_ref",
"external_id",
@ -492,8 +258,6 @@ class SQLiteFileSystemStore:
"metadata_json",
"metadata_status_json",
]
if include_folder_path:
columns.append("folder_path")
columns.extend(["deleted_at", "updated_at"])
placeholders = ", ".join(["?"] * (len(columns) - 2) + ["NULL", "CURRENT_TIMESTAMP"])
return f"""
@ -502,7 +266,7 @@ class SQLiteFileSystemStore:
"""
@staticmethod
def _file_insert_values(record: dict[str, Any], *, include_folder_path: bool) -> tuple[Any, ...]:
def _file_insert_values(record: dict[str, Any]) -> tuple[Any, ...]:
values: list[Any] = [
record["file_ref"],
record["external_id"],
@ -520,8 +284,6 @@ class SQLiteFileSystemStore:
record["metadata_json"],
record.get("metadata_status_json", "{}"),
]
if include_folder_path:
values.append(record["folder_path"])
return tuple(values)
def _metadata_insert_values(
@ -610,59 +372,6 @@ class SQLiteFileSystemStore:
),
)
def _insert_file_row(self, conn: sqlite3.Connection, record: dict[str, Any]) -> None:
current_timestamp = object()
columns = [
"file_ref",
"external_id",
"storage_uri",
"source_path",
"title",
"descriptor",
"content_type",
"source_type",
"fingerprint",
"text_artifact_path",
"raw_artifact_path",
"pageindex_doc_id",
"pageindex_tree_status",
"metadata_json",
"metadata_status_json",
"deleted_at",
"updated_at",
]
values: list[Any] = [
record["file_ref"],
record["external_id"],
record["storage_uri"],
record["source_path"],
record["title"],
record["descriptor"],
record["content_type"],
record["source_type"],
record["fingerprint"],
record["text_artifact_path"],
record["raw_artifact_path"],
record.get("pageindex_doc_id"),
record.get("pageindex_tree_status", "not_built"),
record["metadata_json"],
record.get("metadata_status_json", "{}"),
None,
current_timestamp,
]
if "folder_path" in self._columns(conn, "files"):
columns.insert(-2, "folder_path")
values.insert(-2, record["folder_path"])
placeholders = ", ".join("CURRENT_TIMESTAMP" if value is current_timestamp else "?" for value in values)
bound_values = [value for value in values if value is not current_timestamp]
conn.execute(
f"""
INSERT OR REPLACE INTO files ({", ".join(columns)})
VALUES ({placeholders})
""",
bound_values,
)
def replace_metadata_values(
self,
conn: sqlite3.Connection,

View file

@ -208,58 +208,6 @@ def test_batch_metadata_status_generates_into_unified_metadata(tmp_path):
assert after.metadata_status["fields"]["summary"]["status"] == "generated"
def test_v4_metadata_columns_migrate_to_unified_metadata_status(tmp_path):
from pageindex.filesystem import PageIndexFileSystem
source = tmp_path / "source.txt"
source.write_text("fixture text", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = PageIndexFileSystem(workspace=workspace)
file_ref = filesystem.register_file(
storage_uri=source.as_uri(),
source_path="docs/source.txt",
folder_path="/documents",
external_id="doc_migrate",
title="Migrated document",
content=source.read_text(encoding="utf-8"),
metadata={"department": "ops"},
)
with filesystem.store.connect() as conn:
conn.execute(
"ALTER TABLE files ADD COLUMN derived_metadata_json TEXT NOT NULL DEFAULT '{}'"
)
conn.execute(
"ALTER TABLE files ADD COLUMN metadata_generation_json TEXT NOT NULL DEFAULT '{}'"
)
conn.execute(
"""
UPDATE files
SET metadata_json = ?,
derived_metadata_json = ?,
metadata_generation_json = ?,
metadata_status_json = '{}'
WHERE file_ref = ?
""",
(
'{"department":"ops","summary":"raw summary"}',
'{"summary":"generated summary"}',
'{"status":"generated","fields":{"summary":{"requested":true,"status":"generated"}}}',
file_ref,
),
)
conn.execute("PRAGMA user_version = 4")
migrated = PageIndexFileSystem(workspace=workspace)
entry = migrated.store.get_file(file_ref)
assert entry.metadata["department"] == "ops"
assert entry.metadata["summary"] == "generated summary"
assert entry.metadata_status["status"] == "generated"
assert entry.metadata_status["fields"]["summary"]["owner"] == "pifs"
assert entry.metadata_status["fields"]["summary"]["source"] == "llm"
def test_find_maxdepth_zero_type_directory_returns_start_folder(tmp_path):
executor = _register_find_fixture(tmp_path)