mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
96 lines
3.6 KiB
Python
96 lines
3.6 KiB
Python
from typing import Any, Optional
|
|
|
|
from sqlalchemy.future import select
|
|
|
|
from api.db.base_client import BaseDBClient
|
|
from api.db.models import OrganizationConfigurationModel
|
|
|
|
|
|
class OrganizationConfigurationClient(BaseDBClient):
|
|
async def get_configuration(
|
|
self, organization_id: int, key: str
|
|
) -> Optional[OrganizationConfigurationModel]:
|
|
"""Get a specific configuration for an organization by key."""
|
|
async with self.async_session() as session:
|
|
result = await session.execute(
|
|
select(OrganizationConfigurationModel).where(
|
|
OrganizationConfigurationModel.organization_id == organization_id,
|
|
OrganizationConfigurationModel.key == key,
|
|
)
|
|
)
|
|
return result.scalars().first()
|
|
|
|
async def get_all_configurations(
|
|
self, organization_id: int
|
|
) -> list[OrganizationConfigurationModel]:
|
|
"""Get all configurations for an organization."""
|
|
async with self.async_session() as session:
|
|
result = await session.execute(
|
|
select(OrganizationConfigurationModel).where(
|
|
OrganizationConfigurationModel.organization_id == organization_id
|
|
)
|
|
)
|
|
return result.scalars().all()
|
|
|
|
async def upsert_configuration(
|
|
self, organization_id: int, key: str, value: Any
|
|
) -> OrganizationConfigurationModel:
|
|
"""Create or update a configuration for an organization."""
|
|
async with self.async_session() as session:
|
|
# First try to get existing configuration
|
|
result = await session.execute(
|
|
select(OrganizationConfigurationModel).where(
|
|
OrganizationConfigurationModel.organization_id == organization_id,
|
|
OrganizationConfigurationModel.key == key,
|
|
)
|
|
)
|
|
config = result.scalars().first()
|
|
|
|
if config:
|
|
# Update existing configuration
|
|
config.value = value
|
|
else:
|
|
# Create new configuration
|
|
config = OrganizationConfigurationModel(
|
|
organization_id=organization_id,
|
|
key=key,
|
|
value=value,
|
|
)
|
|
session.add(config)
|
|
|
|
try:
|
|
await session.commit()
|
|
except Exception as e:
|
|
await session.rollback()
|
|
raise e
|
|
await session.refresh(config)
|
|
return config
|
|
|
|
async def delete_configuration(self, organization_id: int, key: str) -> bool:
|
|
"""Delete a configuration for an organization."""
|
|
async with self.async_session() as session:
|
|
result = await session.execute(
|
|
select(OrganizationConfigurationModel).where(
|
|
OrganizationConfigurationModel.organization_id == organization_id,
|
|
OrganizationConfigurationModel.key == key,
|
|
)
|
|
)
|
|
config = result.scalars().first()
|
|
|
|
if not config:
|
|
return False
|
|
|
|
await session.delete(config)
|
|
try:
|
|
await session.commit()
|
|
except Exception as e:
|
|
await session.rollback()
|
|
raise e
|
|
return True
|
|
|
|
async def get_configuration_value(
|
|
self, organization_id: int, key: str, default: Any = None
|
|
) -> Any:
|
|
"""Get the value of a configuration, returning default if not found."""
|
|
config = await self.get_configuration(organization_id, key)
|
|
return config.value if config else default
|