Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: User endpoints #3

Merged
merged 4 commits into from
Mar 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 35 additions & 68 deletions internal/api/login.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
from typing import List
from fastapi import APIRouter, Response, Request
from fastapi import APIRouter, Response
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from internal.models.user import User
from internal.types.responses import SuccessResponse, FailResponse, UserResponse
from internal.tokens.tokens import verify_jwt_token, create_access_token, create_refresh_token
from internal.models.user import User, PublicUser
from internal.types.responses import SuccessResponse, FailResponse, PublicUserResponse
from internal.tokens.tokens import create_access_token, create_refresh_token
from internal.database.users import USER_DB
from internal.types.types import (
LoginRequest, RegisterRequest, RefreshTokenRequest,
UserRequest, FAIL, SUCCESS
LoginRequest, UserRequest,
FAIL, SUCCESS
)

router = APIRouter()

USER_DB: List[User] = []


@router.post("/register", response_model=UserResponse)
def register_user(request: RegisterRequest):
@router.post("/register", response_model=PublicUserResponse)
def register_user(request: UserRequest, response: Response):
global USER_DB
for user in USER_DB:
if request.email == user.email:
Expand All @@ -29,22 +27,35 @@ def register_user(request: RegisterRequest):
)

user = User(
username=request.name,
username=request.username,
email=request.email,
password=request.password,
role="user",
)
user.refresh_token = create_refresh_token(user.id)

USER_DB.append(user)
return UserResponse(

access_token = create_access_token(user.id)
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=True,
samesite="None"
)

return PublicUserResponse(
code=SUCCESS,
message="User registered successfully",
user=user
user=PublicUser(
id=user.id,
username=user.username,
email=user.email,
role=user.role
)
)


@router.post("/login", response_model=UserResponse)
@router.post("/login", response_model=PublicUserResponse)
def login_user(request: LoginRequest, response: Response):
user = next((u for u in USER_DB if u.email == request.email), None)

Expand All @@ -68,7 +79,6 @@ def login_user(request: LoginRequest, response: Response):

if request.remember_me:
refresh_token = create_refresh_token(user.id)
user.refresh_token = refresh_token

response.set_cookie(
key="refresh_token",
Expand All @@ -78,63 +88,20 @@ def login_user(request: LoginRequest, response: Response):
samesite="None"
)

return UserResponse(
return PublicUserResponse(
code=SUCCESS,
message="Login successful",
user=user
)


@router.post("/refresh-token", response_model=SuccessResponse)
def refresh_token(request: Request, body: RefreshTokenRequest, response: Response):
user = body.user

# If the session is still active, allow silent refresh
if user:
new_access_token = create_access_token(user.id)
response.set_cookie(
key="access_token",
value=new_access_token,
httponly=True,
secure=True,
samesite="None"
user=PublicUser(
id=user.id,
username=user.username,
email=user.email,
role=user.role
)

return SuccessResponse(
code=SUCCESS,
message="Token refreshed"
)

# Else, check if any refresh token exists
refresh_token = request.cookies.get("refresh_token")
decoded_refresh = verify_jwt_token(refresh_token) if refresh_token else None
if decoded_refresh:
if user and user.refresh_token == refresh_token:
new_access_token = create_access_token(user.id)
response.set_cookie(
key="access_token",
value=new_access_token,
httponly=True,
secure=True,
samesite="None"
)
return SuccessResponse(code=SUCCESS, message="Token refreshed")

# If there is no refresh token, require authentication
return JSONResponse(status_code=401, content=jsonable_encoder(
FailResponse(
code=FAIL,
message="Authentication required"
))
)


@router.post("/logout", response_model=SuccessResponse)
def logout_user(request: UserRequest, response: Response):
user = next((u for u in USER_DB if u.email == request.email), None)
if user:
user.refresh_token = None

def logout_user(response: Response):
response.delete_cookie("access_token")
response.delete_cookie("refresh_token")

Expand Down
68 changes: 68 additions & 0 deletions internal/api/token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from fastapi import APIRouter, Request, Response
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from internal.tokens.tokens import verify_jwt_token, create_access_token
from internal.database.users import USER_DB
from internal.types.responses import SuccessResponse, FailResponse
from internal.types.types import FAIL, SUCCESS, UserRequest

router = APIRouter()


@router.post("/refresh-token", response_model=SuccessResponse)
async def refresh_token(request: Request, response: Response, body: UserRequest | None = None):
# 1) Check if there is valid access_token
present_access_token = request.cookies.get("access_token")
decoded_access = verify_jwt_token(present_access_token) if present_access_token else None

if decoded_access:
# If there is a valid access_token, do not refresh and return
user_id = decoded_access.get("id")
u = next((u for u in USER_DB if u.id == user_id), None)
if u:
return SuccessResponse(code=SUCCESS, message="There is already valid access_token exists")

# 2) Try to get user from body
user = None
try:
body = await request.json()
user = body.get("user", None)
except Exception:
pass

if user:
# If there is a valid user in body, refresh the access token
u = next((u for u in USER_DB if u.id == user.get("id")), None)
if u:
new_access_token = create_access_token(u.id)
response.set_cookie(
key="access_token",
value=new_access_token,
httponly=True,
secure=True,
samesite="None"
)
return SuccessResponse(code=SUCCESS, message="Token refreshed")

# 3) If no user is in the request body, check for refresh_token
refresh_token = request.cookies.get("refresh_token")
decoded_refresh = verify_jwt_token(refresh_token) if refresh_token else None

if decoded_refresh:
# If there is a valid refresh token, refresh the access token
user_id = decoded_refresh.get("id")
u = next((u for u in USER_DB if u.id == user_id), None)
if u:
new_access_token = create_access_token(u.id)
response.set_cookie(
key="access_token",
value=new_access_token,
httponly=True,
secure=True,
samesite="None"
)
return SuccessResponse(code=SUCCESS, message="Token refreshed")

return JSONResponse(status_code=401, content=jsonable_encoder(
FailResponse(code=FAIL, message="Authentication required"))
)
81 changes: 81 additions & 0 deletions internal/api/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import jwt
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from config.config import get_config
from internal.utils.utils import hash_password
from internal.database.users import USER_DB
from internal.types.responses import FailResponse, PublicUserResponse
from internal.types.types import SUCCESS, FAIL, UserUpdateRequest
from internal.models.user import User, PublicUser

router = APIRouter()


def get_current_user(request: Request):
# Check for access_token cookie
access_token = request.cookies.get("access_token")
if not access_token:
raise HTTPException(status_code=401, detail="Authentication required")

try:
payload = jwt.decode(access_token, get_config("secret_key"), algorithms=[get_config("algorithm")])
user_id = payload.get("id")
if not user_id:
raise HTTPException(status_code=401, detail="Invalid token")

# Fetch user from database
user = next((u for u in USER_DB if u.id == user_id), None)
if not user:
raise HTTPException(status_code=404, detail="User not found")

return user

except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")


@router.get("/me", response_model=PublicUserResponse)
def get_current_user_info(user: User = Depends(get_current_user)):
return PublicUserResponse(
code=SUCCESS,
message="User details retrieved successfully",
user=PublicUser(
id=user.id,
username=user.username,
email=user.email,
role=user.role
)
)


@router.patch("/me", response_model=PublicUserResponse)
def update_user_info(update_data: UserUpdateRequest, user: User = Depends(get_current_user)):
existing_user = next((u for u in USER_DB if u.id == user.id), None)
if not existing_user:
return JSONResponse(status_code=404, content=jsonable_encoder(
FailResponse(
code=FAIL,
message="User not found"
)
))

if update_data.username:
existing_user.username = update_data.username
if update_data.email:
existing_user.email = update_data.email
if update_data.password:
existing_user.password = hash_password(update_data.password)

return PublicUserResponse(
code=SUCCESS,
message="User details updated successfully",
user=PublicUser(
id=user.id,
username=user.username,
email=user.email,
role=user.role
)
)
4 changes: 4 additions & 0 deletions internal/database/users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from typing import List
from internal.models.user import User

USER_DB: List[User] = []
16 changes: 11 additions & 5 deletions internal/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class User(BaseModel):
email: str
password: str
role: str
refresh_token: str = ""

@field_validator("password", mode="before")
@classmethod
Expand All @@ -18,7 +17,14 @@ def hash_password(cls, value: str) -> str:
def check_password(self, plain_password: str) -> bool:
return verify_password(plain_password, self.password)

def to_dict(self):
user_data = self.model_dump()
del user_data["password"]
return user_data
def __setattr__(self, name, value):
if name == "id" and hasattr(self, "id"):
raise AttributeError("The 'id' field is immutable and cannot be changed after creation.")
super().__setattr__(name, value)


class PublicUser(BaseModel):
id: str
username: str
email: str
role: str
13 changes: 8 additions & 5 deletions internal/types/responses.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pydantic import BaseModel, Field
from internal.models.user import User
from internal.models.user import PublicUser


class SuccessResponse(BaseModel):
Expand All @@ -16,13 +16,16 @@ class VersionResponse(SuccessResponse):
version: str = Field(None, examples=["v0.1.0"])


class UserResponse(SuccessResponse):
user: User = Field(None, examples=[{
class PublicUserResponse(SuccessResponse):
user: PublicUser = Field(None, examples=[{
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"username": "test",
"email": "test@example.com",
"password": "test123",
"role": "user",
"refresh_token": ""
}]
)


class TokenResponse(SuccessResponse):
access_token: str = Field(None, examples=["eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."])
token_type: str = Field(None, examples=["bearer"])
Loading