from collections.abc import AsyncGenerator from http.client import HTTPException from fastapi.encoders import jsonable_encoder from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from app import config from app.utils import get_logger logger = get_logger(__name__) global_settings = config.get_settings() url = global_settings.asyncpg_url engine = create_async_engine( url, future=True, echo=True, json_serializer=jsonable_encoder, ) # expire_on_commit=False will prevent attributes from being expired # after commit. AsyncSessionFactory = sessionmaker(engine, autoflush=False, expire_on_commit=False, class_=AsyncSession) # Dependency async def get_db() -> AsyncGenerator: async with AsyncSessionFactory() as session: logger.debug(f"ASYNC Pool: {engine.pool.status()}") yield session async def get_async_db() -> AsyncGenerator: try: session: AsyncSession = AsyncSessionFactory() logger.debug(f"ASYNC Pool: {engine.pool.status()}") yield session except SQLAlchemyError as sql_ex: await session.rollback() raise sql_ex except HTTPException as http_ex: await session.rollback() raise http_ex else: await session.commit() finally: await session.close()