From cdf2720e57e020573f0843215af5bac63c7ee3af Mon Sep 17 00:00:00 2001 From: Dmitry Afanasyev Date: Sat, 20 Aug 2022 03:07:50 +0300 Subject: [PATCH] add async queue --- app/main.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/app/main.py b/app/main.py index 4a2437b..7f601a2 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,6 @@ import asyncio import sys +from concurrent.futures import ThreadPoolExecutor from http import HTTPStatus from pathlib import Path @@ -22,6 +23,8 @@ from app.settings import ( WEBHOOK_URL, ) +executor = ThreadPoolExecutor(max_workers=1) + queue: asyncio.Queue = asyncio.Queue() # type: ignore @@ -30,7 +33,8 @@ async def bot_startup() -> None: logger.info(f'Webhook set to {WEBHOOK_URL}'.replace(API_TOKEN, '{BOT_API_TOKEN}')) asyncio_schedule() logger.info('Scheduler started') - await worker() + loop = asyncio.get_running_loop() + await loop.run_in_executor(executor, worker, queue) # type: ignore logger.info('Worker started') @@ -94,14 +98,14 @@ async def webhook(request: web.Request) -> web.Response: return web.Response(status=HTTPStatus.ACCEPTED) -async def worker() -> None: +async def worker(shared_queue: asyncio.Queue) -> None: # type: ignore Dispatcher.set_current(dispatcher) Bot.set_current(dispatcher.bot) await asyncio.sleep(3) logger.info('Worker is working') while True: await asyncio.sleep(4) - update = await queue.get() + update = await shared_queue.get() logger.warning(f"Get update {update}") await dispatcher.process_update(update)