diff --git a/pageindex/filesystem/store.py b/pageindex/filesystem/store.py index a0dbe6a..1d85810 100644 --- a/pageindex/filesystem/store.py +++ b/pageindex/filesystem/store.py @@ -9,7 +9,7 @@ from typing import Any, Iterable, Optional from .types import FileEntry, MetadataField -SCHEMA_VERSION = 5 +SCHEMA_VERSION = 1 class SQLiteFileSystemStore: @@ -22,7 +22,7 @@ class SQLiteFileSystemStore: self.pageindex_client_dir = self.workspace / "artifacts" / "pageindex_client" for path in (self.text_dir, self.raw_dir, self.pageindex_client_dir): path.mkdir(parents=True, exist_ok=True) - self.migrate() + self.initialize_schema() def connect(self) -> sqlite3.Connection: conn = sqlite3.connect(self.db_path) @@ -30,31 +30,13 @@ class SQLiteFileSystemStore: conn.execute("PRAGMA foreign_keys = ON") return conn - def migrate(self) -> None: + def initialize_schema(self) -> None: with self.connect() as conn: - version = conn.execute("PRAGMA user_version").fetchone()[0] - if version < 1: - self._migrate_to_v1(conn) - conn.execute("PRAGMA user_version = 1") - version = 1 - if version < 2: - self._migrate_to_v2(conn) - conn.execute("PRAGMA user_version = 2") - version = 2 - if version < 3: - self._migrate_to_v3(conn) - conn.execute("PRAGMA user_version = 3") - version = 3 - if version < 4: - self._migrate_to_v4(conn) - conn.execute("PRAGMA user_version = 4") - version = 4 - if version < 5: - self._migrate_to_v5(conn) - conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") + self._create_current_schema(conn) + self.ensure_folder(conn, "/") + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") - def _migrate_to_v1(self, conn: sqlite3.Connection) -> None: - self._migrate_legacy_tables(conn) + def _create_current_schema(self, conn: sqlite3.Connection) -> None: conn.executescript( """ CREATE TABLE IF NOT EXISTS files ( @@ -158,183 +140,6 @@ class SQLiteFileSystemStore: VALUES ('default', NULL, 1, 'active') """ ) - self.ensure_folder(conn, "/") - self._backfill_legacy_memberships(conn) - self._backfill_metadata_values(conn) - - def _migrate_to_v2(self, conn: sqlite3.Connection) -> None: - if "folders" in self._tables(conn): - columns = self._columns(conn, "folders") - if "description" not in columns: - conn.execute("ALTER TABLE folders ADD COLUMN description TEXT NOT NULL DEFAULT ''") - if "metadata_fields" in self._tables(conn): - conn.execute( - """ - UPDATE metadata_fields - SET type = 'string' - WHERE type NOT IN ('string', 'number', 'boolean') - """ - ) - - def _migrate_to_v3(self, conn: sqlite3.Connection) -> None: - if "folders" in self._tables(conn): - columns = self._columns(conn, "folders") - if "metadata_json" not in columns: - conn.execute("ALTER TABLE folders ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '{}'") - if "file_folders" in self._tables(conn): - columns = self._columns(conn, "file_folders") - if "membership_kind" in columns or "metadata_json" not in columns: - conn.execute("DROP INDEX IF EXISTS idx_file_folders_kind") - conn.execute("DROP INDEX IF EXISTS idx_file_folders_folder") - conn.execute( - """ - CREATE TABLE file_folders_v3 ( - file_ref TEXT NOT NULL, - folder_id TEXT NOT NULL, - metadata_json TEXT NOT NULL DEFAULT '{}', - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (file_ref, folder_id), - FOREIGN KEY(file_ref) REFERENCES files(file_ref) ON DELETE CASCADE, - FOREIGN KEY(folder_id) REFERENCES folders(folder_id) ON DELETE CASCADE - ) - """ - ) - conn.execute( - """ - INSERT OR IGNORE INTO file_folders_v3(file_ref, folder_id, metadata_json, created_at) - SELECT file_ref, folder_id, '{}', MIN(created_at) - FROM file_folders - GROUP BY file_ref, folder_id - """ - ) - conn.execute("DROP TABLE file_folders") - conn.execute("ALTER TABLE file_folders_v3 RENAME TO file_folders") - elif "metadata_json" not in columns: - conn.execute("ALTER TABLE file_folders ADD COLUMN metadata_json TEXT NOT NULL DEFAULT '{}'") - else: - conn.execute( - """ - CREATE TABLE file_folders ( - file_ref TEXT NOT NULL, - folder_id TEXT NOT NULL, - metadata_json TEXT NOT NULL DEFAULT '{}', - created_at TEXT DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (file_ref, folder_id), - FOREIGN KEY(file_ref) REFERENCES files(file_ref) ON DELETE CASCADE, - FOREIGN KEY(folder_id) REFERENCES folders(folder_id) ON DELETE CASCADE - ) - """ - ) - conn.execute("CREATE INDEX IF NOT EXISTS idx_file_folders_folder ON file_folders(folder_id)") - - def _migrate_to_v4(self, conn: sqlite3.Connection) -> None: - if "files" not in self._tables(conn): - return - columns = self._columns(conn, "files") - if "metadata_status_json" not in columns: - conn.execute("ALTER TABLE files ADD COLUMN metadata_status_json TEXT NOT NULL DEFAULT '{}'") - self._backfill_metadata_values(conn) - - def _migrate_to_v5(self, conn: sqlite3.Connection) -> None: - if "files" not in self._tables(conn): - return - columns = self._columns(conn, "files") - if "metadata_status_json" not in columns: - conn.execute("ALTER TABLE files ADD COLUMN metadata_status_json TEXT NOT NULL DEFAULT '{}'") - columns = self._columns(conn, "files") - derived_select = ( - "derived_metadata_json" - if "derived_metadata_json" in columns - else "'{}' AS derived_metadata_json" - ) - legacy_status_select = ( - "metadata_generation_json" - if "metadata_generation_json" in columns - else "'{}' AS metadata_generation_json" - ) - rows = conn.execute( - f""" - SELECT file_ref, metadata_json, metadata_status_json, {derived_select}, {legacy_status_select} - FROM files - WHERE deleted_at IS NULL - """ - ).fetchall() - for row in rows: - metadata = self._json_object(row["metadata_json"]) - legacy_generated_values = self._json_object(row["derived_metadata_json"]) - metadata_status = self._json_object(row["metadata_status_json"]) - legacy_status = self._json_object(row["metadata_generation_json"]) - if not metadata_status and legacy_status: - metadata_status = legacy_status - metadata_status = self._normalize_metadata_status(metadata_status) - if legacy_generated_values: - metadata.update(legacy_generated_values) - conn.execute( - """ - UPDATE files - SET metadata_json = ?, - metadata_status_json = ?, - updated_at = CURRENT_TIMESTAMP - WHERE file_ref = ? AND deleted_at IS NULL - """, - ( - json.dumps(metadata, ensure_ascii=False), - json.dumps(metadata_status, ensure_ascii=False), - row["file_ref"], - ), - ) - self._backfill_metadata_values(conn) - - def _migrate_legacy_tables(self, conn: sqlite3.Connection) -> None: - tables = self._tables(conn) - if "folders" in tables and "folder_id" not in self._columns(conn, "folders"): - conn.execute("ALTER TABLE folders RENAME TO folders_legacy_v0") - if "files" in tables: - columns = self._columns(conn, "files") - for name, ddl in { - "raw_artifact_path": "ALTER TABLE files ADD COLUMN raw_artifact_path TEXT", - "pageindex_doc_id": "ALTER TABLE files ADD COLUMN pageindex_doc_id TEXT", - "pageindex_tree_status": ( - "ALTER TABLE files ADD COLUMN pageindex_tree_status TEXT " - "NOT NULL DEFAULT 'not_built'" - ), - "deleted_at": "ALTER TABLE files ADD COLUMN deleted_at TEXT", - }.items(): - if name not in columns: - conn.execute(ddl) - - def _backfill_legacy_memberships(self, conn: sqlite3.Connection) -> None: - if "files" not in self._tables(conn) or "folder_path" not in self._columns(conn, "files"): - return - rows = conn.execute( - "SELECT file_ref, folder_path FROM files WHERE deleted_at IS NULL" - ).fetchall() - for row in rows: - folder_id = self.ensure_folder(conn, row["folder_path"] or "/") - conn.execute( - """ - INSERT OR IGNORE INTO file_folders(file_ref, folder_id, metadata_json) - VALUES (?, ?, '{}') - """, - (row["file_ref"], folder_id), - ) - - def _backfill_metadata_values(self, conn: sqlite3.Connection) -> None: - if "files" not in self._tables(conn): - return - rows = conn.execute( - """ - SELECT file_ref, metadata_json - FROM files - WHERE deleted_at IS NULL - """ - ).fetchall() - for row in rows: - self.replace_metadata_values( - conn, - row["file_ref"], - self.indexed_metadata_values(self._json_object(row["metadata_json"])), - ) @staticmethod def _json_object(value: Any) -> dict[str, Any]: @@ -344,44 +149,6 @@ class SQLiteFileSystemStore: return {} return parsed if isinstance(parsed, dict) else {} - @staticmethod - def _normalize_metadata_status(metadata_status: dict[str, Any]) -> dict[str, Any]: - normalized = dict(metadata_status) - fields = normalized.get("fields") - if isinstance(fields, dict): - normalized["fields"] = { - name: ( - { - **state, - "owner": state.get("owner", "pifs"), - "source": state.get("source", "llm"), - } - if isinstance(state, dict) - else state - ) - for name, state in fields.items() - } - projection_indexes = normalized.get("projection_indexes") - if isinstance(projection_indexes, dict): - normalized["projection_indexes"] = { - name: ( - { - **state, - "owner": state.get("owner", "pifs"), - "source": state.get("source", "index"), - } - if isinstance(state, dict) - else state - ) - for name, state in projection_indexes.items() - } - return normalized - - @staticmethod - def _tables(conn: sqlite3.Connection) -> set[str]: - rows = conn.execute("SELECT name FROM sqlite_master WHERE type IN ('table', 'virtual table')").fetchall() - return {row["name"] for row in rows} - @staticmethod def _columns(conn: sqlite3.Connection, table: str) -> set[str]: return {row["name"] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()} @@ -408,7 +175,6 @@ class SQLiteFileSystemStore: "SELECT name, field_id FROM metadata_fields WHERE schema_id = 'default'" ).fetchall() } - include_folder_path = "folder_path" in self._columns(conn, "files") for record in records: folder_cache_key = (record["folder_path"], record.get("folder_kind", "manual")) folder_id = folder_cache.get(folder_cache_key) @@ -419,7 +185,7 @@ class SQLiteFileSystemStore: kind=record.get("folder_kind", "manual"), ) folder_cache[folder_cache_key] = folder_id - file_rows.append(self._file_insert_values(record, include_folder_path=include_folder_path)) + file_rows.append(self._file_insert_values(record)) membership_rows.append( ( record["file_ref"], @@ -445,7 +211,7 @@ class SQLiteFileSystemStore: metadata_field_ids, ) ) - conn.executemany(self._file_insert_sql(include_folder_path=include_folder_path), file_rows) + conn.executemany(self._file_insert_sql(), file_rows) conn.executemany( """ INSERT OR REPLACE INTO file_folders(file_ref, folder_id, metadata_json) @@ -474,7 +240,7 @@ class SQLiteFileSystemStore: ) @staticmethod - def _file_insert_sql(*, include_folder_path: bool) -> str: + def _file_insert_sql() -> str: columns = [ "file_ref", "external_id", @@ -492,8 +258,6 @@ class SQLiteFileSystemStore: "metadata_json", "metadata_status_json", ] - if include_folder_path: - columns.append("folder_path") columns.extend(["deleted_at", "updated_at"]) placeholders = ", ".join(["?"] * (len(columns) - 2) + ["NULL", "CURRENT_TIMESTAMP"]) return f""" @@ -502,7 +266,7 @@ class SQLiteFileSystemStore: """ @staticmethod - def _file_insert_values(record: dict[str, Any], *, include_folder_path: bool) -> tuple[Any, ...]: + def _file_insert_values(record: dict[str, Any]) -> tuple[Any, ...]: values: list[Any] = [ record["file_ref"], record["external_id"], @@ -520,8 +284,6 @@ class SQLiteFileSystemStore: record["metadata_json"], record.get("metadata_status_json", "{}"), ] - if include_folder_path: - values.append(record["folder_path"]) return tuple(values) def _metadata_insert_values( @@ -610,59 +372,6 @@ class SQLiteFileSystemStore: ), ) - def _insert_file_row(self, conn: sqlite3.Connection, record: dict[str, Any]) -> None: - current_timestamp = object() - columns = [ - "file_ref", - "external_id", - "storage_uri", - "source_path", - "title", - "descriptor", - "content_type", - "source_type", - "fingerprint", - "text_artifact_path", - "raw_artifact_path", - "pageindex_doc_id", - "pageindex_tree_status", - "metadata_json", - "metadata_status_json", - "deleted_at", - "updated_at", - ] - values: list[Any] = [ - record["file_ref"], - record["external_id"], - record["storage_uri"], - record["source_path"], - record["title"], - record["descriptor"], - record["content_type"], - record["source_type"], - record["fingerprint"], - record["text_artifact_path"], - record["raw_artifact_path"], - record.get("pageindex_doc_id"), - record.get("pageindex_tree_status", "not_built"), - record["metadata_json"], - record.get("metadata_status_json", "{}"), - None, - current_timestamp, - ] - if "folder_path" in self._columns(conn, "files"): - columns.insert(-2, "folder_path") - values.insert(-2, record["folder_path"]) - placeholders = ", ".join("CURRENT_TIMESTAMP" if value is current_timestamp else "?" for value in values) - bound_values = [value for value in values if value is not current_timestamp] - conn.execute( - f""" - INSERT OR REPLACE INTO files ({", ".join(columns)}) - VALUES ({placeholders}) - """, - bound_values, - ) - def replace_metadata_values( self, conn: sqlite3.Connection, diff --git a/tests/test_pifs_find_maxdepth.py b/tests/test_pifs_find_maxdepth.py index e301248..fea7831 100644 --- a/tests/test_pifs_find_maxdepth.py +++ b/tests/test_pifs_find_maxdepth.py @@ -208,58 +208,6 @@ def test_batch_metadata_status_generates_into_unified_metadata(tmp_path): assert after.metadata_status["fields"]["summary"]["status"] == "generated" -def test_v4_metadata_columns_migrate_to_unified_metadata_status(tmp_path): - from pageindex.filesystem import PageIndexFileSystem - - source = tmp_path / "source.txt" - source.write_text("fixture text", encoding="utf-8") - workspace = tmp_path / "workspace" - filesystem = PageIndexFileSystem(workspace=workspace) - file_ref = filesystem.register_file( - storage_uri=source.as_uri(), - source_path="docs/source.txt", - folder_path="/documents", - external_id="doc_migrate", - title="Migrated document", - content=source.read_text(encoding="utf-8"), - metadata={"department": "ops"}, - ) - - with filesystem.store.connect() as conn: - conn.execute( - "ALTER TABLE files ADD COLUMN derived_metadata_json TEXT NOT NULL DEFAULT '{}'" - ) - conn.execute( - "ALTER TABLE files ADD COLUMN metadata_generation_json TEXT NOT NULL DEFAULT '{}'" - ) - conn.execute( - """ - UPDATE files - SET metadata_json = ?, - derived_metadata_json = ?, - metadata_generation_json = ?, - metadata_status_json = '{}' - WHERE file_ref = ? - """, - ( - '{"department":"ops","summary":"raw summary"}', - '{"summary":"generated summary"}', - '{"status":"generated","fields":{"summary":{"requested":true,"status":"generated"}}}', - file_ref, - ), - ) - conn.execute("PRAGMA user_version = 4") - - migrated = PageIndexFileSystem(workspace=workspace) - entry = migrated.store.get_file(file_ref) - - assert entry.metadata["department"] == "ops" - assert entry.metadata["summary"] == "generated summary" - assert entry.metadata_status["status"] == "generated" - assert entry.metadata_status["fields"]["summary"]["owner"] == "pifs" - assert entry.metadata_status["fields"]["summary"]["source"] == "llm" - - def test_find_maxdepth_zero_type_directory_returns_start_folder(tmp_path): executor = _register_find_fixture(tmp_path)