Jakub Miazek 9b1938c450 refactor
2022-09-12 13:24:15 +02:00

38 lines
990 B
Python

from typing import AsyncGenerator
from fastapi import HTTPException
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from app import config
# Load the application settings once and read the database URL from them.
global_settings = config.get_settings()
url = global_settings.asyncpg_url

# Async engine shared by the whole module; echo=True turns on SQLAlchemy's
# engine logging.
engine = create_async_engine(url, echo=True, future=True)

# Session factory bound to the engine. With expire_on_commit=False,
# attributes are not expired after a commit.
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
# Dependency
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session and finalize its transaction afterwards.

    A session is created from ``async_session`` and yielded to the consumer.
    When the consumer resumes the generator without an error, the session is
    committed. If a ``SQLAlchemyError`` or ``HTTPException`` reaches the
    generator (thrown in by the consumer or raised by the commit itself),
    the session is rolled back and the same exception is re-raised.

    Yields:
        The ``AsyncSession`` to use for the duration of the consumer's work.

    Raises:
        SQLAlchemyError: Re-raised after rolling back the session.
        HTTPException: Re-raised after rolling back the session.
    """
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except (SQLAlchemyError, HTTPException):
            # Both error kinds get the same treatment: undo pending work,
            # then let the original exception propagate unchanged.
            await session.rollback()
            raise
        # No explicit close() is needed here: leaving the ``async with``
        # block closes the session on every exit path.