mirror of
https://github.com/Balshgit/gpt_chat_bot.git
synced 2026-02-03 11:40:39 +03:00
microservices are able to run (#5)
This commit is contained in:
0
bot_microservice/api/__init__.py
Normal file
0
bot_microservice/api/__init__.py
Normal file
0
bot_microservice/api/bot/__init__.py
Normal file
0
bot_microservice/api/bot/__init__.py
Normal file
19
bot_microservice/api/bot/controllers.py
Normal file
19
bot_microservice/api/bot/controllers.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import ORJSONResponse
|
||||
from settings.config import get_settings
|
||||
from starlette import status
|
||||
|
||||
router = APIRouter()
|
||||
settings = get_settings()
|
||||
|
||||
|
||||
@router.post(
|
||||
f"/{settings.TELEGRAM_API_TOKEN}",
|
||||
name="system:process_bot_updates",
|
||||
status_code=status.HTTP_202_ACCEPTED,
|
||||
summary="process bot updates",
|
||||
include_in_schema=False,
|
||||
)
|
||||
async def process_bot_updates(request: Request) -> ORJSONResponse:
|
||||
await request.app.state.queue.put_updates_on_queue(request)
|
||||
return ORJSONResponse(content=None, status_code=status.HTTP_202_ACCEPTED)
|
||||
0
bot_microservice/api/system/__init__.py
Normal file
0
bot_microservice/api/system/__init__.py
Normal file
15
bot_microservice/api/system/controllers.py
Normal file
15
bot_microservice/api/system/controllers.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import ORJSONResponse
|
||||
from starlette import status
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get(
|
||||
"/healthcheck",
|
||||
name="system:healthcheck",
|
||||
status_code=status.HTTP_200_OK,
|
||||
summary="Healthcheck service",
|
||||
)
|
||||
async def healthcheck() -> ORJSONResponse:
|
||||
return ORJSONResponse(content=None, status_code=status.HTTP_200_OK)
|
||||
13
bot_microservice/constants.py
Normal file
13
bot_microservice/constants.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from enum import StrEnum
|
||||
|
||||
API_PREFIX = "/api"
|
||||
CHAT_GPT_BASE_URL = "http://chat_service:1338/backend-api/v2/conversation"
|
||||
|
||||
|
||||
class LogLevelEnum(StrEnum):
|
||||
CRITICAL = "critical"
|
||||
ERROR = "error"
|
||||
WARNING = "warning"
|
||||
INFO = "info"
|
||||
DEBUG = "debug"
|
||||
NOTSET = ""
|
||||
0
bot_microservice/core/__init__.py
Normal file
0
bot_microservice/core/__init__.py
Normal file
77
bot_microservice/core/bot.py
Normal file
77
bot_microservice/core/bot.py
Normal file
@@ -0,0 +1,77 @@
|
||||
import asyncio
|
||||
import os
|
||||
from asyncio import Queue, sleep
|
||||
from dataclasses import dataclass
|
||||
from functools import cached_property
|
||||
from http import HTTPStatus
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Request, Response
|
||||
from loguru import logger
|
||||
from settings.config import AppSettings
|
||||
from telegram import Update
|
||||
from telegram.ext import Application
|
||||
|
||||
|
||||
class BotApplication:
|
||||
def __init__(
|
||||
self,
|
||||
settings: AppSettings,
|
||||
handlers: list[Any],
|
||||
application: Application | None = None, # type: ignore[type-arg]
|
||||
) -> None:
|
||||
self.application: Application = application or ( # type: ignore
|
||||
Application.builder().token(token=settings.TELEGRAM_API_TOKEN).build()
|
||||
)
|
||||
self.handlers = handlers
|
||||
self.settings = settings
|
||||
self.start_with_webhook = settings.START_WITH_WEBHOOK
|
||||
self._add_handlers()
|
||||
|
||||
async def set_webhook(self) -> None:
|
||||
await self.application.initialize()
|
||||
await self.application.bot.set_webhook(url=self.webhook_url)
|
||||
logger.info('webhook is set')
|
||||
|
||||
async def delete_webhook(self) -> None:
|
||||
await self.application.bot.delete_webhook()
|
||||
logger.info('webhook has been deleted')
|
||||
|
||||
async def polling(self) -> None:
|
||||
await self.application.initialize()
|
||||
await self.application.start()
|
||||
await self.application.updater.start_polling() # type: ignore
|
||||
logger.info("bot started in polling mode")
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
await self.application.updater.shutdown() # type: ignore
|
||||
|
||||
@cached_property
|
||||
def webhook_url(self) -> str:
|
||||
return os.path.join(self.settings.DOMAIN.strip("/"), self.settings.bot_webhook_url.strip("/"))
|
||||
|
||||
def _add_handlers(self) -> None:
|
||||
for handler in self.handlers:
|
||||
self.application.add_handler(handler)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BotQueue:
|
||||
bot_app: BotApplication
|
||||
queue: Queue = asyncio.Queue() # type: ignore[type-arg]
|
||||
|
||||
async def put_updates_on_queue(self, request: Request) -> Response:
|
||||
"""
|
||||
Listen /{URL_PREFIX}/{API_PREFIX}/{TELEGRAM_WEB_TOKEN} path and proxy post request to bot
|
||||
"""
|
||||
data = await request.json()
|
||||
tg_update = Update.de_json(data=data, bot=self.bot_app.application.bot)
|
||||
self.queue.put_nowait(tg_update)
|
||||
|
||||
return Response(status_code=HTTPStatus.ACCEPTED)
|
||||
|
||||
async def get_updates_from_queue(self) -> None:
|
||||
while True:
|
||||
update = await self.queue.get()
|
||||
await self.bot_app.application.process_update(update)
|
||||
await sleep(0)
|
||||
74
bot_microservice/core/commands.py
Normal file
74
bot_microservice/core/commands.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import random
|
||||
import tempfile
|
||||
from uuid import uuid4
|
||||
|
||||
import httpx
|
||||
from constants import CHAT_GPT_BASE_URL
|
||||
from core.utils import convert_file_to_wav
|
||||
from httpx import AsyncClient, AsyncHTTPTransport
|
||||
from loguru import logger
|
||||
from telegram import Update
|
||||
from telegram.ext import ContextTypes
|
||||
|
||||
|
||||
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Send a message when the command /help is issued."""
|
||||
|
||||
if update.message:
|
||||
await update.message.reply_text(
|
||||
"Help!",
|
||||
disable_notification=True,
|
||||
api_kwargs={"text": "Hello World"},
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
async def ask_question(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
await update.message.reply_text( # type: ignore[union-attr]
|
||||
"Пожалуйста подождите, ответ в среднем занимает 10-15 секунд"
|
||||
)
|
||||
|
||||
chat_gpt_request = {
|
||||
"conversation_id": str(uuid4()),
|
||||
"action": "_ask",
|
||||
"model": "gpt-3.5-turbo",
|
||||
"jailbreak": "default",
|
||||
"meta": {
|
||||
"id": random.randint(10**18, 10**19 - 1), # noqa: S311
|
||||
"content": {
|
||||
"conversation": [],
|
||||
"internet_access": False,
|
||||
"content_type": "text",
|
||||
"parts": [{"content": update.message.text, "role": "user"}], # type: ignore[union-attr]
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
transport = AsyncHTTPTransport(retries=1)
|
||||
async with AsyncClient(transport=transport) as client:
|
||||
try:
|
||||
response = await client.post(CHAT_GPT_BASE_URL, json=chat_gpt_request)
|
||||
status = response.status_code
|
||||
if status != httpx.codes.OK:
|
||||
logger.info(f'got response status: {status} from chat api', data=chat_gpt_request)
|
||||
await update.message.reply_text( # type: ignore[union-attr]
|
||||
"Что-то пошло не так, попробуйте еще раз или обратитесь к администратору"
|
||||
)
|
||||
return
|
||||
|
||||
data = response.json()
|
||||
await update.message.reply_text(data) # type: ignore[union-attr]
|
||||
except Exception as error:
|
||||
logger.error("error get data from chat api", error=error)
|
||||
await update.message.reply_text("Вообще всё сломалось :(") # type: ignore[union-attr]
|
||||
|
||||
|
||||
async def voice_recognize(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
await update.message.reply_text( # type: ignore[union-attr]
|
||||
"Пожалуйста, ожидайте :)\nТрехминутная запись обрабатывается примерно 30 секунд"
|
||||
)
|
||||
sound_bytes = await update.message.voice.get_file() # type: ignore[union-attr]
|
||||
sound_bytes = await sound_bytes.download_as_bytearray()
|
||||
with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
|
||||
tmpfile.write(sound_bytes)
|
||||
convert_file_to_wav(tmpfile.name)
|
||||
20
bot_microservice/core/handlers.py
Normal file
20
bot_microservice/core/handlers.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from core.commands import ask_question, help_command, voice_recognize
|
||||
from telegram.ext import CommandHandler, MessageHandler, filters
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommandHandlers:
|
||||
handlers: list[Any] = field(default_factory=list[Any])
|
||||
|
||||
def add_handler(self, handler: Any) -> None:
|
||||
self.handlers.append(handler)
|
||||
|
||||
|
||||
command_handlers = CommandHandlers()
|
||||
|
||||
command_handlers.add_handler(CommandHandler("help", help_command))
|
||||
command_handlers.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, ask_question))
|
||||
command_handlers.add_handler(MessageHandler(filters.VOICE | filters.AUDIO, voice_recognize))
|
||||
102
bot_microservice/core/logging.py
Normal file
102
bot_microservice/core/logging.py
Normal file
@@ -0,0 +1,102 @@
|
||||
import logging
|
||||
import sys
|
||||
from types import FrameType
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from constants import LogLevelEnum
|
||||
from loguru import logger
|
||||
from sentry_sdk.integrations.logging import EventHandler
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from loguru import Record
|
||||
else:
|
||||
Record = dict[str, Any]
|
||||
|
||||
|
||||
class InterceptHandler(logging.Handler):
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
# Get corresponding Loguru level if it exists
|
||||
try:
|
||||
level = logger.level(record.levelname).name
|
||||
except ValueError:
|
||||
level = str(record.levelno)
|
||||
|
||||
# Find caller from where originated the logged message
|
||||
frame, depth = logging.currentframe(), 2
|
||||
while frame.f_code.co_filename == logging.__file__:
|
||||
frame = cast(FrameType, frame.f_back)
|
||||
depth += 1
|
||||
|
||||
logger.opt(depth=depth, exception=record.exc_info).log(
|
||||
level,
|
||||
record.getMessage(),
|
||||
)
|
||||
|
||||
|
||||
def configure_logging(*, level: LogLevelEnum, enable_json_logs: bool, enable_sentry_logs: bool) -> None:
|
||||
logging_level = level.name
|
||||
|
||||
intercept_handler = InterceptHandler()
|
||||
|
||||
logging.basicConfig(handlers=[intercept_handler], level=logging_level)
|
||||
|
||||
formatter = _json_formatter if enable_json_logs else _text_formatter
|
||||
logger.configure(
|
||||
handlers=[
|
||||
{
|
||||
"sink": sys.stdout,
|
||||
"level": logging_level,
|
||||
"serialize": enable_json_logs,
|
||||
"format": formatter,
|
||||
"colorize": True,
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
# sentry sdk не умеет из коробки работать с loguru, нужно добавлять хандлер
|
||||
# https://github.com/getsentry/sentry-python/issues/653#issuecomment-788854865
|
||||
# https://forum.sentry.io/t/changing-issue-title-when-logging-with-traceback/446
|
||||
if enable_sentry_logs:
|
||||
handler = EventHandler(level=logging.WARNING)
|
||||
logger.add(handler, diagnose=True, level=logging.WARNING, format=_sentry_formatter)
|
||||
|
||||
|
||||
def _json_formatter(record: Record) -> str:
|
||||
# Обрезаем `\n` в конце логов, т.к. в json формате переносы не нужны
|
||||
return record.get("message", "").strip()
|
||||
|
||||
|
||||
def _sentry_formatter(record: Record) -> str:
|
||||
return "{name}:{function} {message}"
|
||||
|
||||
|
||||
def _text_formatter(record: Record) -> str:
|
||||
# WARNING !!!
|
||||
# Функция должна возвращать строку, которая содержит только шаблоны для форматирования.
|
||||
# Если в строку прокидывать значения из record (или еще откуда-либо),
|
||||
# то loguru может принять их за f-строки и попытается обработать, что приведет к ошибке.
|
||||
# Например, если нужно достать какое-то значение из поля extra, вместо того чтобы прокидывать его в строку формата,
|
||||
# нужно прокидывать подстроку вида {extra[тут_ключ]}
|
||||
|
||||
# Стандартный формат loguru. Задается через env LOGURU_FORMAT
|
||||
format_ = (
|
||||
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
|
||||
"<level>{level: <8}</level> | "
|
||||
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
|
||||
)
|
||||
|
||||
# Добавляем мета параметры по типу user_id, art_id, которые передаются через logger.bind(...)
|
||||
extra = record["extra"]
|
||||
if extra:
|
||||
formatted = ", ".join(f"{key}" + "={extra[" + str(key) + "]}" for key, value in extra.items())
|
||||
format_ += f" - <cyan>{formatted}</cyan>"
|
||||
|
||||
format_ += "\n"
|
||||
|
||||
if record["exception"] is not None:
|
||||
format_ += "{exception}\n"
|
||||
|
||||
return format_
|
||||
|
||||
|
||||
configure_logging(level=LogLevelEnum.DEBUG, enable_json_logs=True, enable_sentry_logs=True)
|
||||
39
bot_microservice/core/utils.py
Normal file
39
bot_microservice/core/utils.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import subprocess # noqa
|
||||
from datetime import datetime, timedelta
|
||||
from functools import lru_cache, wraps
|
||||
from typing import Any
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def timed_cache(**timedelta_kwargs: Any) -> Any:
|
||||
def _wrapper(func: Any) -> Any:
|
||||
update_delta = timedelta(**timedelta_kwargs)
|
||||
next_update = datetime.utcnow() + update_delta
|
||||
# Apply @lru_cache to f with no cache size limit
|
||||
cached_func = lru_cache(None)(func)
|
||||
|
||||
@wraps(func)
|
||||
def _wrapped(*args: Any, **kwargs: Any) -> Any:
|
||||
nonlocal next_update
|
||||
now = datetime.utcnow()
|
||||
if now >= next_update:
|
||||
cached_func.cache_clear()
|
||||
next_update = now + update_delta
|
||||
return cached_func(*args, **kwargs)
|
||||
|
||||
return _wrapped
|
||||
|
||||
return _wrapper
|
||||
|
||||
|
||||
def convert_file_to_wav(filename: str) -> str:
|
||||
new_filename = filename + '.wav'
|
||||
|
||||
cmd = ['ffmpeg', '-loglevel', 'quiet', '-i', filename, '-vn', new_filename]
|
||||
|
||||
try:
|
||||
subprocess.run(args=cmd) # noqa: S603
|
||||
except Exception as error:
|
||||
logger.error("cant convert voice: reason", error=error)
|
||||
return new_filename
|
||||
84
bot_microservice/main.py
Normal file
84
bot_microservice/main.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import asyncio
|
||||
from functools import cached_property
|
||||
|
||||
import sentry_sdk
|
||||
from core.bot import BotApplication, BotQueue
|
||||
from core.handlers import command_handlers
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import UJSONResponse
|
||||
from routers import api_router
|
||||
from settings.config import AppSettings, get_settings
|
||||
|
||||
|
||||
class Application:
|
||||
def __init__(self, settings: AppSettings, bot_app: BotApplication) -> None:
|
||||
self.app = FastAPI(
|
||||
title="Chat gpt bot",
|
||||
description="Bot for proxy to chat gpt in telegram",
|
||||
version="0.0.3",
|
||||
docs_url="/" + "/".join([settings.api_prefix.strip("/"), "docs"]),
|
||||
redoc_url="/" + "/".join([settings.api_prefix.strip("/"), "redocs"]),
|
||||
openapi_url="/" + "/".join([settings.api_prefix.strip("/"), "openapi.json"]),
|
||||
default_response_class=UJSONResponse,
|
||||
)
|
||||
self.app.state.settings = settings
|
||||
self.app.state.queue = BotQueue(bot_app=bot_app)
|
||||
self.bot_app = bot_app
|
||||
|
||||
self.app.include_router(api_router)
|
||||
self.configure_hooks()
|
||||
|
||||
if settings.SENTRY_DSN is not None:
|
||||
sentry_sdk.init(
|
||||
dsn=settings.SENTRY_DSN, # type: ignore[arg-type]
|
||||
environment=settings.DEPLOY_ENVIRONMENT,
|
||||
traces_sample_rate=settings.SENTRY_TRACES_SAMPLE_RATE,
|
||||
send_client_reports=False,
|
||||
)
|
||||
|
||||
@cached_property
|
||||
def fastapi_app(self) -> FastAPI:
|
||||
return self.app
|
||||
|
||||
def configure_hooks(self) -> None:
|
||||
if self.bot_app.start_with_webhook:
|
||||
self.app.add_event_handler("startup", self._on_start_up)
|
||||
else:
|
||||
self.app.add_event_handler("startup", self.bot_app.polling)
|
||||
|
||||
self.app.add_event_handler("shutdown", self._on_shutdown)
|
||||
|
||||
async def _on_start_up(self) -> None:
|
||||
await self.bot_app.set_webhook()
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.create_task(self.app.state.queue.get_updates_from_queue())
|
||||
|
||||
async def _on_shutdown(self) -> None:
|
||||
await asyncio.gather(self.bot_app.delete_webhook(), self.bot_app.shutdown())
|
||||
|
||||
|
||||
def create_app(settings: AppSettings | None = None) -> FastAPI:
|
||||
settings = settings or get_settings()
|
||||
bot_app = BotApplication(settings=settings, handlers=command_handlers.handlers)
|
||||
|
||||
return Application(settings=settings, bot_app=bot_app).fastapi_app
|
||||
|
||||
|
||||
def main() -> None:
|
||||
import uvicorn
|
||||
|
||||
app = create_app() # noqa: NEW100
|
||||
|
||||
"""Entrypoint of the application."""
|
||||
uvicorn.run(
|
||||
"main:create_app",
|
||||
workers=app.state.settings.WORKERS_COUNT,
|
||||
host=app.state.settings.APP_HOST,
|
||||
port=app.state.settings.APP_PORT,
|
||||
reload=app.state.settings.RELOAD,
|
||||
factory=True,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
16
bot_microservice/routers.py
Normal file
16
bot_microservice/routers.py
Normal file
@@ -0,0 +1,16 @@
|
||||
from api.bot.controllers import router as bot_router
|
||||
from api.system.controllers import router as system_router
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import ORJSONResponse
|
||||
from settings.config import get_settings
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
api_router = APIRouter(
|
||||
prefix=settings.api_prefix,
|
||||
default_response_class=ORJSONResponse,
|
||||
)
|
||||
|
||||
|
||||
api_router.include_router(system_router, tags=["system"])
|
||||
api_router.include_router(bot_router, tags=["bot"])
|
||||
22
bot_microservice/settings/.env.ci.runtests
Normal file
22
bot_microservice/settings/.env.ci.runtests
Normal file
@@ -0,0 +1,22 @@
|
||||
STAGE="runtests"
|
||||
|
||||
APP_HOST="0.0.0.0"
|
||||
APP_PORT="8000"
|
||||
|
||||
USER="web"
|
||||
TZ="Europe/Moscow"
|
||||
|
||||
TELEGRAM_API_TOKEN="123456789:AABBCCDDEEFFaabbccddeeff-1234567890"
|
||||
|
||||
# webhook settings
|
||||
DOMAIN="http://localhost"
|
||||
URL_PREFIX=
|
||||
|
||||
# set to true to start with webhook. Else bot will start on polling method
|
||||
START_WITH_WEBHOOK="true"
|
||||
|
||||
# quantity of workers for uvicorn
|
||||
WORKERS_COUNT=1
|
||||
# Enable uvicorn reloading
|
||||
RELOAD="true"
|
||||
DEBUG="true"
|
||||
22
bot_microservice/settings/.env.local.runtests
Normal file
22
bot_microservice/settings/.env.local.runtests
Normal file
@@ -0,0 +1,22 @@
|
||||
STAGE="runtests"
|
||||
|
||||
APP_HOST="0.0.0.0"
|
||||
APP_PORT="8000"
|
||||
|
||||
USER="web"
|
||||
TZ="Europe/Moscow"
|
||||
|
||||
TELEGRAM_API_TOKEN="123456789:AABBCCDDEEFFaabbccddeeff-1234567890"
|
||||
|
||||
# webhook settings
|
||||
DOMAIN="http://localhost"
|
||||
URL_PREFIX=
|
||||
|
||||
# set to true to start with webhook. Else bot will start on polling method
|
||||
START_WITH_WEBHOOK="true"
|
||||
|
||||
# quantity of workers for uvicorn
|
||||
WORKERS_COUNT=1
|
||||
# Enable uvicorn reloading
|
||||
RELOAD="true"
|
||||
DEBUG="true"
|
||||
26
bot_microservice/settings/.env.template
Normal file
26
bot_microservice/settings/.env.template
Normal file
@@ -0,0 +1,26 @@
|
||||
STAGE="dev"
|
||||
|
||||
APP_HOST="0.0.0.0"
|
||||
APP_PORT="8000"
|
||||
|
||||
# SENTRY_DSN=
|
||||
SENTRY_TRACES_SAMPLE_RATE="0.95"
|
||||
|
||||
USER="web"
|
||||
TZ="Europe/Moscow"
|
||||
|
||||
TELEGRAM_API_TOKEN="123456789:AABBCCDDEEFFaabbccddeeff-1234567890"
|
||||
|
||||
# webhook settings
|
||||
DOMAIN="https://mydomain.com"
|
||||
URL_PREFIX="/gpt"
|
||||
|
||||
# set to true to start with webhook. Else bot will start on polling method
|
||||
START_WITH_WEBHOOK="false"
|
||||
|
||||
# quantity of workers for uvicorn
|
||||
WORKERS_COUNT=1
|
||||
# Enable uvicorn reloading
|
||||
RELOAD="true"
|
||||
DEBUG="true"
|
||||
|
||||
0
bot_microservice/settings/__init__.py
Normal file
0
bot_microservice/settings/__init__.py
Normal file
69
bot_microservice/settings/config.py
Normal file
69
bot_microservice/settings/config.py
Normal file
@@ -0,0 +1,69 @@
|
||||
from functools import cached_property
|
||||
from os import environ
|
||||
from pathlib import Path
|
||||
|
||||
from constants import API_PREFIX
|
||||
from dotenv import load_dotenv
|
||||
from pydantic import HttpUrl
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
BASE_DIR = Path(__file__).parent.parent
|
||||
SHARED_DIR = BASE_DIR.resolve().joinpath("shared")
|
||||
SHARED_DIR.mkdir(exist_ok=True)
|
||||
|
||||
SHARED_DIR.joinpath("logs").mkdir(exist_ok=True)
|
||||
DIR_LOGS = SHARED_DIR.joinpath("logs")
|
||||
|
||||
env_path = f"{BASE_DIR}/settings/.env"
|
||||
|
||||
if environ.get("STAGE") == "runtests":
|
||||
if "LOCALTEST" in environ:
|
||||
env_path = f"{BASE_DIR}/settings/.env.local.runtests"
|
||||
else:
|
||||
env_path = f"{BASE_DIR}/settings/.env.ci.runtests"
|
||||
|
||||
load_dotenv(env_path, override=True)
|
||||
|
||||
|
||||
class SentrySettings(BaseSettings):
|
||||
SENTRY_DSN: HttpUrl | None = None
|
||||
DEPLOY_ENVIRONMENT: str | None = None
|
||||
SENTRY_TRACES_SAMPLE_RATE: float = 0.95
|
||||
|
||||
|
||||
class AppSettings(SentrySettings, BaseSettings):
|
||||
"""Application settings."""
|
||||
|
||||
PROJECT_NAME: str = "chat gpt bot"
|
||||
APP_HOST: str = "0.0.0.0"
|
||||
APP_PORT: int = 8000
|
||||
STAGE: str = "dev"
|
||||
DEBUG: bool = False
|
||||
|
||||
TELEGRAM_API_TOKEN: str = "123456789:AABBCCDDEEFFaabbccddeeff-1234567890"
|
||||
# webhook settings
|
||||
START_WITH_WEBHOOK: bool = False
|
||||
DOMAIN: str = "https://localhost"
|
||||
URL_PREFIX: str = ""
|
||||
|
||||
# quantity of workers for uvicorn
|
||||
WORKERS_COUNT: int = 1
|
||||
# Enable uvicorn reloading
|
||||
RELOAD: bool = False
|
||||
|
||||
@cached_property
|
||||
def api_prefix(self) -> str:
|
||||
if self.URL_PREFIX:
|
||||
return "/" + "/".join([self.URL_PREFIX.strip("/"), API_PREFIX.strip("/")])
|
||||
return API_PREFIX
|
||||
|
||||
@cached_property
|
||||
def bot_webhook_url(self) -> str:
|
||||
return "/".join([self.api_prefix, self.TELEGRAM_API_TOKEN])
|
||||
|
||||
class Config:
|
||||
case_sensitive = True
|
||||
|
||||
|
||||
def get_settings() -> AppSettings:
|
||||
return AppSettings()
|
||||
0
bot_microservice/tests/__init__.py
Normal file
0
bot_microservice/tests/__init__.py
Normal file
0
bot_microservice/tests/integration/__init__.py
Normal file
0
bot_microservice/tests/integration/__init__.py
Normal file
0
bot_microservice/tests/integration/bot/__init__.py
Normal file
0
bot_microservice/tests/integration/bot/__init__.py
Normal file
251
bot_microservice/tests/integration/bot/conftest.py
Normal file
251
bot_microservice/tests/integration/bot/conftest.py
Normal file
@@ -0,0 +1,251 @@
|
||||
"""This module contains subclasses of classes from the python-telegram-bot library that
|
||||
modify behavior of the respective parent classes in order to make them easier to use in the
|
||||
pytest framework. A common change is to allow monkeypatching of the class members by not
|
||||
enforcing slots in the subclasses."""
|
||||
import asyncio
|
||||
from asyncio import AbstractEventLoop
|
||||
from datetime import tzinfo
|
||||
from typing import Any, AsyncGenerator
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from core.bot import BotApplication
|
||||
from core.handlers import command_handlers
|
||||
from fastapi import FastAPI
|
||||
from httpx import AsyncClient
|
||||
from main import Application as AppApplication
|
||||
from pytest_asyncio.plugin import SubRequest
|
||||
from settings.config import AppSettings, get_settings
|
||||
from telegram import Bot, User
|
||||
from telegram.ext import Application, ApplicationBuilder, Defaults, ExtBot
|
||||
from tests.integration.bot.networking import NonchalantHttpxRequest
|
||||
from tests.integration.factories.bot import BotInfoFactory
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def test_settings() -> AppSettings:
|
||||
return get_settings()
|
||||
|
||||
|
||||
class PytestExtBot(ExtBot): # type: ignore
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
# Makes it easier to work with the bot in tests
|
||||
self._unfreeze()
|
||||
|
||||
# Here we override get_me for caching because we don't want to call the API repeatedly in tests
|
||||
async def get_me(self, *args: Any, **kwargs: Any) -> User:
|
||||
return await _mocked_get_me(self)
|
||||
|
||||
|
||||
class PytestBot(Bot):
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
# Makes it easier to work with the bot in tests
|
||||
self._unfreeze()
|
||||
|
||||
# Here we override get_me for caching because we don't want to call the API repeatedly in tests
|
||||
async def get_me(self, *args: Any, **kwargs: Any) -> User:
|
||||
return await _mocked_get_me(self)
|
||||
|
||||
|
||||
class PytestApplication(Application): # type: ignore
|
||||
pass
|
||||
|
||||
|
||||
def make_bot(bot_info: dict[str, Any] | None = None, **kwargs: Any) -> PytestExtBot:
|
||||
"""
|
||||
Tests are executed on tg.ext.ExtBot, as that class only extends the functionality of tg.bot
|
||||
"""
|
||||
token = kwargs.pop("token", (bot_info or {}).get("token"))
|
||||
kwargs.pop("token", None)
|
||||
return PytestExtBot(
|
||||
token=token,
|
||||
private_key=None,
|
||||
request=NonchalantHttpxRequest(connection_pool_size=8),
|
||||
get_updates_request=NonchalantHttpxRequest(connection_pool_size=1),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
async def _mocked_get_me(bot: Bot) -> User:
|
||||
if bot._bot_user is None:
|
||||
bot._bot_user = _get_bot_user(bot.token)
|
||||
return bot._bot_user
|
||||
|
||||
|
||||
def _get_bot_user(token: str) -> User:
|
||||
"""Used to return a mock user in bot.get_me(). This saves API calls on every init."""
|
||||
bot_info = BotInfoFactory()
|
||||
# We don't take token from bot_info, because we need to make a bot with a specific ID. So we
|
||||
# generate the correct user_id from the token (token from bot_info is random each test run).
|
||||
# This is important in e.g. bot equality tests. The other parameters like first_name don't
|
||||
# matter as much. In the future we may provide a way to get all the correct info from the token
|
||||
user_id = int(token.split(":")[0])
|
||||
first_name = bot_info.get(
|
||||
"name",
|
||||
)
|
||||
username = bot_info.get(
|
||||
"username",
|
||||
).strip("@")
|
||||
return User(
|
||||
user_id,
|
||||
first_name,
|
||||
is_bot=True,
|
||||
username=username,
|
||||
can_join_groups=True,
|
||||
can_read_all_group_messages=False,
|
||||
supports_inline_queries=True,
|
||||
)
|
||||
|
||||
|
||||
# Redefine the event_loop fixture to have a session scope. Otherwise `bot` fixture can't be
|
||||
# session. See https://github.com/pytest-dev/pytest-asyncio/issues/68 for more details.
|
||||
@pytest.fixture(scope="session")
|
||||
def event_loop(request: SubRequest) -> AbstractEventLoop:
|
||||
"""
|
||||
Пересоздаем луп для изоляции тестов. В основном нужно для запуска юнит тестов
|
||||
в связке с интеграционными, т.к. без этого pytest зависает.
|
||||
Для интеграционных тестов фикстура определяется дополнительная фикстура на всю сессию.
|
||||
"""
|
||||
loop = asyncio.get_event_loop_policy().new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
return loop
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def bot_info() -> dict[str, Any]:
|
||||
return BotInfoFactory()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
async def bot_application(bot_info: dict[str, Any]) -> AsyncGenerator[Any, None]:
|
||||
# We build a new bot each time so that we use `app` in a context manager without problems
|
||||
application = ApplicationBuilder().bot(make_bot(bot_info)).application_class(PytestApplication).build()
|
||||
yield application
|
||||
if application.running:
|
||||
await application.stop()
|
||||
await application.shutdown()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
async def bot(bot_info: dict[str, Any], bot_application: Any) -> AsyncGenerator[PytestExtBot, None]:
|
||||
"""Makes an ExtBot instance with the given bot_info"""
|
||||
async with make_bot(bot_info) as _bot:
|
||||
_bot.application = bot_application
|
||||
yield _bot
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def one_time_bot(bot_info: dict[str, Any], bot_application: Any) -> PytestExtBot:
|
||||
"""A function scoped bot since the session bot would shutdown when `async with app` finishes"""
|
||||
bot = make_bot(bot_info)
|
||||
bot.application = bot_application
|
||||
return bot
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
async def cdc_bot(bot_info: dict[str, Any], bot_application: Any) -> AsyncGenerator[PytestExtBot, None]:
|
||||
"""Makes an ExtBot instance with the given bot_info that uses arbitrary callback_data"""
|
||||
async with make_bot(bot_info, arbitrary_callback_data=True) as _bot:
|
||||
_bot.application = bot_application
|
||||
yield _bot
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
async def raw_bot(bot_info: dict[str, Any], bot_application: Any) -> AsyncGenerator[PytestBot, None]:
|
||||
"""Makes an regular Bot instance with the given bot_info"""
|
||||
async with PytestBot(
|
||||
bot_info["token"],
|
||||
private_key=None,
|
||||
request=NonchalantHttpxRequest(8),
|
||||
get_updates_request=NonchalantHttpxRequest(1),
|
||||
) as _bot:
|
||||
_bot.application = bot_application
|
||||
yield _bot
|
||||
|
||||
|
||||
# Here we store the default bots so that we don't have to create them again and again.
|
||||
# They are initialized but not shutdown on pytest_sessionfinish because it is causing
|
||||
# problems with the event loop (Event loop is closed).
|
||||
_default_bots: dict[Defaults, PytestExtBot] = {}
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
async def default_bot(request: SubRequest, bot_info: dict[str, Any]) -> PytestExtBot:
|
||||
param = request.param if hasattr(request, "param") else {}
|
||||
defaults = Defaults(**param)
|
||||
|
||||
# If the bot is already created, return it. Else make a new one.
|
||||
default_bot = _default_bots.get(defaults)
|
||||
if default_bot is None:
|
||||
default_bot = make_bot(bot_info, defaults=defaults)
|
||||
await default_bot.initialize()
|
||||
_default_bots[defaults] = default_bot # Defaults object is hashable
|
||||
return default_bot
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
async def tz_bot(timezone: tzinfo, bot_info: dict[str, Any]) -> PytestExtBot:
|
||||
defaults = Defaults(tzinfo=timezone)
|
||||
try: # If the bot is already created, return it. Saves time since get_me is not called again.
|
||||
return _default_bots[defaults]
|
||||
except KeyError:
|
||||
default_bot = make_bot(bot_info, defaults=defaults)
|
||||
await default_bot.initialize()
|
||||
_default_bots[defaults] = default_bot
|
||||
return default_bot
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def chat_id(bot_info: dict[str, Any]) -> int:
|
||||
return bot_info["chat_id"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def super_group_id(bot_info: dict[str, Any]) -> int:
|
||||
return bot_info["super_group_id"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def forum_group_id(bot_info: dict[str, Any]) -> int:
|
||||
return int(bot_info["forum_group_id"])
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def channel_id(bot_info: dict[str, Any]) -> int:
|
||||
return bot_info["channel_id"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def provider_token(bot_info: dict[str, Any]) -> str:
|
||||
return bot_info["payment_provider_token"]
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session")
|
||||
async def main_application(
|
||||
bot_application: PytestApplication, test_settings: AppSettings
|
||||
) -> AsyncGenerator[FastAPI, None]:
|
||||
bot_app = BotApplication(
|
||||
application=bot_application,
|
||||
settings=test_settings,
|
||||
handlers=command_handlers.handlers,
|
||||
)
|
||||
fast_api_app = AppApplication(settings=test_settings, bot_app=bot_app).fastapi_app
|
||||
yield fast_api_app
|
||||
|
||||
|
||||
@pytest_asyncio.fixture()
|
||||
async def rest_client(
|
||||
main_application: FastAPI,
|
||||
) -> AsyncGenerator[AsyncClient, None]:
|
||||
"""
|
||||
Default http client. Use to test unauthorized requests, public endpoints
|
||||
or special authorization methods.
|
||||
"""
|
||||
async with AsyncClient(
|
||||
app=main_application,
|
||||
base_url="http://test",
|
||||
headers={"Content-Type": "application/json"},
|
||||
) as client:
|
||||
yield client
|
||||
109
bot_microservice/tests/integration/bot/networking.py
Normal file
109
bot_microservice/tests/integration/bot/networking.py
Normal file
@@ -0,0 +1,109 @@
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
import pytest
|
||||
from httpx import AsyncClient, Response
|
||||
from telegram._utils.defaultvalue import DEFAULT_NONE
|
||||
from telegram._utils.types import ODVInput
|
||||
from telegram.error import BadRequest, RetryAfter, TimedOut
|
||||
from telegram.request import HTTPXRequest, RequestData
|
||||
|
||||
|
||||
class NonchalantHttpxRequest(HTTPXRequest):
|
||||
"""This Request class is used in the tests to suppress errors that we don't care about
|
||||
in the test suite.
|
||||
"""
|
||||
|
||||
async def _request_wrapper(
|
||||
self,
|
||||
url: str,
|
||||
method: str,
|
||||
request_data: Optional[RequestData] = None,
|
||||
read_timeout: ODVInput[float] = DEFAULT_NONE,
|
||||
write_timeout: ODVInput[float] = DEFAULT_NONE,
|
||||
connect_timeout: ODVInput[float] = DEFAULT_NONE,
|
||||
pool_timeout: ODVInput[float] = DEFAULT_NONE,
|
||||
) -> bytes:
|
||||
try:
|
||||
return await super()._request_wrapper(
|
||||
method=method,
|
||||
url=url,
|
||||
request_data=request_data,
|
||||
read_timeout=read_timeout,
|
||||
write_timeout=write_timeout,
|
||||
connect_timeout=connect_timeout,
|
||||
pool_timeout=pool_timeout,
|
||||
)
|
||||
except RetryAfter as e:
|
||||
pytest.xfail(f"Not waiting for flood control: {e}")
|
||||
except TimedOut as e:
|
||||
pytest.xfail(f"Ignoring TimedOut error: {e}")
|
||||
|
||||
|
||||
async def expect_bad_request(func: Callable[..., Any], message: str, reason: str) -> Callable[..., Any]:
|
||||
"""
|
||||
Wrapper for testing bot functions expected to result in an :class:`telegram.error.BadRequest`.
|
||||
Makes it XFAIL, if the specified error message is present.
|
||||
|
||||
Args:
|
||||
func: The awaitable to be executed.
|
||||
message: The expected message of the bad request error. If another message is present,
|
||||
the error will be reraised.
|
||||
reason: Explanation for the XFAIL.
|
||||
|
||||
Returns:
|
||||
On success, returns the return value of :attr:`func`
|
||||
"""
|
||||
try:
|
||||
return await func()
|
||||
except BadRequest as e:
|
||||
if message in str(e):
|
||||
pytest.xfail(f"{reason}. {e}")
|
||||
else:
|
||||
raise e
|
||||
|
||||
|
||||
async def send_webhook_message(
|
||||
ip: str,
|
||||
port: int,
|
||||
payload_str: str | None,
|
||||
url_path: str = "",
|
||||
content_len: int | None = -1,
|
||||
content_type: str = "application/json",
|
||||
get_method: str | None = None,
|
||||
secret_token: str | None = None,
|
||||
) -> Response:
|
||||
headers = {
|
||||
"content-type": content_type,
|
||||
}
|
||||
if secret_token:
|
||||
headers["X-Telegram-Bot-Api-Secret-Token"] = secret_token
|
||||
|
||||
if not payload_str:
|
||||
content_len = None
|
||||
payload = None
|
||||
else:
|
||||
payload = bytes(payload_str, encoding="utf-8")
|
||||
|
||||
if content_len == -1:
|
||||
content_len = len(payload) if payload else None
|
||||
|
||||
if content_len is not None:
|
||||
headers["content-length"] = str(content_len)
|
||||
|
||||
url = f"http://{ip}:{port}/{url_path}"
|
||||
|
||||
async with AsyncClient() as client:
|
||||
return await client.request(
|
||||
url=url,
|
||||
method=get_method or "POST",
|
||||
data=payload, # type: ignore
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
|
||||
class MockedRequest:
|
||||
def __init__(self, data: dict[str, Any]) -> None:
|
||||
self.data = data
|
||||
|
||||
async def json(self) -> dict[str, Any]:
|
||||
return self.data
|
||||
71
bot_microservice/tests/integration/bot/test_bot_updates.py
Normal file
71
bot_microservice/tests/integration/bot/test_bot_updates.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import asyncio
|
||||
import time
|
||||
from asyncio import AbstractEventLoop
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from assertpy import assert_that
|
||||
from core.bot import BotApplication, BotQueue
|
||||
from faker import Faker
|
||||
from httpx import AsyncClient
|
||||
from main import Application
|
||||
from tests.integration.bot.networking import MockedRequest
|
||||
from tests.integration.factories.bot import (
|
||||
BotChatFactory,
|
||||
BotEntitleFactory,
|
||||
BotUserFactory,
|
||||
)
|
||||
|
||||
pytestmark = [
|
||||
pytest.mark.asyncio,
|
||||
]
|
||||
|
||||
|
||||
faker = Faker()
|
||||
|
||||
|
||||
async def test_bot_updates(rest_client: AsyncClient) -> None:
|
||||
response = await rest_client.get("/api/healthcheck")
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
async def test_bot_webhook_endpoint(
|
||||
rest_client: AsyncClient,
|
||||
main_application: Application,
|
||||
) -> None:
|
||||
bot_update = create_bot_update()
|
||||
response = await rest_client.post(url="/api/123456789:AABBCCDDEEFFaabbccddeeff-1234567890", json=bot_update)
|
||||
assert response.status_code == 202
|
||||
update = await main_application.state._state["queue"].queue.get() # type: ignore[attr-defined]
|
||||
update = update.to_dict()
|
||||
assert update["update_id"] == bot_update["update_id"]
|
||||
assert_that(update["message"]).is_equal_to(
|
||||
bot_update["message"], include=["from", "entities", "message_id", "text"]
|
||||
)
|
||||
|
||||
|
||||
async def test_bot_queue(
|
||||
bot: BotApplication,
|
||||
event_loop: AbstractEventLoop,
|
||||
) -> None:
|
||||
bot_queue = BotQueue(bot_app=bot)
|
||||
event_loop.create_task(bot_queue.get_updates_from_queue())
|
||||
bot_update = create_bot_update()
|
||||
mocked_request = MockedRequest(bot_update)
|
||||
await bot_queue.put_updates_on_queue(mocked_request) # type: ignore
|
||||
await asyncio.sleep(1)
|
||||
assert bot_queue.queue.empty()
|
||||
|
||||
|
||||
def create_bot_update() -> dict[str, Any]:
|
||||
bot_update: dict[str, Any] = {}
|
||||
bot_update["update_id"] = faker.random_int(min=10**8, max=10**9 - 1)
|
||||
bot_update["message"] = {
|
||||
"message_id": faker.random_int(min=10**8, max=10**9 - 1),
|
||||
"from": BotUserFactory()._asdict(),
|
||||
"chat": BotChatFactory()._asdict(),
|
||||
"date": time.time(),
|
||||
"text": "/chatid",
|
||||
"entities": [BotEntitleFactory()],
|
||||
}
|
||||
return bot_update
|
||||
56
bot_microservice/tests/integration/factories/bot.py
Normal file
56
bot_microservice/tests/integration/factories/bot.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import string
|
||||
|
||||
import factory
|
||||
from faker import Faker
|
||||
from tests.integration.factories.models import Chat, User
|
||||
|
||||
faker = Faker("ru_RU")
|
||||
|
||||
|
||||
class BotUserFactory(factory.Factory):
|
||||
id = factory.Sequence(lambda n: 1000 + n)
|
||||
is_bot = False
|
||||
first_name = factory.Faker("first_name")
|
||||
last_name = factory.Faker("last_name")
|
||||
username = faker.profile(fields=["username"])["username"]
|
||||
language_code = "ru"
|
||||
|
||||
class Meta:
|
||||
model = User
|
||||
|
||||
|
||||
class BotChatFactory(factory.Factory):
|
||||
id = factory.Sequence(lambda n: 1 + n)
|
||||
first_name = factory.Faker("first_name")
|
||||
last_name = factory.Faker("last_name")
|
||||
username = faker.profile(fields=["username"])["username"]
|
||||
type = "private"
|
||||
|
||||
class Meta:
|
||||
model = Chat
|
||||
|
||||
|
||||
class BotInfoFactory(factory.DictFactory):
|
||||
token = factory.Faker(
|
||||
"bothify", text="#########:??????????????????????????-#????????#?", letters=string.ascii_letters
|
||||
) # example: 579694714:AAFpK8w6zkkUrD4xSeYwF3MO8e-4Grmcy7c
|
||||
payment_provider_token = factory.Faker(
|
||||
"bothify", text="#########:TEST:????????????????", letters=string.ascii_letters
|
||||
) # example: 579694714:TEST:K8w6zkkUrD4xSeYw
|
||||
chat_id = factory.Faker("random_int", min=10**8, max=10**9 - 1)
|
||||
super_group_id = factory.Faker("random_int", min=-(10**12) - 10**9, max=-(10**12)) # -1001838004577
|
||||
forum_group_id = factory.Faker("random_int", min=-(10**12) - 10**9, max=-(10**12))
|
||||
channel_name = factory.Faker("name")
|
||||
channel_id = factory.LazyAttribute(lambda obj: f"@{obj.channel_name}")
|
||||
name = factory.Faker("name")
|
||||
fake_username = factory.Faker("name")
|
||||
username = factory.LazyAttribute(lambda obj: "_".join(f"@{obj.fake_username}".split(" "))) # @Peter_Parker
|
||||
|
||||
class Meta:
|
||||
exclude = ("channel_name", "fake_username")
|
||||
|
||||
|
||||
class BotEntitleFactory(factory.DictFactory):
|
||||
type = "bot_command"
|
||||
offset = 0
|
||||
length = 7
|
||||
18
bot_microservice/tests/integration/factories/models.py
Normal file
18
bot_microservice/tests/integration/factories/models.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from typing import NamedTuple
|
||||
|
||||
|
||||
class User(NamedTuple):
|
||||
id: int
|
||||
is_bot: bool
|
||||
first_name: str | None
|
||||
last_name: str | None
|
||||
username: str | None
|
||||
language_code: str
|
||||
|
||||
|
||||
class Chat(NamedTuple):
|
||||
id: int
|
||||
first_name: str | None
|
||||
last_name: str | None
|
||||
username: str
|
||||
type: str
|
||||
Reference in New Issue
Block a user