mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-28 10:26:33 +02:00
refactor: implement file type classification for supported extensions across Dropbox, Google Drive, and OneDrive connectors, enhancing file handling and error management
This commit is contained in:
parent
47f4be08d9
commit
dc7047f64d
14 changed files with 250 additions and 27 deletions
|
|
@ -1,25 +1,8 @@
|
||||||
"""File type handlers for Dropbox."""
|
"""File type handlers for Dropbox."""
|
||||||
|
|
||||||
PAPER_EXTENSION = ".paper"
|
from app.etl_pipeline.file_classifier import FileCategory, classify_file
|
||||||
|
|
||||||
SKIP_EXTENSIONS: frozenset[str] = frozenset({
|
PAPER_EXTENSION = ".paper"
|
||||||
# Non-universal images (not supported by all 3 ETL pipelines)
|
|
||||||
".svg", ".gif", ".webp", ".heic", ".ico",
|
|
||||||
".raw", ".cr2", ".nef", ".arw", ".dng",
|
|
||||||
".psd", ".ai", ".sketch", ".fig",
|
|
||||||
# Video
|
|
||||||
".mov", ".avi", ".mkv", ".wmv", ".flv",
|
|
||||||
# Binaries / executables
|
|
||||||
".exe", ".dll", ".so", ".dylib", ".bin", ".app", ".dmg", ".iso",
|
|
||||||
# Archives
|
|
||||||
".zip", ".tar", ".gz", ".rar", ".7z", ".bz2",
|
|
||||||
# Fonts
|
|
||||||
".ttf", ".otf", ".woff", ".woff2",
|
|
||||||
# 3D / CAD
|
|
||||||
".stl", ".obj", ".fbx", ".blend",
|
|
||||||
# Database
|
|
||||||
".db", ".sqlite", ".mdb",
|
|
||||||
})
|
|
||||||
|
|
||||||
MIME_TO_EXTENSION: dict[str, str] = {
|
MIME_TO_EXTENSION: dict[str, str] = {
|
||||||
"application/pdf": ".pdf",
|
"application/pdf": ".pdf",
|
||||||
|
|
@ -71,5 +54,4 @@ def should_skip_file(item: dict) -> bool:
|
||||||
if not item.get("is_downloadable", True):
|
if not item.get("is_downloadable", True):
|
||||||
return True
|
return True
|
||||||
name = item.get("name", "")
|
name = item.get("name", "")
|
||||||
ext = get_extension_from_name(name).lower()
|
return classify_file(name) == FileCategory.UNSUPPORTED
|
||||||
return ext in SKIP_EXTENSIONS
|
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ from .file_types import (
|
||||||
get_export_mime_type,
|
get_export_mime_type,
|
||||||
get_extension_from_mime,
|
get_extension_from_mime,
|
||||||
is_google_workspace_file,
|
is_google_workspace_file,
|
||||||
|
should_skip_by_extension,
|
||||||
should_skip_file,
|
should_skip_file,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -42,6 +43,9 @@ async def download_and_extract_content(
|
||||||
if should_skip_file(mime_type):
|
if should_skip_file(mime_type):
|
||||||
return None, {}, f"Skipping {mime_type}"
|
return None, {}, f"Skipping {mime_type}"
|
||||||
|
|
||||||
|
if should_skip_by_extension(file_name):
|
||||||
|
return None, {}, f"Skipping unsupported extension: {file_name}"
|
||||||
|
|
||||||
logger.info(f"Downloading file for content extraction: {file_name} ({mime_type})")
|
logger.info(f"Downloading file for content extraction: {file_name} ({mime_type})")
|
||||||
|
|
||||||
drive_metadata: dict[str, Any] = {
|
drive_metadata: dict[str, Any] = {
|
||||||
|
|
@ -148,10 +152,12 @@ async def download_and_process_file(
|
||||||
file_name = file.get("name", "Unknown")
|
file_name = file.get("name", "Unknown")
|
||||||
mime_type = file.get("mimeType", "")
|
mime_type = file.get("mimeType", "")
|
||||||
|
|
||||||
# Skip folders and shortcuts
|
|
||||||
if should_skip_file(mime_type):
|
if should_skip_file(mime_type):
|
||||||
return None, f"Skipping {mime_type}", None
|
return None, f"Skipping {mime_type}", None
|
||||||
|
|
||||||
|
if should_skip_by_extension(file_name):
|
||||||
|
return None, f"Skipping unsupported extension: {file_name}", None
|
||||||
|
|
||||||
logger.info(f"Downloading file: {file_name} ({mime_type})")
|
logger.info(f"Downloading file: {file_name} ({mime_type})")
|
||||||
|
|
||||||
temp_file_path = None
|
temp_file_path = None
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,7 @@
|
||||||
"""File type handlers for Google Drive."""
|
"""File type handlers for Google Drive."""
|
||||||
|
|
||||||
|
from app.etl_pipeline.file_classifier import FileCategory, classify_file
|
||||||
|
|
||||||
GOOGLE_DOC = "application/vnd.google-apps.document"
|
GOOGLE_DOC = "application/vnd.google-apps.document"
|
||||||
GOOGLE_SHEET = "application/vnd.google-apps.spreadsheet"
|
GOOGLE_SHEET = "application/vnd.google-apps.spreadsheet"
|
||||||
GOOGLE_SLIDE = "application/vnd.google-apps.presentation"
|
GOOGLE_SLIDE = "application/vnd.google-apps.presentation"
|
||||||
|
|
@ -46,6 +48,11 @@ def should_skip_file(mime_type: str) -> bool:
|
||||||
return mime_type in [GOOGLE_FOLDER, GOOGLE_SHORTCUT]
|
return mime_type in [GOOGLE_FOLDER, GOOGLE_SHORTCUT]
|
||||||
|
|
||||||
|
|
||||||
|
def should_skip_by_extension(filename: str) -> bool:
    """Tell whether *filename* should be skipped because of its extension.

    Delegates to ``classify_file`` and reports ``True`` exactly when the
    resulting category is ``FileCategory.UNSUPPORTED``.

    NOTE(review): native Google Workspace items often have no extension in
    their name; presumably those classify as UNSUPPORTED here -- confirm that
    callers check ``is_google_workspace_file`` before relying on this result.
    """
    category = classify_file(filename)
    return category == FileCategory.UNSUPPORTED
|
||||||
|
|
||||||
|
|
||||||
def get_export_mime_type(mime_type: str) -> str | None:
|
def get_export_mime_type(mime_type: str) -> str | None:
|
||||||
"""Get export MIME type for Google Workspace files."""
|
"""Get export MIME type for Google Workspace files."""
|
||||||
return EXPORT_FORMATS.get(mime_type)
|
return EXPORT_FORMATS.get(mime_type)
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,7 @@
|
||||||
"""File type handlers for Microsoft OneDrive."""
|
"""File type handlers for Microsoft OneDrive."""
|
||||||
|
|
||||||
|
from app.etl_pipeline.file_classifier import FileCategory, classify_file
|
||||||
|
|
||||||
ONEDRIVE_FOLDER_FACET = "folder"
|
ONEDRIVE_FOLDER_FACET = "folder"
|
||||||
ONENOTE_MIME = "application/msonenote"
|
ONENOTE_MIME = "application/msonenote"
|
||||||
|
|
||||||
|
|
@ -39,7 +41,7 @@ def is_folder(item: dict) -> bool:
|
||||||
|
|
||||||
|
|
||||||
def should_skip_file(item: dict) -> bool:
|
def should_skip_file(item: dict) -> bool:
|
||||||
"""Skip folders, OneNote files, remote items (shared links), and packages."""
|
"""Skip folders, OneNote files, remote items (shared links), packages, and unsupported extensions."""
|
||||||
if is_folder(item):
|
if is_folder(item):
|
||||||
return True
|
return True
|
||||||
if "remoteItem" in item:
|
if "remoteItem" in item:
|
||||||
|
|
@ -47,4 +49,7 @@ def should_skip_file(item: dict) -> bool:
|
||||||
if "package" in item:
|
if "package" in item:
|
||||||
return True
|
return True
|
||||||
mime = item.get("file", {}).get("mimeType", "")
|
mime = item.get("file", {}).get("mimeType", "")
|
||||||
return mime in SKIP_MIME_TYPES
|
if mime in SKIP_MIME_TYPES:
|
||||||
|
return True
|
||||||
|
name = item.get("name", "")
|
||||||
|
return classify_file(name) == FileCategory.UNSUPPORTED
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
from app.config import config as app_config
|
from app.config import config as app_config
|
||||||
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
|
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
|
||||||
from app.etl_pipeline.exceptions import EtlServiceUnavailableError
|
from app.etl_pipeline.exceptions import EtlServiceUnavailableError, EtlUnsupportedFileError
|
||||||
from app.etl_pipeline.file_classifier import FileCategory, classify_file
|
from app.etl_pipeline.file_classifier import FileCategory, classify_file
|
||||||
from app.etl_pipeline.parsers.audio import transcribe_audio
|
from app.etl_pipeline.parsers.audio import transcribe_audio
|
||||||
from app.etl_pipeline.parsers.direct_convert import convert_file_directly
|
from app.etl_pipeline.parsers.direct_convert import convert_file_directly
|
||||||
|
|
@ -13,6 +13,11 @@ class EtlPipelineService:
|
||||||
async def extract(self, request: EtlRequest) -> EtlResult:
|
async def extract(self, request: EtlRequest) -> EtlResult:
|
||||||
category = classify_file(request.filename)
|
category = classify_file(request.filename)
|
||||||
|
|
||||||
|
if category == FileCategory.UNSUPPORTED:
|
||||||
|
raise EtlUnsupportedFileError(
|
||||||
|
f"File type not supported for parsing: {request.filename}"
|
||||||
|
)
|
||||||
|
|
||||||
if category == FileCategory.PLAINTEXT:
|
if category == FileCategory.PLAINTEXT:
|
||||||
content = read_plaintext(request.file_path)
|
content = read_plaintext(request.file_path)
|
||||||
return EtlResult(
|
return EtlResult(
|
||||||
|
|
|
||||||
|
|
@ -4,3 +4,7 @@ class EtlParseError(Exception):
|
||||||
|
|
||||||
class EtlServiceUnavailableError(Exception):
|
class EtlServiceUnavailableError(Exception):
|
||||||
"""Raised when the configured ETL_SERVICE is not recognised."""
|
"""Raised when the configured ETL_SERVICE is not recognised."""
|
||||||
|
|
||||||
|
|
||||||
|
class EtlUnsupportedFileError(Exception):
    """Signal that no ETL pipeline is able to parse the given file type."""
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pathlib import PurePosixPath
|
from pathlib import PurePosixPath
|
||||||
|
|
||||||
|
from app.utils.file_extensions import DOCUMENT_EXTENSIONS
|
||||||
|
|
||||||
PLAINTEXT_EXTENSIONS = frozenset(
|
PLAINTEXT_EXTENSIONS = frozenset(
|
||||||
{
|
{
|
||||||
|
|
@ -35,6 +36,7 @@ class FileCategory(Enum):
|
||||||
PLAINTEXT = "plaintext"
|
PLAINTEXT = "plaintext"
|
||||||
AUDIO = "audio"
|
AUDIO = "audio"
|
||||||
DIRECT_CONVERT = "direct_convert"
|
DIRECT_CONVERT = "direct_convert"
|
||||||
|
UNSUPPORTED = "unsupported"
|
||||||
DOCUMENT = "document"
|
DOCUMENT = "document"
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -46,4 +48,6 @@ def classify_file(filename: str) -> FileCategory:
|
||||||
return FileCategory.AUDIO
|
return FileCategory.AUDIO
|
||||||
if suffix in DIRECT_CONVERT_EXTENSIONS:
|
if suffix in DIRECT_CONVERT_EXTENSIONS:
|
||||||
return FileCategory.DIRECT_CONVERT
|
return FileCategory.DIRECT_CONVERT
|
||||||
|
if suffix in DOCUMENT_EXTENSIONS:
|
||||||
return FileCategory.DOCUMENT
|
return FileCategory.DOCUMENT
|
||||||
|
return FileCategory.UNSUPPORTED
|
||||||
|
|
|
||||||
|
|
@ -356,6 +356,7 @@ async def _extract_file_content(
|
||||||
FileCategory.PLAINTEXT: "Reading file",
|
FileCategory.PLAINTEXT: "Reading file",
|
||||||
FileCategory.DIRECT_CONVERT: "Converting file",
|
FileCategory.DIRECT_CONVERT: "Converting file",
|
||||||
FileCategory.AUDIO: "Transcribing audio",
|
FileCategory.AUDIO: "Transcribing audio",
|
||||||
|
FileCategory.UNSUPPORTED: "Unsupported file type",
|
||||||
FileCategory.DOCUMENT: "Extracting content",
|
FileCategory.DOCUMENT: "Extracting content",
|
||||||
}
|
}
|
||||||
await NotificationService.document_processing.notify_processing_progress(
|
await NotificationService.document_processing.notify_processing_progress(
|
||||||
|
|
|
||||||
31
surfsense_backend/app/utils/file_extensions.py
Normal file
31
surfsense_backend/app/utils/file_extensions.py
Normal file
|
|
@ -0,0 +1,31 @@
|
||||||
|
"""Allowlist of document extensions the ETL parsers can handle.
|
||||||
|
|
||||||
|
Every consumer (file_classifier, connector-level skip checks) imports from
|
||||||
|
here so there is a single source of truth. Extensions already covered by
|
||||||
|
PLAINTEXT_EXTENSIONS, AUDIO_EXTENSIONS, or DIRECT_CONVERT_EXTENSIONS in
|
||||||
|
file_classifier are NOT repeated here -- this set is exclusively for the
|
||||||
|
"document" ETL path (Docling / LlamaParse / Unstructured).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from pathlib import PurePosixPath
|
||||||
|
|
||||||
|
# Lower-case suffixes (leading dot included) handled by the "document" ETL
# path, grouped by format family.
DOCUMENT_EXTENSIONS: frozenset[str] = frozenset(
    (
        # PDF
        ".pdf",
        # Microsoft Office
        ".doc",
        ".docx",
        ".xls",
        ".xlsx",
        ".ppt",
        ".pptx",
        # Images (raster -- OCR / vision parsing)
        ".png",
        ".jpg",
        ".jpeg",
        ".bmp",
        ".tif",
        ".tiff",
        # Rich text / e-book
        ".rtf",
        ".epub",
        # OpenDocument
        ".odt",
        ".ods",
        ".odp",
        # Other (LlamaParse / Unstructured specific)
        ".hwpx",
    )
)
|
||||||
|
|
||||||
|
|
||||||
|
def is_supported_document_extension(filename: str) -> bool:
    """Check *filename* against the ``DOCUMENT_EXTENSIONS`` allowlist.

    Only the final suffix of the name is considered, and it is lower-cased
    before the lookup, so the match is case-insensitive.
    """
    extension = PurePosixPath(filename).suffix
    return extension.lower() in DOCUMENT_EXTENSIONS
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
"""Tests for Google Drive file type filtering."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.connectors.google_drive.file_types import should_skip_by_extension
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.unit
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "filename",
    ["malware.exe", "archive.zip", "video.mov", "font.woff2", "model.blend"],
)
def test_unsupported_extensions_are_skipped(filename):
    """Names with non-parseable extensions are flagged for skipping."""
    skipped = should_skip_by_extension(filename)
    assert skipped is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "filename",
    [
        "report.pdf",
        "doc.docx",
        "sheet.xlsx",
        "slides.pptx",
        "readme.txt",
        "data.csv",
        "photo.png",
        "notes.md",
    ],
)
def test_parseable_extensions_are_not_skipped(filename):
    """Names with parseable extensions are not flagged for skipping."""
    skipped = should_skip_by_extension(filename)
    assert skipped is False
|
||||||
|
|
@ -0,0 +1,44 @@
|
||||||
|
"""Tests for OneDrive file type filtering."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.connectors.onedrive.file_types import should_skip_file
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.unit
|
||||||
|
|
||||||
|
|
||||||
|
def test_folder_is_skipped():
    """An item carrying a ``folder`` facet is skipped."""
    folder_item = {"folder": {}, "name": "My Folder"}
    skipped = should_skip_file(folder_item)
    assert skipped is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_remote_item_is_skipped():
    """An item carrying a ``remoteItem`` key is skipped despite its .docx name."""
    shared_item = {"remoteItem": {}, "name": "shared.docx"}
    skipped = should_skip_file(shared_item)
    assert skipped is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_package_is_skipped():
    """An item carrying a ``package`` key is skipped."""
    package_item = {"package": {}, "name": "notebook"}
    skipped = should_skip_file(package_item)
    assert skipped is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_onenote_is_skipped():
    """An item whose file MIME type is ``application/msonenote`` is skipped."""
    # NOTE(review): the name "notes" has no extension, so the extension check
    # would presumably skip it as well; a name with a supported extension
    # would isolate the MIME-type check -- confirm.
    onenote_item = {"name": "notes", "file": {"mimeType": "application/msonenote"}}
    skipped = should_skip_file(onenote_item)
    assert skipped is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "filename",
    ["malware.exe", "archive.zip", "video.mov", "font.woff2", "model.blend"],
)
def test_unsupported_extensions_are_skipped(filename):
    """Files with non-parseable extensions and a generic MIME type are skipped."""
    drive_item = {"name": filename, "file": {"mimeType": "application/octet-stream"}}
    skipped = should_skip_file(drive_item)
    assert skipped is True, f"{filename} should be skipped"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "filename",
    [
        "report.pdf",
        "doc.docx",
        "sheet.xlsx",
        "slides.pptx",
        "readme.txt",
        "data.csv",
        "photo.png",
        "notes.md",
    ],
)
def test_parseable_files_are_not_skipped(filename):
    """Files with parseable extensions and a generic MIME type are kept."""
    drive_item = {"name": filename, "file": {"mimeType": "application/octet-stream"}}
    skipped = should_skip_file(drive_item)
    assert skipped is False, f"{filename} should NOT be skipped"
|
||||||
|
|
@ -257,7 +257,7 @@ async def test_extract_pdf_with_llamacloud(tmp_path, mocker):
|
||||||
|
|
||||||
|
|
||||||
async def test_unknown_extension_uses_document_etl(tmp_path, mocker):
|
async def test_unknown_extension_uses_document_etl(tmp_path, mocker):
|
||||||
"""An unknown extension (e.g. .docx) falls through to the document ETL path."""
|
"""An allowlisted document extension (.docx) routes to the document ETL path."""
|
||||||
docx_file = tmp_path / "doc.docx"
|
docx_file = tmp_path / "doc.docx"
|
||||||
docx_file.write_bytes(b"PK fake docx")
|
docx_file.write_bytes(b"PK fake docx")
|
||||||
|
|
||||||
|
|
@ -307,3 +307,73 @@ async def test_unknown_etl_service_raises(tmp_path, mocker):
|
||||||
await EtlPipelineService().extract(
|
await EtlPipelineService().extract(
|
||||||
EtlRequest(file_path=str(pdf_file), filename="report.pdf")
|
EtlRequest(file_path=str(pdf_file), filename="report.pdf")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Slice 13 – unsupported file types are rejected before reaching any parser
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_unknown_extension_classified_as_unsupported():
    """Allowlist behaviour: an unrecognised extension maps to UNSUPPORTED."""
    from app.etl_pipeline.file_classifier import FileCategory, classify_file

    category = classify_file("random.xyz")
    assert category == FileCategory.UNSUPPORTED
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "filename",
    [
        "malware.exe",
        "archive.zip",
        "video.mov",
        "font.woff2",
        "model.blend",
        "data.parquet",
        "package.deb",
        "firmware.bin",
    ],
)
def test_unsupported_extensions_classified_correctly(filename):
    """Each listed name, absent from every allowlist, maps to UNSUPPORTED."""
    from app.etl_pipeline.file_classifier import FileCategory, classify_file

    category = classify_file(filename)
    assert category == FileCategory.UNSUPPORTED
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "filename,expected",
    [
        ("report.pdf", "document"),
        ("doc.docx", "document"),
        ("slides.pptx", "document"),
        ("sheet.xlsx", "document"),
        ("photo.png", "document"),
        ("photo.jpg", "document"),
        ("book.epub", "document"),
        ("letter.odt", "document"),
        ("readme.md", "plaintext"),
        ("data.csv", "direct_convert"),
    ],
)
def test_parseable_extensions_classified_correctly(filename, expected):
    """Each parseable name maps to the expected, non-UNSUPPORTED category."""
    from app.etl_pipeline.file_classifier import FileCategory, classify_file

    category = classify_file(filename)
    # Checked separately so a regression to UNSUPPORTED is reported explicitly.
    assert category != FileCategory.UNSUPPORTED
    assert category.value == expected
|
||||||
|
|
||||||
|
|
||||||
|
async def test_extract_unsupported_file_raises_error(tmp_path):
    """``extract()`` on a ``.exe`` file raises ``EtlUnsupportedFileError``."""
    from app.etl_pipeline.exceptions import EtlUnsupportedFileError

    exe_path = tmp_path / "program.exe"
    exe_path.write_bytes(b"\x00" * 10)

    with pytest.raises(EtlUnsupportedFileError, match="not supported"):
        service = EtlPipelineService()
        request = EtlRequest(file_path=str(exe_path), filename="program.exe")
        await service.extract(request)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_extract_zip_raises_unsupported_error(tmp_path):
    """``extract()`` on a ``.zip`` archive raises ``EtlUnsupportedFileError``."""
    from app.etl_pipeline.exceptions import EtlUnsupportedFileError

    zip_path = tmp_path / "archive.zip"
    zip_path.write_bytes(b"PK\x03\x04")

    with pytest.raises(EtlUnsupportedFileError, match="not supported"):
        service = EtlPipelineService()
        request = EtlRequest(file_path=str(zip_path), filename="archive.zip")
        await service.extract(request)
|
||||||
|
|
|
||||||
0
surfsense_backend/tests/unit/utils/__init__.py
Normal file
0
surfsense_backend/tests/unit/utils/__init__.py
Normal file
42
surfsense_backend/tests/unit/utils/test_file_extensions.py
Normal file
42
surfsense_backend/tests/unit/utils/test_file_extensions.py
Normal file
|
|
@ -0,0 +1,42 @@
|
||||||
|
"""Tests for the DOCUMENT_EXTENSIONS allowlist module."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.unit
|
||||||
|
|
||||||
|
|
||||||
|
def test_pdf_is_supported_document():
    """A ``.pdf`` name is reported as a supported document."""
    from app.utils.file_extensions import is_supported_document_extension

    supported = is_supported_document_extension("report.pdf")
    assert supported is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_exe_is_not_supported_document():
    """A ``.exe`` name is reported as not being a supported document."""
    from app.utils.file_extensions import is_supported_document_extension

    supported = is_supported_document_extension("malware.exe")
    assert supported is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "filename",
    [
        "report.pdf",
        "doc.docx",
        "old.doc",
        "sheet.xlsx",
        "legacy.xls",
        "slides.pptx",
        "deck.ppt",
        "photo.png",
        "photo.jpg",
        "photo.jpeg",
        "scan.bmp",
        "scan.tiff",
        "scan.tif",
        "manual.rtf",
        "book.epub",
        "letter.odt",
        "data.ods",
        "presentation.odp",
        "korean.hwpx",
    ],
)
def test_document_extensions_are_supported(filename):
    """Each listed document name is reported as supported."""
    from app.utils.file_extensions import is_supported_document_extension

    supported = is_supported_document_extension(filename)
    assert supported is True, f"{filename} should be supported"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "filename",
    [
        "malware.exe",
        "archive.zip",
        "video.mov",
        "font.woff2",
        "model.blend",
        "random.xyz",
        "data.parquet",
        "package.deb",
    ],
)
def test_non_document_extensions_are_not_supported(filename):
    """Each listed non-document name is reported as unsupported."""
    from app.utils.file_extensions import is_supported_document_extension

    supported = is_supported_document_extension(filename)
    assert supported is False, f"{filename} should NOT be supported"
|
||||||
Loading…
Add table
Add a link
Reference in a new issue