mirror of
https://github.com/kevin-DL/full-stack-fastapi-postgresql.git
synced 2026-01-13 18:45:27 +00:00
* ♻️ Refactor and simplify backend code * ♻️ Refactor frontend state, integrate typesafe-vuex accessors into state files * ♻️ Use new state accessors and standardize layout * 🔒 Upgrade and fix npm security audit * 🔧 Update local re-generation scripts * 🔊 Log startup exceptions to detect errors early * ✏️ Fix password reset token content * 🔥 Remove unneeded Dockerfile directives * 🔥 Remove unnecessary print * 🔥 Remove unnecessary code, upgrade dependencies in backend * ✏️ Fix typos in docstrings and comments * 🏗️ Improve user Depends utilities to simplify and remove code * 🔥 Remove deprecated SQLAlchemy parameter
46 lines
1.5 KiB
Python
46 lines
1.5 KiB
Python
import jwt
|
|
from fastapi import Depends, HTTPException, Security
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from jwt import PyJWTError
|
|
from sqlalchemy.orm import Session
|
|
from starlette.status import HTTP_403_FORBIDDEN
|
|
|
|
from app import crud
|
|
from app.api.utils.db import get_db
|
|
from app.core import config
|
|
from app.core.jwt import ALGORITHM
|
|
from app.db_models.user import User
|
|
from app.models.token import TokenPayload
|
|
|
|
# Shared OAuth2 password-bearer scheme: get_current_user pulls the request's
# bearer token through it, and tokenUrl names the login route that issues
# access tokens.
reusable_oauth2 = OAuth2PasswordBearer(
    tokenUrl="/api/v1/login/access-token",
)
|
|
|
|
|
|
def get_current_user(
    db: Session = Depends(get_db), token: str = Security(reusable_oauth2)
):
    """Resolve the request's bearer token to a stored user.

    The token is decoded with ``config.SECRET_KEY`` and ``ALGORITHM``, its
    claims are loaded into ``TokenPayload``, and the user is looked up via
    ``crud.user.get`` using the payload's ``user_id``.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        token: Encoded JWT supplied by the ``reusable_oauth2`` scheme.

    Returns:
        The user object returned by ``crud.user.get``.

    Raises:
        HTTPException: 403 if the token cannot be decoded or its claims are
            rejected by ``TokenPayload``; 404 if no user matches the id
            carried by the token.
    """
    try:
        payload = jwt.decode(token, config.SECRET_KEY, algorithms=[ALGORITHM])
        token_data = TokenPayload(**payload)
    except (PyJWTError, ValueError) as exc:
        # NOTE(review): TokenPayload is presumably a pydantic model -- confirm.
        # pydantic's ValidationError subclasses ValueError, so a correctly
        # signed token with malformed claims is reported as a credentials
        # failure instead of escaping as an unhandled server error.
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
        ) from exc

    user = crud.user.get(db, user_id=token_data.user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
|
|
|
|
|
|
def get_current_active_user(current_user: User = Security(get_current_user)):
    """Return the authenticated user, rejecting users that are not active.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when ``crud.user.is_active`` accepts it.

    Raises:
        HTTPException: 400 with detail ``"Inactive user"`` otherwise.
    """
    if crud.user.is_active(current_user):
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
|
|
|
|
|
|
def get_current_active_superuser(current_user: User = Security(get_current_user)):
    """Return the authenticated user, rejecting users that are not superusers.

    NOTE(review): this depends on ``get_current_user`` directly, so only the
    superuser check is applied here; the active flag is not checked in this
    function despite its name.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when ``crud.user.is_superuser`` accepts it.

    Raises:
        HTTPException: 400 when the user is not a superuser.
    """
    has_privileges = crud.user.is_superuser(current_user)
    if has_privileges:
        return current_user
    raise HTTPException(
        status_code=400, detail="The user doesn't have enough privileges"
    )
|