From 23049552121a4e95f68d3fda92c46de2ec62daab Mon Sep 17 00:00:00 2001 From: Dmitry Afanasyev Date: Sun, 28 Aug 2022 04:07:53 +0300 Subject: [PATCH] reworked application and scheduler to classes --- app/core/application.py | 65 +++++++++++++++++++++++++++ app/core/routes.py | 34 +++++++++++++++ app/core/scheduler.py | 65 ++++++++++++++++++++++----- app/main.py | 97 ++++------------------------------------- app/settings.py | 2 +- 5 files changed, 161 insertions(+), 102 deletions(-) create mode 100644 app/core/application.py create mode 100644 app/core/routes.py diff --git a/app/core/application.py b/app/core/application.py new file mode 100644 index 0000000..8d8f190 --- /dev/null +++ b/app/core/application.py @@ -0,0 +1,65 @@ +import asyncio +from dataclasses import dataclass + +from aiogram import Dispatcher +from aiogram.utils.executor import start_polling +from aiohttp import web +from app.core.bot import bot, dispatcher +from app.core.routes import Handler +from app.core.scheduler import BotScheduler, bot_scheduler +from app.core.utils import logger +from app.settings import TELEGRAM_API_TOKEN, WEBHOOK_PATH, WEBHOOK_URL + + +@dataclass +class Application: + handler: Handler = Handler() + scheduler: BotScheduler = bot_scheduler + + async def _on_startup(self, dp: Dispatcher) -> None: + logger.info("Start bot with webhook") + await bot.set_webhook(WEBHOOK_URL) + loop = asyncio.get_running_loop() + loop.create_task(self.handler.get_updates_from_queue()) + logger.info( + f'Webhook set to {WEBHOOK_URL}'.replace( + TELEGRAM_API_TOKEN, '{TELEGRAM_API_TOKEN}' + ) + ) + bot_scheduler.start() + + async def _on_shutdown(self, dp: Dispatcher) -> None: + logger.warning('Shutting down..') + + # Remove webhook (not acceptable in some cases) + await bot.delete_webhook() + + session = await bot.get_session() + if session and not session.closed: + await session.close() + await asyncio.sleep(0.2) + + logger.warning('Bye!') + + @staticmethod + def bot_polling() -> None: + logger.info("Start bot in polling mode") + start_polling( + dispatcher=dispatcher, + skip_updates=True, + ) + + def create_app(self) -> web.Application: + app = web.Application() + app.add_routes( + [ + web.get(f'{WEBHOOK_PATH}/', self.handler.health_check), + web.post( + f'{WEBHOOK_PATH}/{TELEGRAM_API_TOKEN}', + self.handler.put_updates_on_queue, + ), + ] + ) + app.on_startup.append(self._on_startup) + app.on_shutdown.append(self._on_shutdown) + return app diff --git a/app/core/routes.py b/app/core/routes.py new file mode 100644 index 0000000..cfec7b3 --- /dev/null +++ b/app/core/routes.py @@ -0,0 +1,34 @@ +import asyncio +from http import HTTPStatus + +from aiogram.types import Update +from aiohttp import web +from app.core.bot import dispatcher + + +class Handler: + def __init__(self) -> None: + self.queue: asyncio.Queue = asyncio.Queue() # type: ignore + + @staticmethod + async def health_check(request: web.Request) -> web.Response: + return web.Response(text='Health OK', status=HTTPStatus.OK) + + async def put_updates_on_queue(self, request: web.Request) -> web.Response: + """ + Listen {WEBHOOK_PATH} and proxy post request to bot + + :param request: + :return: + """ + data = await request.json() + tg_update = Update(**data) + self.queue.put_nowait(tg_update) + + return web.Response(status=HTTPStatus.ACCEPTED) + + async def get_updates_from_queue(self) -> None: + while True: + update = await self.queue.get() + await dispatcher.process_update(update) + await asyncio.sleep(0.1) diff --git a/app/core/scheduler.py b/app/core/scheduler.py index 12b4c51..e6d7b61 100644 --- a/app/core/scheduler.py +++ b/app/core/scheduler.py @@ -1,11 +1,34 @@ +from typing import Any + from app.core.bot import morning_bus_mailing +from app.core.utils import logger +from apscheduler.schedulers.asyncio import AsyncIOScheduler -cron_jobs = [ - {'trigger': 'cron', 'day_of_week': 'mon-fri', 'hour': 8, 'minute': 59, 'second': 0}, - {'trigger': 'cron', 'day_of_week': 'mon-fri', 'hour': 9, 'minute': 4, 'second': 0}, - {'trigger': 'cron', 'day_of_week': 'mon-fri', 'hour': 9, 'minute': 9, 'second': 0}, -] - +bot_cron_jobs = { + 'morning_home->work_bus': [ + { + 'trigger': 'cron', + 'day_of_week': 'mon-fri', + 'hour': 8, + 'minute': 59, + 'second': 0, + }, + { + 'trigger': 'cron', + 'day_of_week': 'mon-fri', + 'hour': 9, + 'minute': 4, + 'second': 0, + }, + { + 'trigger': 'cron', + 'day_of_week': 'mon-fri', + 'hour': 9, + 'minute': 9, + 'second': 0, + }, + ] +} user_chat_ids = { 'chat_ids': [ 417070387, # me @@ -14,10 +37,28 @@ user_chat_ids = { } -def asyncio_schedule() -> None: - from apscheduler.schedulers.asyncio import AsyncIOScheduler +class BotScheduler: + def __init__( + self, + cron_jobs: dict[str, list[dict[str, Any]]], + chat_ids: dict[str, list[int]] | None = None, + ): + self.cron_jobs = cron_jobs + self.chat_ids = chat_ids + self.scheduler = AsyncIOScheduler() - scheduler = AsyncIOScheduler() - for cron in cron_jobs: - scheduler.add_job(morning_bus_mailing, kwargs=user_chat_ids, **cron) - scheduler.start() + def add_scheduler_jobs(self, jobs_name: str) -> None: + cron_jobs = self.cron_jobs.get(jobs_name) + if not cron_jobs: + return None + for cron in cron_jobs: + self.scheduler.add_job(morning_bus_mailing, kwargs=user_chat_ids, **cron) + logger.info(f'Added scheduled job {cron}') + + def start(self) -> None: + self.scheduler.start() + logger.info('Scheduler started') + + +bot_scheduler = BotScheduler(cron_jobs=bot_cron_jobs, chat_ids=user_chat_ids) +bot_scheduler.add_scheduler_jobs(jobs_name='morning_home->work_bus') diff --git a/app/main.py b/app/main.py index 2b259c8..dc95515 100644 --- a/app/main.py +++ b/app/main.py @@ -1,106 +1,25 @@ -import asyncio import sys -from http import HTTPStatus from pathlib import Path -from aiogram import Dispatcher -from aiogram.types import Update -from aiogram.utils.executor import start_polling from aiohttp import web sys.path.append(str(Path(__file__).parent.parent)) -from app.core.bot import bot, dispatcher -from app.core.scheduler import asyncio_schedule -from app.core.utils import logger -from app.settings import ( - START_WITH_WEBHOOK, - TELEGRAM_API_TOKEN, - WEBAPP_HOST, - WEBAPP_PORT, - WEBHOOK_PATH, - WEBHOOK_URL, -) - -queue: asyncio.Queue = asyncio.Queue() # type: ignore - - -async def on_startup(dp: Dispatcher) -> None: - logger.info("Start bot with webhook") - await bot.set_webhook(WEBHOOK_URL) - loop = asyncio.get_running_loop() - loop.create_task(get_updates_from_queue()) - logger.info( - f'Webhook set to {WEBHOOK_URL}'.replace( - TELEGRAM_API_TOKEN, '{TELEGRAM_API_TOKEN}' - ) - ) - asyncio_schedule() - - -async def on_shutdown(dp: Dispatcher) -> None: - logger.warning('Shutting down..') - - # Remove webhook (not acceptable in some cases) - await bot.delete_webhook() - - session = await bot.get_session() - if session and not session.closed: - await session.close() - await asyncio.sleep(0.2) - - logger.warning('Bye!') - - -def bot_polling() -> None: - logger.info("Start bot in polling mode") - start_polling( - dispatcher=dispatcher, - skip_updates=True, - ) - - -async def put_updates_on_queue(request: web.Request) -> web.Response: - """ - Listen {WEBHOOK_PATH} and proxy post request to bot - - :param request: - :return: - """ - data = await request.json() - tg_update = Update(**data) - queue.put_nowait(tg_update) - - return web.Response(status=HTTPStatus.ACCEPTED) - - -async def health_check(request: web.Request) -> web.Response: - return web.Response(text='Health OK', status=HTTPStatus.OK) - - -async def get_updates_from_queue() -> None: - - while True: - update = await queue.get() - await dispatcher.process_update(update) - await asyncio.sleep(0.1) +from app.settings import START_WITH_WEBHOOK, WEBAPP_HOST, WEBAPP_PORT +from core.application import Application async def create_app() -> web.Application: - application = web.Application() - application.router.add_get(f'{WEBHOOK_PATH}/', health_check) - application.router.add_post( - f'{WEBHOOK_PATH}/{TELEGRAM_API_TOKEN}', put_updates_on_queue - ) - application.on_startup.append(on_startup) - application.on_shutdown.append(on_shutdown) - return application + application = Application() + return application.create_app() if __name__ == '__main__': + application = Application() + app = application.create_app() + if START_WITH_WEBHOOK: - app = create_app() web.run_app(app=app, host=WEBAPP_HOST, port=WEBAPP_PORT) else: - bot_polling() + application.bot_polling() diff --git a/app/settings.py b/app/settings.py index ac17604..b839a05 100644 --- a/app/settings.py +++ b/app/settings.py @@ -30,4 +30,4 @@ WEBAPP_PORT = config('WEBAPP_PORT', cast=int, default=8084) START_WITH_WEBHOOK = config('START_WITH_WEBHOOK', cast=bool, default=False) -DRIVER_SESSION_TTL = 28 # driver cache ttl session in seconds +DRIVER_SESSION_TTL = 28 # selenium driver session cache ttl in seconds