Files
fast_api_api_template/crud/users.py
Kevin ANATOLE 6700a4e2ef Register
Login
Basic migrations with alembic
Get items
2023-02-19 02:47:14 +00:00

61 lines
1.7 KiB
Python

from datetime import timedelta, datetime
from jose import jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from models.users import User
from schemas.users import UserCreate
from api.config import settings
# Shared passlib context used by get_password_hash() and verify_password().
# bcrypt is the only configured scheme; deprecated="auto" tells passlib to
# treat every scheme other than the default one as deprecated.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_user(db: Session, user_id: str):
    """Look up a single user by id.

    Args:
        db: SQLAlchemy session used to run the query.
        user_id: Value compared against ``User.id``.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    matching_users = db.query(User).filter(User.id == user_id)
    return matching_users.first()
def get_user_by_email(db: Session, email: str):
    """Look up a single user by email address.

    Args:
        db: SQLAlchemy session used to run the query.
        email: Value compared against ``User.email``.

    Returns:
        The first matching ``User``, or ``None`` when no row matches.
    """
    matching_users = db.query(User).filter(User.email == email)
    return matching_users.first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of users.

    Args:
        db: SQLAlchemy session used to run the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.

    Returns:
        A list of ``User`` objects (possibly empty).
    """
    # NOTE(review): no order_by is applied, so which rows land on a given
    # page depends on the database's default row order.
    page_query = db.query(User).offset(skip).limit(limit)
    return page_query.all()
def create_user(db: Session, user: UserCreate):
    """Store a new user, keeping only a hash of the submitted password.

    Args:
        db: SQLAlchemy session the new row is added to and committed on.
        user: Incoming payload providing ``email``, ``password`` and
            ``display_name``.

    Returns:
        The newly persisted ``User`` instance.
    """
    new_user = User(
        email=user.email,
        hashed_password=get_password_hash(user.password),
        display_name=user.display_name,
    )
    db.add(new_user)
    db.commit()
    # Reload the instance's state from the database after the commit.
    db.refresh(new_user)
    return new_user
def authenticate_user(db: Session, username: str, password: str):
    """Check login credentials and return the matching user.

    ``username`` is looked up as the account's email address.

    Args:
        db: SQLAlchemy session used for the lookup.
        username: Email address submitted by the client.
        password: Plaintext password submitted by the client.

    Returns:
        The ``User`` on success; ``False`` when no account has that email
        or the password does not match the stored hash.
    """
    user = get_user_by_email(db, username)
    if not user:
        # Run a verification against passlib's dummy hash so that an unknown
        # email takes about as long as a wrong password; otherwise response
        # timing reveals which emails are registered.
        pwd_context.dummy_verify()
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: Password as supplied by the user.
        hashed_password: Hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is not modified.
        expires_delta: Token lifetime measured from now. ``None`` selects
            the default of 15 minutes; any other value, including
            ``timedelta(0)``, is used as given.

    Returns:
        The encoded token, signed with ``settings.secret_key`` using
        ``settings.algorithm``.
    """
    # Local import: the module header only pulls in datetime and timedelta.
    from datetime import timezone

    # Only None means "use the default". A truthiness test would silently
    # turn an explicit zero lifetime into a 15-minute token.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta

    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
def get_password_hash(password):
    """Hash a plaintext password with the module's ``pwd_context``.

    Args:
        password: Plaintext password to hash.

    Returns:
        The value produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed