from fastapi import APIRouter, Request, Response
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from internal.tokens.tokens import verify_jwt_token, create_access_token
from internal.database.users import USER_DB
from internal.types.responses import SuccessResponse, FailResponse
from internal.types.types import FAIL, SUCCESS, UserRequest
# Router that collects the token endpoints defined in this module.
router: APIRouter = APIRouter()
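def _find_user(user_id):
    """Return the ``USER_DB`` entry whose ``id`` equals *user_id*, or ``None``.

    A missing ``id`` claim (``None``) never matches, so a token without an
    ``id`` cannot resolve to a user record whose id happens to be ``None``.
    """
    if user_id is None:
        return None
    return next((u for u in USER_DB if u.id == user_id), None)


def _set_access_cookie(response: Response, token: str) -> None:
    """Attach *token* to *response* as the ``access_token`` cookie.

    The cookie attributes are unchanged from the original handler:
    ``httponly`` keeps the token out of reach of page scripts, and
    ``secure=True`` with ``samesite="None"`` allows cross-site use over HTTPS only.
    """
    response.set_cookie(
        key="access_token",
        value=token,
        httponly=True,
        secure=True,
        samesite="None",
    )


@router.post("/refresh-token", response_model=SuccessResponse)
async def refresh_token(request: Request, response: Response, body: UserRequest | None = None):
    """Issue a fresh ``access_token`` cookie from a verified ``refresh_token`` cookie.

    Flow:
      1. If the request already carries a valid ``access_token`` cookie that
         names a known user, nothing is refreshed and a success response is
         returned.
      2. Otherwise a new access token is minted only when the ``refresh_token``
         cookie verifies (``verify_jwt_token``) and its ``id`` claim names a
         known user.
      3. Anything else is answered with HTTP 401 and a ``FailResponse``.

    Args:
        request: Incoming request; only its ``access_token`` / ``refresh_token``
            cookies are consulted.
        response: Outgoing response the new cookie is attached to.
        body: Optional request body. Kept for signature compatibility; it is
            deliberately NOT used to decide who gets a token (see step 2).

    Returns:
        ``SuccessResponse`` on the two success paths, or a 401 ``JSONResponse``
        wrapping a ``FailResponse`` when no usable credential is present.
    """
    # 1) A still-valid access token for a known user means there is nothing to refresh.
    access_cookie = request.cookies.get("access_token")
    decoded_access = verify_jwt_token(access_cookie) if access_cookie else None
    if decoded_access and _find_user(decoded_access.get("id")) is not None:
        return SuccessResponse(code=SUCCESS, message="There is already valid access_token exists")

    # 2) Mint a new access token only from a verified refresh token.
    # The request body is intentionally not consulted: an earlier version of this
    # handler issued an access token for whatever user id the caller placed in the
    # body, so any client that knew a user id could obtain that user's session
    # without presenting any credential. Only the signed refresh token is trusted.
    refresh_cookie = request.cookies.get("refresh_token")
    decoded_refresh = verify_jwt_token(refresh_cookie) if refresh_cookie else None
    if decoded_refresh:
        user = _find_user(decoded_refresh.get("id"))
        if user is not None:
            _set_access_cookie(response, create_access_token(user.id))
            return SuccessResponse(code=SUCCESS, message="Token refreshed")

    # 3) No valid access token and no valid refresh token: reject.
    return JSONResponse(
        status_code=401,
        content=jsonable_encoder(FailResponse(code=FAIL, message="Authentication required")),
    )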