From 9aca4c2aeb58c90389bb86c47d792bfef0a7cdb6 Mon Sep 17 00:00:00 2001 From: Dmitry Afanasyev <71835315+Balshgit@users.noreply.github.com> Date: Tue, 26 Sep 2023 05:30:27 +0300 Subject: [PATCH] add speech to text command (#6) --- README.md | 9 ++- bot_microservice/api/bot/controllers.py | 6 +- bot_microservice/constants.py | 2 + bot_microservice/core/bot.py | 14 ++-- bot_microservice/core/commands.py | 67 +++++++++------ bot_microservice/core/logging.py | 9 ++- bot_microservice/core/utils.py | 81 +++++++++++++++++-- bot_microservice/main.py | 3 + deploy/Caddyfile | 2 +- deploy/Dockerfile | 12 ++- ...check_bot.service => chat_gpt_bot.service} | 0 scripts/start-bot.sh | 2 +- scripts/start-chat.sh | 11 --- 13 files changed, 150 insertions(+), 68 deletions(-) rename scripts/{healthcheck_bot.service => chat_gpt_bot.service} (100%) delete mode 100644 scripts/start-chat.sh diff --git a/README.md b/README.md index a18c616..7f9a944 100644 --- a/README.md +++ b/README.md @@ -46,16 +46,21 @@ methods: ## Chat: ```shell -cd chat_gpt_microservice +cd bot_microservice python3 run.py ``` ```bash -cd chat_gpt_microservice +cd bot_microservice poetry run uvicorn --host 0.0.0.0 --factory run:create_app --port 1338 --reload ``` +```bash +cd bot_microservice +gunicorn main:create_app --workers 10 --bind 0.0.0.0:8083 --worker-class uvicorn.workers.UvicornWorker --timeout 150 --max-requests 2000 --max-requests-jitter 400 +``` + ## Tests diff --git a/bot_microservice/api/bot/controllers.py b/bot_microservice/api/bot/controllers.py index 8c9e0d3..920771d 100644 --- a/bot_microservice/api/bot/controllers.py +++ b/bot_microservice/api/bot/controllers.py @@ -1,7 +1,7 @@ from fastapi import APIRouter, Request -from fastapi.responses import ORJSONResponse from settings.config import get_settings from starlette import status +from starlette.responses import Response router = APIRouter() settings = get_settings() @@ -10,10 +10,10 @@ settings = get_settings() @router.post( f"/{settings.TELEGRAM_API_TOKEN}", name="system:process_bot_updates", + response_class=Response, status_code=status.HTTP_202_ACCEPTED, summary="process bot updates", include_in_schema=False, ) -async def process_bot_updates(request: Request) -> ORJSONResponse: +async def process_bot_updates(request: Request) -> None: await request.app.state.queue.put_updates_on_queue(request) - return ORJSONResponse(content=None, status_code=status.HTTP_202_ACCEPTED) diff --git a/bot_microservice/constants.py b/bot_microservice/constants.py index 425fcb6..0b45aff 100644 --- a/bot_microservice/constants.py +++ b/bot_microservice/constants.py @@ -1,5 +1,7 @@ from enum import StrEnum +AUDIO_SEGMENT_DURATION = 120 * 1000 + API_PREFIX = "/api" CHAT_GPT_BASE_URL = "http://chat_service:1338/backend-api/v2/conversation" diff --git a/bot_microservice/core/bot.py b/bot_microservice/core/bot.py index 8544387..d54e60f 100644 --- a/bot_microservice/core/bot.py +++ b/bot_microservice/core/bot.py @@ -29,13 +29,15 @@ class BotApplication: self._add_handlers() async def set_webhook(self) -> None: - await self.application.initialize() - await self.application.bot.set_webhook(url=self.webhook_url) - logger.info('webhook is set') + _, webhook_info = await asyncio.gather(self.application.initialize(), self.application.bot.get_webhook_info()) + if not webhook_info.url: + await self.application.bot.set_webhook(url=self.webhook_url) + webhook_info = await self.application.bot.get_webhook_info() + logger.info("webhook is set", ip_address=webhook_info.ip_address) async def delete_webhook(self) -> None: - await self.application.bot.delete_webhook() - logger.info('webhook has been deleted') + if await self.application.bot.delete_webhook(): + logger.info("webhook has been deleted") async def polling(self) -> None: await self.application.initialize() @@ -73,5 +75,5 @@ class BotQueue: async def get_updates_from_queue(self) -> None: while True: update = await self.queue.get() - await self.bot_app.application.process_update(update) + asyncio.create_task(self.bot_app.application.process_update(update)) await sleep(0) diff --git a/bot_microservice/core/commands.py b/bot_microservice/core/commands.py index 4a9ccc4..7aa4e6b 100644 --- a/bot_microservice/core/commands.py +++ b/bot_microservice/core/commands.py @@ -1,10 +1,11 @@ +import asyncio import random import tempfile from uuid import uuid4 import httpx from constants import CHAT_GPT_BASE_URL -from core.utils import convert_file_to_wav +from core.utils import SpeechToTextService from httpx import AsyncClient, AsyncHTTPTransport from loguru import logger from telegram import Update @@ -14,19 +15,20 @@ from telegram.ext import ContextTypes async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Send a message when the command /help is issued.""" - if update.message: - await update.message.reply_text( - "Help!", - disable_notification=True, - api_kwargs={"text": "Hello World"}, - ) - return None + if not update.message: + return None + await update.message.reply_text( + "Help!", + disable_notification=True, + api_kwargs={"text": "Hello World"}, + ) async def ask_question(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await update.message.reply_text( # type: ignore[union-attr] - "Пожалуйста подождите, ответ в среднем занимает 10-15 секунд" - ) + if not update.message: + return None + + await update.message.reply_text("Пожалуйста подождите, ответ в среднем занимает 10-15 секунд") chat_gpt_request = { "conversation_id": str(uuid4()), @@ -39,36 +41,51 @@ async def ask_question(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No "conversation": [], "internet_access": False, "content_type": "text", - "parts": [{"content": update.message.text, "role": "user"}], # type: ignore[union-attr] + "parts": [{"content": update.message.text, "role": "user"}], }, }, } - transport = AsyncHTTPTransport(retries=1) - async with AsyncClient(transport=transport) as client: + transport = AsyncHTTPTransport(retries=3) + async with AsyncClient(transport=transport, timeout=50) as client: try: - response = await client.post(CHAT_GPT_BASE_URL, json=chat_gpt_request) + response = await client.post(CHAT_GPT_BASE_URL, json=chat_gpt_request, timeout=50) status = response.status_code if status != httpx.codes.OK: logger.info(f'got response status: {status} from chat api', data=chat_gpt_request) - await update.message.reply_text( # type: ignore[union-attr] + await update.message.reply_text( "Что-то пошло не так, попробуйте еще раз или обратитесь к администратору" ) return - data = response.json() - await update.message.reply_text(data) # type: ignore[union-attr] + await update.message.reply_text(response.text) except Exception as error: logger.error("error get data from chat api", error=error) - await update.message.reply_text("Вообще всё сломалось :(") # type: ignore[union-attr] + await update.message.reply_text("Вообще всё сломалось :(") async def voice_recognize(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await update.message.reply_text( # type: ignore[union-attr] - "Пожалуйста, ожидайте :)\nТрехминутная запись обрабатывается примерно 30 секунд" - ) - sound_bytes = await update.message.voice.get_file() # type: ignore[union-attr] - sound_bytes = await sound_bytes.download_as_bytearray() + if not update.message: + return None + await update.message.reply_text("Пожалуйста, ожидайте :)\nТрехминутная запись обрабатывается примерно 30 секунд") + if not update.message.voice: + return None + + sound_file = await update.message.voice.get_file() + sound_bytes = await sound_file.download_as_bytearray() with tempfile.NamedTemporaryFile(delete=False) as tmpfile: tmpfile.write(sound_bytes) - convert_file_to_wav(tmpfile.name) + + logger.info('file has been saved', filename=tmpfile.name) + + speech_to_text_service = SpeechToTextService(filename=tmpfile.name) + + speech_to_text_service.get_text_from_audio() + + part = 0 + while speech_to_text_service.text_parts or not speech_to_text_service.text_recognised: + if text := speech_to_text_service.text_parts.get(part): + speech_to_text_service.text_parts.pop(part) + await update.message.reply_text(text) + part += 1 + await asyncio.sleep(5) diff --git a/bot_microservice/core/logging.py b/bot_microservice/core/logging.py index a18d174..df8f4f5 100644 --- a/bot_microservice/core/logging.py +++ b/bot_microservice/core/logging.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any, cast from constants import LogLevelEnum from loguru import logger from sentry_sdk.integrations.logging import EventHandler +from settings.config import get_settings if TYPE_CHECKING: from loguru import Record @@ -13,6 +14,9 @@ else: Record = dict[str, Any] +settings = get_settings() + + class InterceptHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: # Get corresponding Loguru level if it exists @@ -29,7 +33,7 @@ class InterceptHandler(logging.Handler): logger.opt(depth=depth, exception=record.exc_info).log( level, - record.getMessage(), + record.getMessage().replace(settings.TELEGRAM_API_TOKEN, "TELEGRAM_API_TOKEN".center(24, '*')), ) @@ -97,6 +101,3 @@ def _text_formatter(record: Record) -> str: format_ += "{exception}\n" return format_ - - -configure_logging(level=LogLevelEnum.DEBUG, enable_json_logs=True, enable_sentry_logs=True) diff --git a/bot_microservice/core/utils.py b/bot_microservice/core/utils.py index d019d57..63090d8 100644 --- a/bot_microservice/core/utils.py +++ b/bot_microservice/core/utils.py @@ -1,9 +1,18 @@ +import os import subprocess # noqa +from concurrent.futures.thread import ThreadPoolExecutor from datetime import datetime, timedelta from functools import lru_cache, wraps from typing import Any +from constants import AUDIO_SEGMENT_DURATION from loguru import logger +from pydub import AudioSegment +from speech_recognition import ( + AudioFile, + Recognizer, + UnknownValueError as SpeechRecognizerError, +) def timed_cache(**timedelta_kwargs: Any) -> Any: @@ -27,13 +36,69 @@ def timed_cache(**timedelta_kwargs: Any) -> Any: return _wrapper -def convert_file_to_wav(filename: str) -> str: - new_filename = filename + '.wav' +class SpeechToTextService: + def __init__(self, filename: str) -> None: + self.executor = ThreadPoolExecutor() - cmd = ['ffmpeg', '-loglevel', 'quiet', '-i', filename, '-vn', new_filename] + self.filename = filename + self.recognizer = Recognizer() + self.recognizer.energy_threshold = 50 + self.text_parts: dict[int, str] = {} + self.text_recognised = False - try: - subprocess.run(args=cmd) # noqa: S603 - except Exception as error: - logger.error("cant convert voice: reason", error=error) - return new_filename + def get_text_from_audio(self) -> None: + self.executor.submit(self.worker) + + def worker(self) -> Any: + self._convert_file_to_wav() + self._convert_audio_to_text() + + def _convert_audio_to_text(self) -> None: + wav_filename = f'{self.filename}.wav' + + speech = AudioSegment.from_wav(wav_filename) + speech_duration = len(speech) + pieces = speech_duration // AUDIO_SEGMENT_DURATION + 1 + ending = speech_duration % AUDIO_SEGMENT_DURATION + for i in range(pieces): + if i == 0 and pieces == 1: + sound_segment = speech[0:ending] + elif i == 0: + sound_segment = speech[0 : (i + 1) * AUDIO_SEGMENT_DURATION] + elif i == (pieces - 1): + sound_segment = speech[i * AUDIO_SEGMENT_DURATION - 250 : i * AUDIO_SEGMENT_DURATION + ending] + else: + sound_segment = speech[i * AUDIO_SEGMENT_DURATION - 250 : (i + 1) * AUDIO_SEGMENT_DURATION] + self.text_parts[i] = self._recognize_by_google(wav_filename, sound_segment) + + self.text_recognised = True + + # clean temp voice message main files + try: + os.remove(wav_filename) + os.remove(self.filename) + except FileNotFoundError as error: + logger.error("error temps files not deleted", error=error, filenames=[self.filename, self.filename]) + + def _convert_file_to_wav(self) -> None: + new_filename = self.filename + '.wav' + cmd = ['ffmpeg', '-loglevel', 'quiet', '-i', self.filename, '-vn', new_filename] + try: + subprocess.run(args=cmd) # noqa: S603 + logger.info("file has been converted to wav", filename=new_filename) + except Exception as error: + logger.error("cant convert voice", error=error, filename=self.filename) + + def _recognize_by_google(self, filename: str, sound_segment: AudioSegment) -> str: + tmp_filename = f"{filename}_tmp_part" + sound_segment.export(tmp_filename, format="wav") + with AudioFile(tmp_filename) as source: + audio_text = self.recognizer.listen(source) + try: + text = self.recognizer.recognize_google(audio_text, language='ru-RU') + os.remove(tmp_filename) + return text + except SpeechRecognizerError as error: + os.remove(tmp_filename) + logger.error("error recognizing text with google", error=error) + raise error diff --git a/bot_microservice/main.py b/bot_microservice/main.py index 25fcc37..ee45b1f 100644 --- a/bot_microservice/main.py +++ b/bot_microservice/main.py @@ -2,8 +2,10 @@ import asyncio from functools import cached_property import sentry_sdk +from constants import LogLevelEnum from core.bot import BotApplication, BotQueue from core.handlers import bot_event_handlers +from core.logging import configure_logging from fastapi import FastAPI from fastapi.responses import UJSONResponse from routers import api_router @@ -27,6 +29,7 @@ class Application: self.app.include_router(api_router) self.configure_hooks() + configure_logging(level=LogLevelEnum.INFO, enable_json_logs=True, enable_sentry_logs=True) if settings.SENTRY_DSN is not None: sentry_sdk.init( diff --git a/deploy/Caddyfile b/deploy/Caddyfile index 70911d9..0db5c49 100644 --- a/deploy/Caddyfile +++ b/deploy/Caddyfile @@ -1,6 +1,6 @@ # Telegram bot. Redirects to bot-service :8083 { - reverse_proxy bot_service:8080 + reverse_proxy bot_service:8000 header Strict-Transport-Security max-age=31536000; # Removing some headers for improved security: diff --git a/deploy/Dockerfile b/deploy/Dockerfile index 1f40018..8aacc24 100644 --- a/deploy/Dockerfile +++ b/deploy/Dockerfile @@ -28,9 +28,9 @@ RUN printf "================ Start build base service. with USER: ${USER} ====== WORKDIR /app/ RUN if [ "$USER" != "root" ]; then \ - mkdir /home/"$USER" \ - && groupadd -r "$USER" && useradd -d /home/"$USER" -r -g "$USER" "$USER" \ - && chown "$USER":"$USER" -R /home/"$USER"; \ + mkdir /home/${USER} \ + && groupadd -r ${USER} && useradd -d /home/${USER} -r -g ${USER} ${USER} \ + && chown ${USER}:${USER} -R /home/${USER}; \ fi COPY --chown=${USER}:${USER} ../poetry.lock ../pyproject.toml /app/ @@ -51,6 +51,7 @@ WORKDIR /app/ # Copying bot service COPY --chown=${USER}:${USER} ../bot_microservice /app/ +RUN mkdir "/app/shared" -p && chown ${USER}:${USER} -R /app/shared COPY ./scripts/start-bot.sh /app/ RUN chmod +x ./start-bot.sh @@ -72,9 +73,6 @@ WORKDIR /app/ # Copying bot service COPY --chown=${USER}:${USER} ../chat_gpt_microservice /app/ -COPY ./scripts/start-chat.sh /app/ -RUN chmod +x ./start-chat.sh - COPY --from=compile-image /app/.venv /app/.venv ENV PATH="/app/.venv/bin:$PATH" @@ -82,4 +80,4 @@ USER ${USER} RUN chmod -R 777 translations -CMD ["bash", "start-chat.sh"] \ No newline at end of file +CMD ["python3", "./run.py"] \ No newline at end of file diff --git a/scripts/healthcheck_bot.service b/scripts/chat_gpt_bot.service similarity index 100% rename from scripts/healthcheck_bot.service rename to scripts/chat_gpt_bot.service diff --git a/scripts/start-bot.sh b/scripts/start-bot.sh index a0b3100..90e5cb3 100644 --- a/scripts/start-bot.sh +++ b/scripts/start-bot.sh @@ -6,7 +6,7 @@ if [[ "${START_WITH_WEBHOOK}" == "true" ]] then echo "Starting bot in webhook mode..." gunicorn main:create_app \ - -- workers ${WORKERS_COUNT} \ + --workers ${WORKERS_COUNT} \ --bind ${APP_HOST}:${APP_PORT} \ --worker-class uvicorn.workers.UvicornWorker \ --timeout 150 \ diff --git a/scripts/start-chat.sh b/scripts/start-chat.sh deleted file mode 100644 index aac314d..0000000 --- a/scripts/start-chat.sh +++ /dev/null @@ -1,11 +0,0 @@ -#! /bin/bash - -echo "starting chat" - -gunicorn run:create_app \ - -- workers ${WORKERS_COUNT} \ - --bind ${APP_HOST}:${APP_PORT} \ - --worker-class uvicorn.workers.UvicornWorker \ - --timeout 150 \ - --max-requests 2000 \ - --max-requests-jitter 400 \ No newline at end of file