Skip to content

Commit e25d744

Browse files
committed
Apply responsability segregation in user route and jwt_manager
1 parent 5d190ff commit e25d744

File tree

2 files changed

+50
-42
lines changed

2 files changed

+50
-42
lines changed

routers/user.py

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,40 @@
1-
from fastapi import status,APIRouter
1+
from fastapi import status, APIRouter
22
from fastapi.responses import JSONResponse
33
from utils.jwt_manager import create_token
4-
from schemas.user import User,UserBase,UserCreate
4+
from schemas.user import User, UserBase, UserCreate
55
from config.database import Session
66
from services.user import UserService
77
from services.auth import Auth
88

99
user_router = APIRouter()
10+
db = Session()
11+
12+
13+
@user_router.post('/users', tags=['Auth'], response_model=User, status_code=status.HTTP_200_OK)
14+
def create_user(user: UserCreate):
15+
check_user_exists(user)
1016

11-
@user_router.post('/users',tags=['Auth'],response_model=User,status_code=status.HTTP_200_OK)
12-
def create_user(user:UserCreate):
13-
14-
db = Session()
15-
16-
result = UserService(db).get_user_by_email(email=user.email)
17-
18-
if result:
19-
20-
return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST,content={"message":"User already exists"})
21-
2217
UserService(db).create_user(user)
23-
24-
return JSONResponse(status_code=status.HTTP_200_OK,content={"message":"User created"})
25-
26-
27-
@user_router.post('/login',tags=['Auth'],status_code=status.HTTP_200_OK)
28-
def login(user:UserCreate):
29-
30-
db = Session()
31-
result = UserService(db).get_user_by_email(email=user.email)
32-
33-
if not (result and Auth().verify_password(user.password,result.password)):
34-
35-
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED,content={"message":"Unauthorized"})
36-
37-
token:str = create_token(user.dict())
38-
39-
return JSONResponse(status_code=status.HTTP_200_OK,content=token)
18+
19+
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "User created"})
20+
21+
22+
def check_user_exists(user):
23+
if UserService(db).get_user_by_email(email=user.email):
24+
return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content={"message": "User already exists"})
25+
26+
27+
@user_router.post('/login', tags=['Auth'], status_code=status.HTTP_200_OK)
28+
def login(user: UserCreate):
29+
validate_password(user)
30+
31+
token: str = create_token(user.dict())
32+
33+
return JSONResponse(status_code=status.HTTP_200_OK, content=token)
34+
35+
36+
def validate_password(user):
37+
user_found = UserService(db).get_user_by_email(email=user.email)
38+
39+
if not (user_found and Auth().verify_password(user.password, user_found.password)):
40+
return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"message": "Unauthorized"})

utils/jwt_manager.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
1-
from jwt import encode,decode
2-
from datetime import datetime,timedelta
1+
from jwt import encode, decode
2+
from datetime import datetime, timedelta
33
from utils.settings import Settings
44

55
settings = Settings()
66

7-
def create_token(data:dict) -> dict:
8-
payload = expire_token(data)
9-
token:str = encode(payload,key=settings.MY_SECRET_KEY,algorithm="HS256")
7+
8+
def create_token(data: dict) -> dict:
9+
payload = encode_payload(data)
10+
11+
token: str = encode(payload, key=settings.MY_SECRET_KEY, algorithm="HS256")
1012
return token
1113

12-
def validate_token(token:str) -> dict:
13-
data:dict = decode(token,key=settings.MY_SECRET_KEY,algorithms=["HS256"])
14+
15+
def validate_token(token: str) -> dict:
16+
data: dict = decode(token, key=settings.MY_SECRET_KEY, algorithms=["HS256"])
1417
return data
1518

16-
def expire_token(data:dict):
19+
20+
def calculate_token_expiration():
21+
return datetime.utcnow() + timedelta(minutes=settings.TOKEN_EXPIRE_MINUTES)
22+
23+
24+
def encode_payload(data: dict):
1725
to_encode = data.copy()
18-
token_expires = timedelta(minutes=settings.TOKEN_EXPIRE_MINUTES)
19-
expire = datetime.utcnow() + token_expires
20-
to_encode.update({'exp':expire})
21-
return to_encode
26+
to_encode['exp'] = calculate_token_expiration()
27+
28+
return to_encode

0 commit comments

Comments
 (0)