move handlers to separate files

This commit is contained in:
Dmitry Afanasyev
2023-09-22 22:59:52 +03:00
committed by GitHub
parent 1ecf95631d
commit 315284fc38
8 changed files with 161 additions and 128 deletions

View File

@@ -4,50 +4,45 @@ from asyncio import Queue, sleep
from dataclasses import dataclass
from functools import cached_property
from http import HTTPStatus
from typing import Any
from fastapi import Request, Response
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
from telegram.ext import Application
from app.core.utils import logger
from settings.config import AppSettings
class BotApplication:
def __init__(self, settings: AppSettings) -> None:
self.application: Application = ( # type: ignore
def __init__(
self,
settings: AppSettings,
handlers: list[Any],
application: Application | None = None, # type: ignore[type-arg]
) -> None:
self.application: Application = application or ( # type: ignore
Application.builder().token(token=settings.TELEGRAM_API_TOKEN).build()
)
self.add_handlers()
self.handlers = handlers
self.settings = settings
self.start_with_webhook = settings.START_WITH_WEBHOOK
self._add_handlers()
async def set_webhook(self) -> None:
await self.application.initialize()
await self.application.bot.set_webhook(url=self.webhook_url)
logger.info('webhook is set')
async def delete_webhook(self) -> None:
await self.application.bot.delete_webhook()
@staticmethod
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued."""
if update.message:
await asyncio.sleep(10)
await update.message.reply_text(
"Help!",
disable_notification=True,
api_kwargs={"text": "Hello World"},
)
return None
def add_handlers(self) -> None:
self.application.add_handler(CommandHandler("help", self.help_command))
logger.info('webhook has been deleted')
async def polling(self) -> None:
await self.application.initialize()
await self.application.start()
await self.application.updater.start_polling() # type: ignore
logger.info("bot started in polling mode")
async def shutdown(self) -> None:
await self.application.updater.shutdown() # type: ignore
@@ -56,6 +51,10 @@ class BotApplication:
def webhook_url(self) -> str:
return os.path.join(self.settings.DOMAIN.strip("/"), self.settings.bot_webhook_url.strip("/"))
def _add_handlers(self) -> None:
for handler in self.handlers:
self.application.add_handler(handler)
@dataclass
class BotQueue:

14
app/core/commands.py Normal file
View File

@@ -0,0 +1,14 @@
from telegram import Update
from telegram.ext import ContextTypes
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued."""
if update.message:
await update.message.reply_text(
"Help!",
disable_notification=True,
api_kwargs={"text": "Hello World"},
)
return None

20
app/core/handlers.py Normal file
View File

@@ -0,0 +1,20 @@
from dataclasses import dataclass, field
from typing import Any
from telegram.ext import CommandHandler
from app.core.commands import help_command
@dataclass
class CommandHandlers:
handlers: list[Any] = field(default_factory=list[Any])
def add_handler(self, handler: Any) -> None:
self.handlers.append(handler)
command_handlers = CommandHandlers()
command_handlers.add_handler(CommandHandler("help", help_command))

37
app/core/utils.py Normal file
View File

@@ -0,0 +1,37 @@
import sys
from datetime import datetime, timedelta
from functools import lru_cache, wraps
from typing import Any
from loguru import logger as loguru_logger
logger = loguru_logger
logger.remove()
logger.add(
sink=sys.stdout,
colorize=True,
level='DEBUG',
format="<cyan>{time:DD.MM.YYYY HH:mm:ss}</cyan> | <level>{level}</level> | <magenta>{message}</magenta>",
)
def timed_cache(**timedelta_kwargs: Any) -> Any:
def _wrapper(func: Any) -> Any:
update_delta = timedelta(**timedelta_kwargs)
next_update = datetime.utcnow() + update_delta
# Apply @lru_cache to f with no cache size limit
cached_func = lru_cache(None)(func)
@wraps(func)
def _wrapped(*args: Any, **kwargs: Any) -> Any:
nonlocal next_update
now = datetime.utcnow()
if now >= next_update:
cached_func.cache_clear()
next_update = now + update_delta
return cached_func(*args, **kwargs)
return _wrapped
return _wrapper

View File

@@ -1,23 +1,14 @@
import asyncio
import sys
from functools import cached_property
from fastapi import FastAPI
from fastapi.responses import UJSONResponse
from loguru import logger
from app.core.bot import BotApplication, BotQueue
from app.core.handlers import command_handlers
from app.routers import api_router
from settings.config import AppSettings, get_settings
logger.remove()
logger.add(
sink=sys.stdout,
colorize=True,
level="DEBUG",
format="<cyan>{time:DD.MM.YYYY HH:mm:ss}</cyan> | <level>{level}</level> | <magenta>{message}</magenta>",
)
class Application:
def __init__(self, settings: AppSettings, bot_app: BotApplication) -> None:
@@ -60,7 +51,7 @@ class Application:
def create_app(settings: AppSettings | None = None) -> FastAPI:
settings = settings or get_settings()
bot_app = BotApplication(settings=settings)
bot_app = BotApplication(settings=settings, handlers=command_handlers.handlers)
return Application(settings=settings, bot_app=bot_app).fastapi_app