2025-07-27 20:09:30 +02:00

63 lines
2.0 KiB
Python

import time
import jwt
from fastapi import HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.config import settings as global_settings
from app.models.user import User
from rotoger import AppStructLogger
logger = AppStructLogger().get_logger()
async def get_from_redis(request: Request, key: str):
return await request.app.redis.get(key)
async def set_to_redis(request: Request, key: str, value: str, ex: int):
return await request.app.redis.set(key, value, ex=ex)
async def verify_jwt(request: Request, token: str) -> bool:
payload = await get_from_redis(request, token)
return bool(payload)
class AuthBearer(HTTPBearer):
def __init__(self, auto_error: bool = True):
super().__init__(auto_error=auto_error)
async def __call__(self, request: Request):
credentials: HTTPAuthorizationCredentials = await super().__call__(request)
if not credentials:
raise HTTPException(status_code=403, detail="Invalid authorization code.")
if credentials.scheme != "Bearer":
raise HTTPException(
status_code=403, detail="Invalid authentication scheme."
)
if not await verify_jwt(request, credentials.credentials):
raise HTTPException(
status_code=403, detail="Invalid token or expired token."
)
await logger.ainfo(f"Token verified: {credentials.credentials}")
return credentials.credentials
async def create_access_token(user: User, request: Request):
# sourcery skip: avoid-builtin-shadow
payload = {
"email": user.email,
"expiry": time.time() + global_settings.jwt_expire,
"platform": request.headers.get("User-Agent"),
}
token = jwt.encode(
payload, str(user.password), algorithm=global_settings.jwt_algorithm
)
_bool = await set_to_redis(
request, token, str(payload), ex=global_settings.jwt_expire
)
if _bool:
return token