Files
fastapi-sqlalchemy-asyncpg/app/services/scheduler.py
2025-11-14 10:36:34 +01:00

47 lines
1.5 KiB
Python

from datetime import datetime
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger
from attrs import define
from rotoger import get_logger
from sqlalchemy import text
from starlette.types import ASGIApp, Receive, Scope, Send
from app.database import AsyncSessionFactory
logger = get_logger()
async def tick():
async with AsyncSessionFactory() as session:
stmt = text("select 1;")
await logger.ainfo(f">>>> Be or not to be...{datetime.now()}")
result = await session.execute(stmt)
await logger.ainfo(f">>>> Result: {result.scalar()}")
return True
@define
class SchedulerMiddleware:
app: ASGIApp
scheduler: AsyncScheduler
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
Handles the incoming request and schedules tasks if the scope type is 'lifespan'.
Args:
scope (Scope): The ASGI scope dictionary containing request information.
receive (Receive): The ASGI receive callable.
send (Send): The ASGI send callable.
"""
if scope["type"] == "lifespan":
async with self.scheduler:
await self.scheduler.add_schedule(
tick, IntervalTrigger(seconds=25), id="tick-sql-25"
)
await self.scheduler.start_in_background()
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)