mirror of
https://github.com/VectifyAI/PageIndex.git
synced 2026-06-12 19:55:17 +02:00
feat(pifs): add workspace file import command
This commit is contained in:
parent
8f87cee6ce
commit
7096ba1388
4 changed files with 499 additions and 1 deletions
|
|
@ -279,6 +279,23 @@ def _run_passthrough(
|
|||
return 0
|
||||
|
||||
|
||||
def _run_add(argv: list[str], *, workspace: str) -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="pifs add",
|
||||
description="Add a local file to a PageIndex FileSystem workspace",
|
||||
)
|
||||
parser.add_argument("physical_path")
|
||||
parser.add_argument("virtual_target")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
filesystem = _filesystem_from_workspace(workspace)
|
||||
info = filesystem.add_file(args.physical_path, args.virtual_target)
|
||||
print(f"added: {info.get('path') or '/' + str(info.get('source_path') or '').strip('/')}")
|
||||
print(f"file_ref: {info['file_ref']}")
|
||||
print(f"storage_uri: {info['storage_uri']}")
|
||||
return 0
|
||||
|
||||
|
||||
def _run_set(argv: list[str]) -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="pifs set",
|
||||
|
|
@ -326,6 +343,10 @@ def main(argv: list[str] | None = None) -> int:
|
|||
return _run_ask(command_args, workspace_default=args.workspace)
|
||||
if command_name == "chat":
|
||||
return _run_chat(command_args, workspace_default=args.workspace)
|
||||
if command_name == "add":
|
||||
if not args.workspace:
|
||||
parser.error("--workspace is required unless PIFS_WORKSPACE is set or `pifs set workspace <path>` has been run")
|
||||
return _run_add(command_args, workspace=args.workspace)
|
||||
|
||||
if "--json" in command_tokens:
|
||||
command_tokens = [token for token in command_tokens if token != "--json"]
|
||||
|
|
|
|||
|
|
@ -2,7 +2,9 @@ from __future__ import annotations
|
|||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path, PurePosixPath
|
||||
from typing import TYPE_CHECKING, Any, Optional, Union
|
||||
from urllib.parse import unquote, urlparse
|
||||
|
||||
|
|
@ -91,6 +93,13 @@ PAGEINDEX_DOCUMENT_CONTENT_TYPES = {
|
|||
}
|
||||
TEXT_ARTIFACT_SUFFIXES = {".txt", ".text"}
|
||||
TEXT_ARTIFACT_CONTENT_TYPES = {"text/plain"}
|
||||
ADD_FILE_CONTENT_TYPES = {
|
||||
".pdf": "application/pdf",
|
||||
".md": "text/markdown",
|
||||
".markdown": "text/markdown",
|
||||
".txt": "text/plain",
|
||||
".text": "text/plain",
|
||||
}
|
||||
|
||||
|
||||
class PageIndexFileSystem:
|
||||
|
|
@ -171,6 +180,83 @@ class PageIndexFileSystem:
|
|||
self._ensure_register_completion_defaults()
|
||||
return self.register_file(**kwargs)
|
||||
|
||||
def add_file(
|
||||
self,
|
||||
physical_path: Union[str, Path],
|
||||
virtual_target: Union[str, Path],
|
||||
) -> dict[str, Any]:
|
||||
source = Path(physical_path).expanduser()
|
||||
if not source.is_file():
|
||||
raise FileNotFoundError(f"Source file not found: {source}")
|
||||
suffix = source.suffix.lower()
|
||||
content_type = ADD_FILE_CONTENT_TYPES.get(suffix)
|
||||
if content_type is None:
|
||||
supported = ", ".join(sorted(ADD_FILE_CONTENT_TYPES))
|
||||
raise ValueError(
|
||||
f"Unsupported file type: {suffix or '<none>'}; supported: {supported}"
|
||||
)
|
||||
|
||||
folder_path, filename, virtual_path = self._resolve_add_target(
|
||||
virtual_target,
|
||||
physical_basename=source.name,
|
||||
physical_suffix=suffix,
|
||||
)
|
||||
if self.store.file_basename_exists_in_folder(folder_path, filename):
|
||||
raise FileExistsError(f"File already exists at {virtual_path}")
|
||||
|
||||
self._ensure_add_completion_defaults()
|
||||
file_ref = make_file_ref(virtual_path.strip("/"))
|
||||
uploads_dir = self.workspace / "artifacts" / "uploads"
|
||||
final_dir = uploads_dir / file_ref
|
||||
final_path = final_dir / filename
|
||||
final_dir_created = False
|
||||
records: list[dict[str, Any]] = []
|
||||
|
||||
uploads_dir.mkdir(parents=True, exist_ok=True)
|
||||
with tempfile.TemporaryDirectory(prefix=f".add-{file_ref}-", dir=uploads_dir) as tmp:
|
||||
temp_path = Path(tmp) / filename
|
||||
try:
|
||||
shutil.copy2(source, temp_path)
|
||||
if final_dir.exists():
|
||||
raise FileExistsError(
|
||||
f"Workspace artifact already exists for {virtual_path}: {final_dir}"
|
||||
)
|
||||
final_dir.mkdir(parents=True)
|
||||
final_dir_created = True
|
||||
os.replace(temp_path, final_path)
|
||||
|
||||
record = self._prepare_file_record(
|
||||
{
|
||||
"storage_uri": final_path.as_uri(),
|
||||
"source_path": virtual_path.strip("/"),
|
||||
"folder_path": folder_path,
|
||||
"metadata": {},
|
||||
"external_id": None,
|
||||
"title": filename,
|
||||
"content": self._add_file_content(final_path, content_type),
|
||||
"content_type": content_type,
|
||||
"metadata_policy": self._add_metadata_policy(),
|
||||
}
|
||||
)
|
||||
records = [record]
|
||||
self._require_add_pageindex_ready(record)
|
||||
self._generate_register_metadata(record)
|
||||
self._require_add_metadata_ready(record)
|
||||
self._complete_summary_projection_index(record)
|
||||
self._require_add_summary_projection_ready(record)
|
||||
self._register_generation_policy_schema(records)
|
||||
self._sync_owned_raw_artifact(record)
|
||||
self.store.insert_files(records)
|
||||
except Exception:
|
||||
self._cleanup_failed_register_artifacts(records)
|
||||
if final_dir_created:
|
||||
shutil.rmtree(final_dir, ignore_errors=True)
|
||||
raise
|
||||
|
||||
info = self.store.file_info(file_ref)
|
||||
info["path"] = virtual_path
|
||||
return info
|
||||
|
||||
def register_files(self, files: list[dict[str, Any]]) -> list[str]:
|
||||
records = [self._prepare_file_record(file) for file in files]
|
||||
try:
|
||||
|
|
@ -250,6 +336,25 @@ class PageIndexFileSystem:
|
|||
embedding_timeout=self.summary_projection_embedding_timeout,
|
||||
)
|
||||
|
||||
def _ensure_add_completion_defaults(self) -> None:
|
||||
if self.metadata_generator is None:
|
||||
self.metadata_generator = MetadataGenerator(
|
||||
provider=self.metadata_provider,
|
||||
model=self.metadata_model,
|
||||
base_url=self.metadata_base_url,
|
||||
max_text_chars=self.metadata_max_text_chars,
|
||||
)
|
||||
if self.summary_projection_index and self.summary_projection_indexer is None:
|
||||
from .projection_indexing import SummaryProjectionIndexer
|
||||
|
||||
self.summary_projection_indexer = SummaryProjectionIndexer.from_provider(
|
||||
self.summary_projection_index_dir,
|
||||
embedding_provider=self.summary_projection_embedding_provider,
|
||||
embedding_model=self.summary_projection_embedding_model,
|
||||
embedding_dimensions=self.summary_projection_embedding_dimensions,
|
||||
embedding_timeout=self.summary_projection_embedding_timeout,
|
||||
)
|
||||
|
||||
def configure_existing_projection_retrieval(self) -> bool:
|
||||
"""Attach semantic retrieval to already-built projection indexes.
|
||||
|
||||
|
|
@ -1075,6 +1180,117 @@ class PageIndexFileSystem:
|
|||
def _create_folder(self, path: str) -> str:
|
||||
return self.create_folder(path)
|
||||
|
||||
@classmethod
|
||||
def _resolve_add_target(
|
||||
cls,
|
||||
virtual_target: Union[str, Path],
|
||||
*,
|
||||
physical_basename: str,
|
||||
physical_suffix: str,
|
||||
) -> tuple[str, str, str]:
|
||||
raw_target = str(virtual_target).strip()
|
||||
if not raw_target:
|
||||
raise ValueError("pifs add target is required")
|
||||
normalized = normalize_path(raw_target)
|
||||
posix_target = PurePosixPath(normalized)
|
||||
raw_looks_like_folder = raw_target.replace("\\", "/").endswith("/")
|
||||
target_suffix = posix_target.suffix.lower()
|
||||
if raw_looks_like_folder or target_suffix not in ADD_FILE_CONTENT_TYPES:
|
||||
folder_path = normalized
|
||||
filename = physical_basename
|
||||
else:
|
||||
if target_suffix != physical_suffix:
|
||||
raise ValueError(
|
||||
"pifs add target file extension must match the physical file extension"
|
||||
)
|
||||
folder_path = normalize_path(str(posix_target.parent))
|
||||
filename = posix_target.name
|
||||
cls._validate_add_filename(filename)
|
||||
virtual_path = cls._join_virtual_file_path(folder_path, filename)
|
||||
return folder_path, filename, virtual_path
|
||||
|
||||
@staticmethod
|
||||
def _validate_add_filename(filename: str) -> None:
|
||||
if not filename or filename in {".", ".."}:
|
||||
raise ValueError("pifs add target filename is required")
|
||||
if "/" in filename or "\\" in filename:
|
||||
raise ValueError("pifs add target filename must be a basename")
|
||||
|
||||
@staticmethod
|
||||
def _join_virtual_file_path(folder_path: str, filename: str) -> str:
|
||||
folder_path = normalize_path(folder_path)
|
||||
if folder_path == "/":
|
||||
return f"/{filename}"
|
||||
return f"{folder_path}/{filename}"
|
||||
|
||||
@staticmethod
|
||||
def _add_metadata_policy() -> dict[str, Any]:
|
||||
return {
|
||||
"fields": {
|
||||
"summary": True,
|
||||
"doc_type": False,
|
||||
"domain": False,
|
||||
"topic": False,
|
||||
"entity": False,
|
||||
"relation": False,
|
||||
},
|
||||
"projection_indexes": {"summary": True},
|
||||
"batch": False,
|
||||
}
|
||||
|
||||
def _add_file_content(self, path: Path, content_type: str) -> str:
|
||||
if self._source_format(str(path), content_type) in {"markdown", "text"}:
|
||||
return path.read_text(encoding="utf-8")
|
||||
return ""
|
||||
|
||||
def _require_add_pageindex_ready(self, record: dict[str, Any]) -> None:
|
||||
if self._source_format(record["source_path"], record["content_type"]) not in {
|
||||
"pdf",
|
||||
"markdown",
|
||||
}:
|
||||
return
|
||||
if record.get("pageindex_tree_status") == "built" and record.get("pageindex_doc_id"):
|
||||
return
|
||||
message = self._pageindex_tree_failure_message(record.get("metadata_status")) or (
|
||||
"PageIndex tree was not built"
|
||||
)
|
||||
raise RuntimeError(f"pifs add failed to build PageIndex tree: {message}")
|
||||
|
||||
@staticmethod
|
||||
def _require_add_metadata_ready(record: dict[str, Any]) -> None:
|
||||
metadata = record.get("metadata") or {}
|
||||
summary = str(metadata.get("summary") or "").strip()
|
||||
if not summary:
|
||||
raise MetadataGenerationError(
|
||||
"pifs add requires synchronous generated summary metadata"
|
||||
)
|
||||
status = record.get("metadata_status") or {}
|
||||
summary_status = (status.get("fields") or {}).get("summary") or {}
|
||||
if summary_status.get("status") != "generated":
|
||||
raise MetadataGenerationError(
|
||||
"pifs add requires generated summary metadata before registration"
|
||||
)
|
||||
if status.get("status") == "failed":
|
||||
raise MetadataGenerationError(
|
||||
"pifs add metadata generation failed before registration"
|
||||
)
|
||||
|
||||
def _require_add_summary_projection_ready(self, record: dict[str, Any]) -> None:
|
||||
if not self.summary_projection_index:
|
||||
return
|
||||
summary_projection = (
|
||||
(record.get("metadata_status") or {})
|
||||
.get("projection_indexes", {})
|
||||
.get("summary")
|
||||
)
|
||||
if not summary_projection or not summary_projection.get("requested"):
|
||||
raise RuntimeError("pifs add requires a requested summary projection index")
|
||||
if summary_projection.get("status") != "ready":
|
||||
detail = summary_projection.get("error") or summary_projection.get("status")
|
||||
raise RuntimeError(
|
||||
f"pifs add failed to build summary projection index: {detail}"
|
||||
)
|
||||
|
||||
def _prepare_file_record(self, file: dict[str, Any]) -> dict[str, Any]:
|
||||
storage_uri = file["storage_uri"]
|
||||
raw_source_path = str(file["source_path"])
|
||||
|
|
|
|||
|
|
@ -1412,6 +1412,36 @@ class SQLiteFileSystemStore:
|
|||
).fetchone()
|
||||
return int(row["count"] or 0)
|
||||
|
||||
def file_basename_exists_in_folder(self, path: str, basename: str) -> bool:
|
||||
path = normalize_path(path)
|
||||
basename = str(basename).strip()
|
||||
if not basename:
|
||||
return False
|
||||
with self.connect() as conn:
|
||||
row = conn.execute(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM files f
|
||||
JOIN file_folders ff ON ff.file_ref = f.file_ref
|
||||
JOIN folders fo ON fo.folder_id = ff.folder_id
|
||||
WHERE f.deleted_at IS NULL
|
||||
AND fo.path = ?
|
||||
AND (
|
||||
f.title = ?
|
||||
OR f.source_path = ?
|
||||
OR f.source_path LIKE ? ESCAPE '\\'
|
||||
)
|
||||
LIMIT 1
|
||||
""",
|
||||
(
|
||||
path,
|
||||
basename,
|
||||
basename,
|
||||
"%/" + self._like_escape(basename),
|
||||
),
|
||||
).fetchone()
|
||||
return row is not None
|
||||
|
||||
def folder_subtree_thresholds(
|
||||
self,
|
||||
path: str,
|
||||
|
|
|
|||
231
tests/test_pifs_add_command.py
Normal file
231
tests/test_pifs_add_command.py
Normal file
|
|
@ -0,0 +1,231 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class GeneratedMetadata:
|
||||
def __init__(self):
|
||||
self.calls = []
|
||||
|
||||
def generate(self, request, *, fields):
|
||||
self.calls.append((request, list(fields)))
|
||||
values = {
|
||||
"summary": f"Summary for {request.title}: {request.text[:60]}",
|
||||
"doc_type": "uploaded_file",
|
||||
"domain": "workspace",
|
||||
"topic": "pifs add",
|
||||
}
|
||||
return {field: values[field] for field in fields if field in values}
|
||||
|
||||
|
||||
class RecordingSummaryIndexer:
|
||||
def __init__(self):
|
||||
self.upserted = []
|
||||
|
||||
def upsert_summary(self, record):
|
||||
self.upserted.append(dict(record))
|
||||
return {"status": "ready", "indexed_rows": 1}
|
||||
|
||||
|
||||
def write_pageindex_client_doc(workspace: Path, doc_id: str, doc: dict) -> None:
|
||||
workspace.mkdir(parents=True, exist_ok=True)
|
||||
(workspace / f"{doc_id}.json").write_text(
|
||||
json.dumps(doc, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
)
|
||||
meta = {
|
||||
doc_id: {
|
||||
"type": doc.get("type", ""),
|
||||
"doc_name": doc.get("doc_name", ""),
|
||||
"doc_description": doc.get("doc_description", ""),
|
||||
"path": doc.get("path", ""),
|
||||
"line_count": doc.get("line_count"),
|
||||
}
|
||||
}
|
||||
(workspace / "_meta.json").write_text(
|
||||
json.dumps(meta, ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
|
||||
def test_add_text_folder_target_copies_artifact_indexes_summary_and_is_readable(tmp_path):
|
||||
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
|
||||
|
||||
source = tmp_path / "filing.txt"
|
||||
source.write_text("alpha filing text for pifs add", encoding="utf-8")
|
||||
indexer = RecordingSummaryIndexer()
|
||||
filesystem = PageIndexFileSystem(
|
||||
workspace=tmp_path / "workspace",
|
||||
metadata_generator=GeneratedMetadata(),
|
||||
summary_projection_indexer=indexer,
|
||||
)
|
||||
|
||||
info = filesystem.add_file(str(source), "/documents/reports")
|
||||
|
||||
assert info["source_path"] == "documents/reports/filing.txt"
|
||||
assert info["folder_path"] == "/documents/reports"
|
||||
assert filesystem.folder_info("/documents/reports")["path"] == "/documents/reports"
|
||||
assert info["storage_uri"] != source.as_uri()
|
||||
assert "/artifacts/uploads/" in info["storage_uri"]
|
||||
copied_path = Path(info["storage_uri"].removeprefix("file://"))
|
||||
assert copied_path.read_text(encoding="utf-8") == "alpha filing text for pifs add"
|
||||
assert copied_path.resolve() != source.resolve()
|
||||
|
||||
executor = PIFSCommandExecutor(filesystem, json_output=True)
|
||||
rendered = json.loads(executor.execute("cat /documents/reports/filing.txt --all"))
|
||||
|
||||
assert rendered["data"]["text"] == "alpha filing text for pifs add"
|
||||
assert info["metadata"]["summary"].startswith("Summary for filing.txt")
|
||||
assert indexer.upserted[0]["file_ref"] == info["file_ref"]
|
||||
assert indexer.upserted[0]["metadata"]["summary"] == info["metadata"]["summary"]
|
||||
|
||||
|
||||
def test_add_rejects_same_folder_same_basename_without_overwrite(tmp_path):
|
||||
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
|
||||
|
||||
source = tmp_path / "conflict.txt"
|
||||
source.write_text("first body", encoding="utf-8")
|
||||
filesystem = PageIndexFileSystem(
|
||||
workspace=tmp_path / "workspace",
|
||||
metadata_generator=GeneratedMetadata(),
|
||||
summary_projection_indexer=RecordingSummaryIndexer(),
|
||||
)
|
||||
|
||||
filesystem.add_file(source, "/documents")
|
||||
source.write_text("second body must not overwrite", encoding="utf-8")
|
||||
|
||||
with pytest.raises(FileExistsError, match="already exists"):
|
||||
filesystem.add_file(source, "/documents")
|
||||
|
||||
executor = PIFSCommandExecutor(filesystem, json_output=True)
|
||||
rendered = json.loads(executor.execute("cat /documents/conflict.txt --all"))
|
||||
assert rendered["data"]["text"] == "first body"
|
||||
|
||||
|
||||
def test_add_rejects_unsupported_type_before_registration(tmp_path):
|
||||
from pageindex.filesystem import PageIndexFileSystem
|
||||
|
||||
source = tmp_path / "payload.json"
|
||||
source.write_text('{"unsupported": true}', encoding="utf-8")
|
||||
filesystem = PageIndexFileSystem(
|
||||
workspace=tmp_path / "workspace",
|
||||
metadata_generator=GeneratedMetadata(),
|
||||
summary_projection_indexer=RecordingSummaryIndexer(),
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="Unsupported file type"):
|
||||
filesystem.add_file(source, "/documents")
|
||||
|
||||
assert filesystem.browse("/", recursive=True)["files"] == []
|
||||
assert not list((tmp_path / "workspace" / "artifacts" / "uploads").glob("**/*"))
|
||||
|
||||
|
||||
def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monkeypatch):
|
||||
from pageindex import PageIndexClient
|
||||
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
|
||||
|
||||
indexed_paths = []
|
||||
|
||||
def fake_index(self, file_path, mode="auto"):
|
||||
indexed_paths.append(Path(file_path))
|
||||
doc_id = "doc_added_md"
|
||||
doc = {
|
||||
"id": doc_id,
|
||||
"type": "md",
|
||||
"path": str(Path(file_path).resolve()),
|
||||
"doc_name": "notes.md",
|
||||
"doc_description": "",
|
||||
"line_count": 3,
|
||||
"structure": [
|
||||
{
|
||||
"title": "Notes",
|
||||
"node_id": "0001",
|
||||
"line_num": 1,
|
||||
"text": "# Notes\n\ncopied markdown body",
|
||||
"nodes": [],
|
||||
}
|
||||
],
|
||||
}
|
||||
write_pageindex_client_doc(self.workspace, doc_id, doc)
|
||||
self.documents[doc_id] = doc
|
||||
return doc_id
|
||||
|
||||
monkeypatch.setattr(PageIndexClient, "index", fake_index)
|
||||
source = tmp_path / "notes.md"
|
||||
source.write_text("# Notes\n\ncopied markdown body", encoding="utf-8")
|
||||
filesystem = PageIndexFileSystem(
|
||||
workspace=tmp_path / "workspace",
|
||||
metadata_generator=GeneratedMetadata(),
|
||||
summary_projection_indexer=RecordingSummaryIndexer(),
|
||||
)
|
||||
|
||||
info = filesystem.add_file(source, "/documents")
|
||||
executor = PIFSCommandExecutor(filesystem, json_output=True)
|
||||
structure = json.loads(executor.execute("cat /documents/notes.md --structure"))
|
||||
|
||||
assert structure["data"]["available"] is True
|
||||
assert structure["data"]["structure"][0]["title"] == "Notes"
|
||||
assert indexed_paths == [Path(info["storage_uri"].removeprefix("file://"))]
|
||||
assert indexed_paths[0].resolve() != source.resolve()
|
||||
|
||||
|
||||
def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monkeypatch):
|
||||
from pageindex.filesystem import PageIndexFileSystem
|
||||
|
||||
source = tmp_path / "atomic.txt"
|
||||
source.write_text("atomic body", encoding="utf-8")
|
||||
filesystem = PageIndexFileSystem(
|
||||
workspace=tmp_path / "workspace",
|
||||
metadata_generator=GeneratedMetadata(),
|
||||
summary_projection_indexer=RecordingSummaryIndexer(),
|
||||
)
|
||||
|
||||
def fail_insert(records):
|
||||
raise RuntimeError("catalog insert failed")
|
||||
|
||||
monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)
|
||||
|
||||
with pytest.raises(RuntimeError, match="catalog insert failed"):
|
||||
filesystem.add_file(source, "/documents")
|
||||
|
||||
assert filesystem.browse("/", recursive=True)["files"] == []
|
||||
assert not list((tmp_path / "workspace" / "artifacts" / "uploads").glob("**/*"))
|
||||
assert not list((tmp_path / "workspace" / "artifacts" / "text").glob("*.txt"))
|
||||
assert not list((tmp_path / "workspace" / "artifacts" / "raw").glob("*.json"))
|
||||
|
||||
|
||||
def test_cli_add_uses_workspace_and_prints_added_file(monkeypatch, capsys, tmp_path):
|
||||
from pageindex.filesystem import cli
|
||||
|
||||
source = tmp_path / "cli.txt"
|
||||
source.write_text("cli body", encoding="utf-8")
|
||||
calls = []
|
||||
|
||||
class FakeAddFileSystem:
|
||||
def __init__(self, workspace):
|
||||
self.workspace = Path(workspace)
|
||||
|
||||
def configure_existing_projection_retrieval(self):
|
||||
return False
|
||||
|
||||
def add_file(self, physical_path, virtual_target):
|
||||
calls.append((self.workspace, physical_path, virtual_target))
|
||||
return {
|
||||
"file_ref": "file_cli",
|
||||
"path": "/documents/cli.txt",
|
||||
"source_path": "documents/cli.txt",
|
||||
"storage_uri": "file:///workspace/artifacts/uploads/file_cli/cli.txt",
|
||||
}
|
||||
|
||||
monkeypatch.setattr(cli, "PageIndexFileSystem", FakeAddFileSystem)
|
||||
|
||||
status = cli.main(["--workspace", str(tmp_path / "workspace"), "add", str(source), "/documents"])
|
||||
|
||||
assert status == 0
|
||||
assert calls == [(tmp_path / "workspace", str(source), "/documents")]
|
||||
assert capsys.readouterr().out == (
|
||||
"added: /documents/cli.txt\n"
|
||||
"file_ref: file_cli\n"
|
||||
"storage_uri: file:///workspace/artifacts/uploads/file_cli/cli.txt\n"
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue