2025-07-26 19:25:46 +02:00

47 lines
1.5 KiB
Python

from datetime import datetime
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger
from attrs import define
from sqlalchemy import text
from starlette.types import ASGIApp, Receive, Scope, Send
from app.database import AsyncSessionFactory
from app.utils.logging import AppLogger
logger = AppLogger().get_logger()
async def tick():
async with AsyncSessionFactory() as session:
stmt = text("select 1;")
await logger.ainfo(f">>>> Be or not to be...{datetime.now()}")
result = await session.execute(stmt)
await logger.ainfo(f">>>> Result: {result.scalar()}")
return True
@define
class SchedulerMiddleware:
app: ASGIApp
scheduler: AsyncScheduler
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
Handles the incoming request and schedules tasks if the scope type is 'lifespan'.
Args:
scope (Scope): The ASGI scope dictionary containing request information.
receive (Receive): The ASGI receive callable.
send (Send): The ASGI send callable.
"""
if scope["type"] == "lifespan":
async with self.scheduler:
await self.scheduler.add_schedule(
tick, IntervalTrigger(seconds=25), id="tick-sql-25"
)
await self.scheduler.start_in_background()
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)