mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-06 20:15:17 +02:00
feat: add pure notification metadata transitions
This commit is contained in:
parent
c23bdc4a5e
commit
d53707ebbf
1 changed files with 33 additions and 0 deletions
33
surfsense_backend/app/notifications/service/metadata.py
Normal file
33
surfsense_backend/app/notifications/service/metadata.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
"""Pure metadata transitions for the notification lifecycle."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
|
||||
def start_metadata(
|
||||
operation_id: str, initial_metadata: dict[str, Any] | None = None
|
||||
) -> dict[str, Any]:
|
||||
"""Seed metadata for a freshly opened, in-progress notification."""
|
||||
metadata = dict(initial_metadata or {})
|
||||
metadata["operation_id"] = operation_id
|
||||
metadata["status"] = "in_progress"
|
||||
metadata["started_at"] = datetime.now(UTC).isoformat()
|
||||
return metadata
|
||||
|
||||
|
||||
def apply_update(
|
||||
current: dict[str, Any],
|
||||
status: str | None = None,
|
||||
metadata_updates: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Return metadata with the status/timestamp stamped and updates merged in."""
|
||||
metadata = dict(current)
|
||||
if status is not None:
|
||||
metadata["status"] = status
|
||||
if status in ("completed", "failed"):
|
||||
metadata["completed_at"] = datetime.now(UTC).isoformat()
|
||||
if metadata_updates:
|
||||
metadata = {**metadata, **metadata_updates}
|
||||
return metadata
|
||||
Loading…
Add table
Add a link
Reference in a new issue