queue not working

This commit is contained in:
2023-05-10 04:21:36 +03:00
parent 4d065aa93f
commit 8b2fb805b5
9 changed files with 414 additions and 123 deletions

View File

@@ -1,7 +1,9 @@
from fastapi import APIRouter
from fastapi import APIRouter, Request
from fastapi.responses import ORJSONResponse
from starlette import status
from settings.config import get_settings
router = APIRouter()
@@ -13,3 +15,16 @@ router = APIRouter()
)
async def healthcheck() -> ORJSONResponse:
return ORJSONResponse(content=None, status_code=status.HTTP_200_OK)
@router.post(
f"/{get_settings().bot_webhook_url}",
name="system:process_bot_updates",
status_code=status.HTTP_202_ACCEPTED,
summary="process bot updates",
)
async def process_bot_updates(request: Request) -> ORJSONResponse:
await request.app.state.queue.put_updates_on_queue(request)
data = await request.app.state.queue.get_updates_from_queue()
print(data)
return ORJSONResponse(content=None, status_code=status.HTTP_202_ACCEPTED)

1
app/constants.py Normal file
View File

@@ -0,0 +1 @@
API_PREFIX = '/api'

0
app/core/__init__.py Normal file
View File

79
app/core/bot.py Normal file
View File

@@ -0,0 +1,79 @@
import asyncio
import os
from asyncio import Queue, sleep
from dataclasses import dataclass
from functools import cached_property
from http import HTTPStatus
from fastapi import Request, Response
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
from app.constants import API_PREFIX
from settings.config import Settings
class Bot:
def __init__(self, settings: Settings) -> None:
self.application: Application = ( # type: ignore
Application.builder().token(token=settings.TELEGRAM_API_TOKEN).build()
)
self.add_handlers()
self.settings = settings
async def set_webhook(self) -> None:
await self.application.initialize()
# await self.application.start()
await self.application.bot.set_webhook(url=self.webhook_url)
async def delete_webhook(self) -> None:
await self.application.bot.delete_webhook()
@staticmethod
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued."""
if update.message:
await update.message.reply_text('Help!')
return None
def add_handlers(self) -> None:
self.application.add_handler(CommandHandler('help', self.help_command))
async def polling(self) -> None:
await self.application.initialize()
await self.application.start()
await self.application.updater.start_polling() # type: ignore
async def shutdown(self) -> None:
await self.application.updater.shutdown() # type: ignore
@cached_property
def webhook_url(self) -> str:
return os.path.join(
self.settings.WEBHOOK_HOST.strip('/'),
API_PREFIX.strip('/'),
self.settings.WEBHOOK_PATH.strip('/'),
self.settings.TELEGRAM_API_TOKEN.strip('/'),
)
@dataclass
class BotQueue:
bot: Bot
queue: Queue = asyncio.Queue() # type: ignore
async def put_updates_on_queue(self, request: Request) -> Response:
"""
Listen {WEBHOOK_PATH}/{TELEGRAM_WEB_TOKEN} path and proxy post request to bot
"""
data = await request.json()
tg_update = Update.de_json(data=data, bot=self.bot.application.bot)
self.queue.put_nowait(tg_update)
return Response(status_code=HTTPStatus.ACCEPTED)
async def get_updates_from_queue(self) -> None:
while True:
update = await self.queue.get()
await self.bot.application.process_update(update)
await sleep(0.1)

View File

@@ -1,9 +1,22 @@
import sys
from functools import cached_property
from fastapi import FastAPI
from fastapi.responses import UJSONResponse
from loguru import logger
from app.core.bot import Bot, BotQueue
from app.routers import api_router
from settings.config import Settings, get_settings
logger.remove()
logger.add(
sink=sys.stdout,
colorize=True,
level='DEBUG',
format="<cyan>{time:DD.MM.YYYY HH:mm:ss}</cyan> | <level>{level}</level> | <magenta>{message}</magenta>",
)
class Application:
def __init__(self, settings: Settings) -> None:
@@ -19,17 +32,26 @@ class Application:
self.app.state.settings = settings
self.settings = settings
self.app.include_router(api_router)
self.configure_hooks()
@property
def fastapi_app(self) -> FastAPI:
return self.app
def configure_hooks(self) -> None:
self.app.add_event_handler("startup", self.connect_databases) # type: ignore
self.app.add_event_handler("startup", self.create_redis_cluster) # type: ignore
@cached_property
def bot(self) -> Bot:
return Bot(self.settings)
self.app.add_event_handler("shutdown", self.disconnect_databases) # type: ignore
self.app.add_event_handler("shutdown", self.close_redis_cluster) # type: ignore
def set_bot_queue(self) -> None:
self.app.state.queue = BotQueue(bot=self.bot)
def configure_hooks(self) -> None:
self.app.add_event_handler("startup", self.bot.set_webhook)
# self.app.add_event_handler("startup", self.bot.polling) # noqa: E800
self.app.add_event_handler("startup", self.set_bot_queue)
self.app.add_event_handler("shutdown", self.bot.delete_webhook)
self.app.add_event_handler("shutdown", self.bot.shutdown)
def create_app(settings: Settings | None = None) -> FastAPI:
@@ -49,7 +71,7 @@ def main() -> None:
workers=app.state.settings.WORKERS_COUNT,
host=app.state.settings.APP_HOST,
port=app.state.settings.APP_PORT,
reload=app.state.settings.RELOAD,
# reload=app.state.settings.RELOAD, # noqa: E800 remove reload for debug
factory=True,
)

View File

@@ -2,8 +2,9 @@ from fastapi import APIRouter
from fastapi.responses import ORJSONResponse
from app.api.system.controllers import router as system_router
from app.constants import API_PREFIX
api_router = APIRouter(prefix='/api', default_response_class=ORJSONResponse)
api_router = APIRouter(prefix=API_PREFIX, default_response_class=ORJSONResponse)
api_router.include_router(system_router, tags=['system'])