diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/auth.py b/app/services/auth.py new file mode 100644 index 0000000..d71e40e --- /dev/null +++ b/app/services/auth.py @@ -0,0 +1,47 @@ +import time +import jwt + +from app import config +from app.models.user import User + +from fastapi import Request, HTTPException +from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials + +global_settings = config.get_settings() + + +async def verify_jwt(request: Request, token: str) -> bool: + _payload = await request.app.state.redis.get(token) + if _payload: + return True + else: + return False + + +class AuthBearer(HTTPBearer): + def __init__(self, auto_error: bool = True): + super(AuthBearer, self).__init__(auto_error=auto_error) + + async def __call__(self, request: Request): + credentials: HTTPAuthorizationCredentials = await super(AuthBearer, self).__call__(request) + if credentials: + if not credentials.scheme == "Bearer": + raise HTTPException(status_code=403, detail="Invalid authentication scheme.") + if not await verify_jwt(request, credentials.credentials): + raise HTTPException(status_code=403, detail="Invalid token or expired token.") + return credentials.credentials + else: + raise HTTPException(status_code=403, detail="Invalid authorization code.") + + +async def create_access_token(user: User, request: Request): + _payload = { + "email": user.email, + "expiry": time.time() + global_settings.jwt_expire, + "platform": request.headers.get("User-Agent") + } + _token = jwt.encode(_payload, str(user.password), algorithm=global_settings.jwt_algorithm) + + _bool = await request.app.state.redis.set(_token, str(_payload), ex=global_settings.jwt_expire) + if _bool: + return _token