Update all for Postgres and new techniques

This commit is contained in:
Sebastián Ramírez
2019-02-23 18:44:29 +04:00
parent 1b4d244033
commit 6fdba19639
72 changed files with 793 additions and 1316 deletions

View File

@@ -27,4 +27,4 @@ def test_use_access_token(superuser_token_headers):
)
result = r.json()
assert r.status_code == 200
assert "username" in result
assert "email" in result

View File

@@ -1,8 +1,8 @@
import requests
from app.core import config
from app.crud.user import get_user, upsert_user
from app.db.database import get_default_bucket
from app.crud import user as crud_user
from app.db.session import db_session
from app.models.user import UserInCreate
from app.tests.utils.user import user_authentication_headers
from app.tests.utils.utils import get_server_api, random_lower_string
@@ -15,16 +15,16 @@ def test_get_users_superuser_me(superuser_token_headers):
)
current_user = r.json()
assert current_user
assert current_user["disabled"] is False
assert "superuser" in current_user["admin_roles"]
assert current_user["username"] == config.FIRST_SUPERUSER
assert current_user["is_active"] is True
assert current_user["is_superuser"]
assert current_user["email"] == config.FIRST_SUPERUSER
def test_create_user_new_email(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
data = {"username": username, "password": password}
data = {"email": username, "password": password}
r = requests.post(
f"{server_api}{config.API_V1_STR}/users/",
headers=superuser_token_headers,
@@ -32,26 +32,25 @@ def test_create_user_new_email(superuser_token_headers):
)
assert 200 <= r.status_code < 300
created_user = r.json()
bucket = get_default_bucket()
user = get_user(bucket, username)
assert user.username == created_user["username"]
user = crud_user.get_by_email(db_session, email=username)
assert user.email == created_user["email"]
def test_get_existing_user(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
user_in = UserInCreate(email=username, password=password)
user = crud_user.create(db_session, user_in=user_in)
user_id = user.id
r = requests.get(
f"{server_api}{config.API_V1_STR}/users/{username}",
f"{server_api}{config.API_V1_STR}/users/{user_id}",
headers=superuser_token_headers,
)
assert 200 <= r.status_code < 300
api_user = r.json()
user = get_user(bucket, username)
assert user.username == api_user["username"]
user = crud_user.get_by_email(db_session, email=username)
assert user.email == api_user["email"]
def test_create_user_existing_username(superuser_token_headers):
@@ -59,10 +58,9 @@ def test_create_user_existing_username(superuser_token_headers):
username = random_lower_string()
# username = email
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
data = {"username": username, "password": password}
user_in = UserInCreate(email=username, password=password)
user = crud_user.create(db_session, user_in=user_in)
data = {"email": username, "password": password}
r = requests.post(
f"{server_api}{config.API_V1_STR}/users/",
headers=superuser_token_headers,
@@ -77,11 +75,10 @@ def test_create_user_by_normal_user():
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
user_in = UserInCreate(email=username, password=password)
user = crud_user.create(db_session, user_in=user_in)
user_token_headers = user_authentication_headers(server_api, username, password)
data = {"username": username, "password": password}
data = {"email": username, "password": password}
r = requests.post(
f"{server_api}{config.API_V1_STR}/users/", headers=user_token_headers, json=data
)
@@ -92,14 +89,13 @@ def test_retrieve_users(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
user_in = UserInCreate(email=username, password=password)
user = crud_user.create(db_session, user_in=user_in)
username2 = random_lower_string()
password2 = random_lower_string()
user_in2 = UserInCreate(username=username2, email=username2, password=password2)
user2 = upsert_user(bucket, user_in, persist_to=1)
user_in2 = UserInCreate(email=username2, password=password2)
user2 = crud_user.create(db_session, user_in=user_in2)
r = requests.get(
f"{server_api}{config.API_V1_STR}/users/", headers=superuser_token_headers
@@ -108,5 +104,4 @@ def test_retrieve_users(superuser_token_headers):
assert len(all_users) > 1
for user in all_users:
assert "username" in user
assert "admin_roles" in user
assert "email" in user

View File

@@ -1,7 +0,0 @@
from app.crud.user import get_user_doc_id
def test_get_user_id():
username = "johndoe@example.com"
user_id = get_user_doc_id(username)
assert user_id == "userprofile::johndoe@example.com"

View File

@@ -1,14 +1,7 @@
from fastapi.encoders import jsonable_encoder
from app.crud.user import (
authenticate_user,
check_if_user_is_active,
check_if_user_is_superuser,
get_user,
upsert_user,
)
from app.db.database import get_default_bucket
from app.models.role import RoleEnum
from app.crud import user as crud_user
from app.db.session import db_session
from app.models.user import UserInCreate
from app.tests.utils.utils import random_lower_string
@@ -16,90 +9,75 @@ from app.tests.utils.utils import random_lower_string
def test_create_user():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=email, email=email, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
assert hasattr(user, "username")
assert user.username == email
user_in = UserInCreate(email=email, password=password)
user = crud_user.create(db_session, user_in=user_in)
assert user.email == email
assert hasattr(user, "hashed_password")
assert hasattr(user, "type")
assert user.type == "userprofile"
def test_authenticate_user():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=email, email=email, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
authenticated_user = authenticate_user(bucket, email, password)
user_in = UserInCreate(email=email, password=password)
user = crud_user.create(db_session, user_in=user_in)
authenticated_user = crud_user.authenticate(
db_session, email=email, password=password
)
assert authenticated_user
assert user.username == authenticated_user.username
assert user.email == authenticated_user.email
def test_not_authenticate_user():
email = random_lower_string()
password = random_lower_string()
bucket = get_default_bucket()
user = authenticate_user(bucket, email, password)
user = crud_user.authenticate(db_session, email=email, password=password)
assert user is False
def test_check_if_user_is_active():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=email, email=email, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
is_active = check_if_user_is_active(user)
user_in = UserInCreate(email=email, password=password)
user = crud_user.create(db_session, user_in=user_in)
is_active = crud_user.is_active(user)
assert is_active is True
def test_check_if_user_is_active_inactive():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(
username=email, email=email, password=password, disabled=True
)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
is_active = check_if_user_is_active(user)
assert is_active is False
user_in = UserInCreate(email=email, password=password, disabled=True)
print(user_in)
user = crud_user.create(db_session, user_in=user_in)
print(user)
is_active = crud_user.is_active(user)
print(is_active)
assert is_active
def test_check_if_user_is_superuser():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(
username=email, email=email, password=password, admin_roles=[RoleEnum.superuser]
)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
is_superuser = check_if_user_is_superuser(user)
user_in = UserInCreate(email=email, password=password, is_superuser=True)
user = crud_user.create(db_session, user_in=user_in)
is_superuser = crud_user.is_superuser(user)
assert is_superuser is True
def test_check_if_user_is_superuser_normal_user():
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
is_superuser = check_if_user_is_superuser(user)
user_in = UserInCreate(email=username, password=password)
user = crud_user.create(db_session, user_in=user_in)
is_superuser = crud_user.is_superuser(user)
assert is_superuser is False
def test_get_user():
password = random_lower_string()
username = random_lower_string()
user_in = UserInCreate(
username=username,
email=username,
password=password,
admin_roles=[RoleEnum.superuser],
)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
user_2 = get_user(bucket, username)
assert user.username == user_2.username
user_in = UserInCreate(email=username, password=password, is_superuser=True)
user = crud_user.create(db_session, user_in=user_in)
user_2 = crud_user.get(db_session, user_id=user.id)
assert user.email == user_2.email
assert jsonable_encoder(user) == jsonable_encoder(user_2)