Jakub Miazek 4e2d3026c8 refactor
2023-10-24 20:28:30 +02:00

42 lines
1.5 KiB
Python

import time
import jwt
from app.config import settings as global_settings
from app.models.user import User
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
async def verify_jwt(request: Request, token: str) -> bool:
    """Report whether *token* is currently stored in the application's Redis.

    The token string is used as the lookup key on
    ``request.app.state.redis``. Nothing is decoded or signature-checked
    here; the result only reflects whether a truthy value came back for
    that key.

    Args:
        request: Incoming request; must expose ``app.state.redis`` with an
            awaitable ``get`` method.
        token: The encoded token string to look up.

    Returns:
        True when Redis returned a truthy value for the token, else False.
    """
    redis = request.app.state.redis
    return bool(await redis.get(token))
class AuthBearer(HTTPBearer):
    """Bearer-token dependency that also requires the token to exist in Redis.

    Extends ``HTTPBearer``: after the parent has extracted the credentials
    from the request, the scheme is checked and the token is looked up via
    ``verify_jwt``. Every failure is reported as HTTP 403.
    """

    def __init__(self, auto_error: bool = True):
        """Pass ``auto_error`` straight through to ``HTTPBearer``."""
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """Return the raw bearer token carried by *request*.

        Args:
            request: Incoming request; its ``app.state.redis`` is consulted
                through ``verify_jwt``.

        Returns:
            The credentials string (the token itself, without the scheme).

        Raises:
            HTTPException: 403 when the parent yields no credentials, when
                the scheme is not Bearer, or when ``verify_jwt`` reports
                the token as unknown.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        # The parent can hand back a falsy value instead of raising
        # (presumably when auto_error is False), so guard explicitly.
        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")
        # Auth scheme names are case-insensitive, and the scheme is kept in
        # the casing the client sent; compare lowercased so that
        # "bearer <token>" is not rejected.
        if credentials.scheme.lower() != "bearer":
            raise HTTPException(status_code=403, detail="Invalid authentication scheme.")
        if not await verify_jwt(request, credentials.credentials):
            raise HTTPException(status_code=403, detail="Invalid token or expired token.")
        return credentials.credentials
async def create_access_token(user: User, request: Request):
    """Issue a token for *user* and register it in the application's Redis.

    The claims carry the user's email, an ``expiry`` timestamp
    (``time.time()`` plus ``global_settings.jwt_expire``) and the request's
    ``User-Agent`` header. The token is signed with ``str(user.password)``
    using ``global_settings.jwt_algorithm``, then stored in Redis under the
    token string itself, with ``str(claims)`` as the value and
    ``global_settings.jwt_expire`` as the ``ex`` argument.

    Args:
        user: Account the token is issued for; ``email`` and ``password``
            are read from it.
        request: Incoming request; supplies the headers and
            ``app.state.redis``.

    Returns:
        The encoded token when the Redis ``set`` call returned a truthy
        value, otherwise None.
    """
    claims = {
        "email": user.email,
        "expiry": time.time() + global_settings.jwt_expire,
        "platform": request.headers.get("User-Agent"),
    }
    signing_key = str(user.password)
    encoded = jwt.encode(claims, signing_key, algorithm=global_settings.jwt_algorithm)

    redis = request.app.state.redis
    stored = await redis.set(encoded, str(claims), ex=global_settings.jwt_expire)
    # Only hand the token out once Redis has acknowledged the write.
    return encoded if stored else None