🎉 First commit, from couchbase generator, basic changes

not tested / updated yet
This commit is contained in:
Sebastián Ramírez
2019-02-09 19:42:36 +04:00
commit 7f8bfc8faa
198 changed files with 21022 additions and 0 deletions

View File

@@ -0,0 +1 @@
.cache

View File

@@ -0,0 +1,16 @@
import requests
from app.core import config
from app.tests.utils.utils import get_server_api
def test_celery_worker_test(superuser_token_headers):
server_api = get_server_api()
data = {"msg": "test"}
r = requests.post(
f"{server_api}{config.API_V1_STR}/test-celery/",
json=data,
headers=superuser_token_headers,
)
response = r.json()
assert response["msg"] == "Word received"

View File

@@ -0,0 +1,30 @@
import requests
from app.core import config
from app.tests.utils.utils import get_server_api
def test_get_access_token():
server_api = get_server_api()
login_data = {
"username": config.FIRST_SUPERUSER,
"password": config.FIRST_SUPERUSER_PASSWORD,
}
r = requests.post(
f"{server_api}{config.API_V1_STR}/login/access-token", data=login_data
)
tokens = r.json()
assert r.status_code == 200
assert "access_token" in tokens
assert tokens["access_token"]
def test_use_access_token(superuser_token_headers):
server_api = get_server_api()
r = requests.post(
f"{server_api}{config.API_V1_STR}/login/test-token",
headers=superuser_token_headers,
)
result = r.json()
assert r.status_code == 200
assert "username" in result

View File

@@ -0,0 +1,112 @@
import requests
from app.core import config
from app.crud.user import get_user, upsert_user
from app.db.database import get_default_bucket
from app.models.user import UserInCreate
from app.tests.utils.user import user_authentication_headers
from app.tests.utils.utils import get_server_api, random_lower_string
def test_get_users_superuser_me(superuser_token_headers):
server_api = get_server_api()
r = requests.get(
f"{server_api}{config.API_V1_STR}/users/me", headers=superuser_token_headers
)
current_user = r.json()
assert current_user
assert current_user["disabled"] is False
assert "superuser" in current_user["admin_roles"]
assert current_user["username"] == config.FIRST_SUPERUSER
def test_create_user_new_email(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
data = {"username": username, "password": password}
r = requests.post(
f"{server_api}{config.API_V1_STR}/users/",
headers=superuser_token_headers,
json=data,
)
assert 200 <= r.status_code < 300
created_user = r.json()
bucket = get_default_bucket()
user = get_user(bucket, username)
assert user.username == created_user["username"]
def test_get_existing_user(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
r = requests.get(
f"{server_api}{config.API_V1_STR}/users/{username}",
headers=superuser_token_headers,
)
assert 200 <= r.status_code < 300
api_user = r.json()
user = get_user(bucket, username)
assert user.username == api_user["username"]
def test_create_user_existing_username(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
# username = email
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
data = {"username": username, "password": password}
r = requests.post(
f"{server_api}{config.API_V1_STR}/users/",
headers=superuser_token_headers,
json=data,
)
created_user = r.json()
assert r.status_code == 400
assert "_id" not in created_user
def test_create_user_by_normal_user():
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
user_token_headers = user_authentication_headers(server_api, username, password)
data = {"username": username, "password": password}
r = requests.post(
f"{server_api}{config.API_V1_STR}/users/", headers=user_token_headers, json=data
)
assert r.status_code == 400
def test_retrieve_users(superuser_token_headers):
server_api = get_server_api()
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
username2 = random_lower_string()
password2 = random_lower_string()
user_in2 = UserInCreate(username=username2, email=username2, password=password2)
user2 = upsert_user(bucket, user_in, persist_to=1)
r = requests.get(
f"{server_api}{config.API_V1_STR}/users/", headers=superuser_token_headers
)
all_users = r.json()
assert len(all_users) > 1
for user in all_users:
assert "username" in user
assert "admin_roles" in user

View File

@@ -0,0 +1,13 @@
import pytest
from app.tests.utils.utils import get_server_api, get_superuser_token_headers
@pytest.fixture(scope="module")
def server_api():
return get_server_api()
@pytest.fixture(scope="module")
def superuser_token_headers():
return get_superuser_token_headers()

View File

@@ -0,0 +1,7 @@
from app.crud.user import get_user_doc_id
def test_get_user_id():
username = "johndoe@example.com"
user_id = get_user_doc_id(username)
assert user_id == "userprofile::johndoe@example.com"

View File

@@ -0,0 +1,105 @@
from fastapi.encoders import jsonable_encoder
from app.crud.user import (
authenticate_user,
check_if_user_is_active,
check_if_user_is_superuser,
get_user,
upsert_user,
)
from app.db.database import get_default_bucket
from app.models.role import RoleEnum
from app.models.user import UserInCreate
from app.tests.utils.utils import random_lower_string
def test_create_user():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=email, email=email, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
assert hasattr(user, "username")
assert user.username == email
assert hasattr(user, "hashed_password")
assert hasattr(user, "type")
assert user.type == "userprofile"
def test_authenticate_user():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=email, email=email, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
authenticated_user = authenticate_user(bucket, email, password)
assert authenticated_user
assert user.username == authenticated_user.username
def test_not_authenticate_user():
email = random_lower_string()
password = random_lower_string()
bucket = get_default_bucket()
user = authenticate_user(bucket, email, password)
assert user is False
def test_check_if_user_is_active():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=email, email=email, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
is_active = check_if_user_is_active(user)
assert is_active is True
def test_check_if_user_is_active_inactive():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(
username=email, email=email, password=password, disabled=True
)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
is_active = check_if_user_is_active(user)
assert is_active is False
def test_check_if_user_is_superuser():
email = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(
username=email, email=email, password=password, admin_roles=[RoleEnum.superuser]
)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
is_superuser = check_if_user_is_superuser(user)
assert is_superuser is True
def test_check_if_user_is_superuser_normal_user():
username = random_lower_string()
password = random_lower_string()
user_in = UserInCreate(username=username, email=username, password=password)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
is_superuser = check_if_user_is_superuser(user)
assert is_superuser is False
def test_get_user():
password = random_lower_string()
username = random_lower_string()
user_in = UserInCreate(
username=username,
email=username,
password=password,
admin_roles=[RoleEnum.superuser],
)
bucket = get_default_bucket()
user = upsert_user(bucket, user_in, persist_to=1)
user_2 = get_user(bucket, username)
assert user.username == user_2.username
assert jsonable_encoder(user) == jsonable_encoder(user_2)

View File

@@ -0,0 +1,13 @@
import requests
from app.core import config
def user_authentication_headers(server_api, email, password):
data = {"username": email, "password": password}
r = requests.post(f"{server_api}{config.API_V1_STR}/login/access-token", data=data)
response = r.json()
auth_token = response["access_token"]
headers = {"Authorization": f"Bearer {auth_token}"}
return headers

View File

@@ -0,0 +1,31 @@
import random
import string
import requests
from app.core import config
def random_lower_string():
return "".join(random.choices(string.ascii_lowercase, k=32))
def get_server_api():
server_name = f"http://{config.SERVER_NAME}"
return server_name
def get_superuser_token_headers():
server_api = get_server_api()
login_data = {
"username": config.FIRST_SUPERUSER,
"password": config.FIRST_SUPERUSER_PASSWORD,
}
r = requests.post(
f"{server_api}{config.API_V1_STR}/login/access-token", data=login_data
)
tokens = r.json()
a_token = tokens["access_token"]
headers = {"Authorization": f"Bearer {a_token}"}
# superuser_token_headers = headers
return headers