Jakub Miazek ee1d241d23 format
2023-04-10 21:42:13 +02:00

81 lines
2.2 KiB
Python

from typing import Any
from asyncpg import UniqueViolationError
from fastapi import HTTPException, status
from sqlalchemy.exc import SQLAlchemyError, IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.declarative import as_declarative, declared_attr
@as_declarative()
class BaseReadOnly:
    """Declarative base for models that expose no persistence helpers.

    Subclasses get a ``__tablename__`` derived from their class name unless
    they define one themselves.
    """

    id: Any
    __name__: str

    # Derive the table name from the class name.
    @declared_attr
    def __tablename__(cls) -> str:
        """Return the lower-cased class name as the table name."""
        class_name: str = cls.__name__
        return class_name.lower()
@as_declarative()
class Base:
    """Declarative base for models, with async persistence helpers.

    Subclasses get a ``__tablename__`` derived from their class name unless
    they define one themselves. The helpers report database failures as an
    ``HTTPException`` with status 422.
    """

    id: Any
    __name__: str

    # Generate __tablename__ automatically
    @declared_attr
    def __tablename__(cls) -> str:
        """Return the lower-cased class name as the table name."""
        return cls.__name__.lower()

    async def save(self, db_session: AsyncSession):
        """Add this instance to the session and commit.

        :param db_session: session used to persist the instance
        :return: the result of ``db_session.commit()``
        :raises HTTPException: 422 when SQLAlchemy raises an error
        """
        try:
            db_session.add(self)
            return await db_session.commit()
        except SQLAlchemyError as ex:
            # Reset the transaction so a caller that handles the
            # HTTPException can keep using the same session.
            await db_session.rollback()
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=repr(ex),
            ) from ex

    async def delete(self, db_session: AsyncSession):
        """Delete this instance through the session and commit.

        :param db_session: session used to delete the instance
        :return: ``True`` when the commit succeeded
        :raises HTTPException: 422 when SQLAlchemy raises an error
        """
        try:
            await db_session.delete(self)
            await db_session.commit()
            return True
        except SQLAlchemyError as ex:
            # Reset the transaction so a caller that handles the
            # HTTPException can keep using the same session.
            await db_session.rollback()
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=repr(ex),
            ) from ex

    async def update(self, db_session: AsyncSession, **kwargs):
        """Set the given attributes on this instance and save it.

        :param db_session: session used to persist the instance
        :param kwargs: attribute names mapped to their new values
        :return: ``None``
        :raises HTTPException: 422 when saving fails (see ``save``)
        """
        for k, v in kwargs.items():
            setattr(self, k, v)
        await self.save(db_session)

    async def save_or_update(self, db: AsyncSession):
        """Insert this instance, or merge it into the session on a unique conflict.

        The session is closed before this method returns or raises.

        :param db: session used to persist the instance
        :return: the result of ``db.commit()`` when the insert succeeded,
            otherwise the instance returned by ``db.merge()``
        :raises HTTPException: 422 when the insert fails with an integrity
            error that is not a unique violation, or when the merge fails
        """
        try:
            db.add(self)
            return await db.commit()
        except IntegrityError as exception:
            driver_error = exception.orig
            # SQLAlchemy's asyncpg dialect wraps the driver exception in its
            # own DBAPI-style error and chains the asyncpg exception as
            # ``__cause__``, so check the wrapper and its cause.
            unique_conflict = isinstance(
                driver_error, UniqueViolationError
            ) or isinstance(
                getattr(driver_error, "__cause__", None), UniqueViolationError
            )
            if not unique_conflict:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail=repr(exception),
                ) from exception
            try:
                # Start from a clean transaction, then commit the merged
                # state; without the commit the close() below discards it.
                await db.rollback()
                merged = await db.merge(self)
                await db.commit()
                return merged
            except SQLAlchemyError as merge_error:
                await db.rollback()
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail=repr(merge_error),
                ) from merge_error
        finally:
            await db.close()