import unittest
from unittest import TestCase
import requests
import peewee
import db
[docs]class RequestTests(TestCase):
HOST = 'http://localhost:4000'
[docs] def newSession(self):
if hasattr(self, '_session'):
self.destroySession()
self._session = requests.Session()
[docs] def destroySession(self):
self._session.close()
del self._session
@property
def session(self):
if hasattr(self, '_session'):
return self._session
return requests
@property
def cookies(self):
if hasattr(self, '_session'):
return self._session.cookies
return {}
[docs] def getUrl(self, path):
return "{}{}".format(self.HOST, path)
[docs] def get(self, path, *args, **kwargs):
return self.session.get(self.getUrl(path), *args, **kwargs)
[docs] def post(self, path, *args, **kwargs):
return self.session.post(self.getUrl(path), *args, **kwargs)
[docs] def assertHTTPCode(self, path, code=200, method='get', *args, **kwargs):
m = getattr(self, method.lower())
r = m(path, *args, **kwargs)
self.assertEqual(r.status_code, code)
[docs] def login_user(self,user):
self.assertHTTPCode('/developer/login?user={}&email={}'.format(user,user), 200)
[docs]class TestEndpoints(RequestTests):
[docs] def test_datasets(self):
self.assertHTTPCode('/api/dataset')
[docs] def test_one_dataset(self):
self.assertHTTPCode('/api/dataset/Dataset%201')
[docs] def test_dataset_logo(self):
self.assertHTTPCode('/api/dataset/Dataset%201/logo', 404)
[docs] def test_dataset_collection(self):
self.assertHTTPCode('/api/dataset/Dataset%201/collection')
[docs] def test_get_versions(self):
self.assertHTTPCode('/api/dataset/Dataset%201/versions')
[docs] def test_get_one_version(self):
self.assertHTTPCode('/api/dataset/Dataset%201/versions/Version%201-1')
[docs] def test_get_users_current(self):
self.assertHTTPCode('/api/dataset/Dataset%201/users_current', 403)
[docs] def test_get_users_pending(self):
self.assertHTTPCode('/api/dataset/Dataset%201/users_pending', 403)
[docs] def test_create_temporary_link(self):
self.assertHTTPCode('/api/dataset/Dataset%201/temporary_link', 403)
[docs] def test_list_files(self):
self.assertHTTPCode('/api/dataset/Dataset%201/files', 403)
[docs] def test_request_access(self):
self.assertHTTPCode('/api/dataset/Dataset%201/users/email0/request', 403)
[docs] def test_user_me(self):
self.assertHTTPCode('/api/users/me')
[docs] def test_user_datasets(self):
self.assertHTTPCode('/api/users/datasets', 403)
[docs]class TestRequestAccess(RequestTests):
USER='e1'
[docs] def setUp(self):
self.newSession()
if db.database.is_closed():
db.database.connect()
[docs] def tearDown(self):
self.destroySession()
try:
u = db.User.select(db.User).where(db.User.email==self.USER).get()
try:
u.dataset_access.get().delete_instance()
except peewee.PeeweeException:
pass
try:
for l in u.access_logs:
l.delete_instance()
except peewee.PeeweeException:
pass
try:
u.delete_instance()
except peewee.PeeweeException:
pass
except db.User.DoesNotExist:
pass
if not db.database.is_closed():
db.database.close()
[docs] def test_login(self):
self.login_user(self.USER)
count = db.User.select().where(db.User.email==self.USER).count()
self.assertEqual(count, 0)
self.assertIn('email', self.cookies)
self.assertIn('user', self.cookies)
[docs] def test_request_access_with_get(self):
self.login_user(self.USER)
self.assertHTTPCode('/api/dataset/Dataset 1/users/{}/request'.format(self.USER), 405)
[docs] def test_request_access_without_xsrf(self):
self.login_user(self.USER)
r = self.post('/api/dataset/Dataset 1/users/{}/request'.format(self.USER),
data = {
"affiliation": 'none',
"country": "Sweden",
"newsletter": 0,
}
)
self.assertEqual(r.status_code, 403)
[docs] def test_get_xsrf_token(self):
self.get('/api/dataset')
self.assertIn('_xsrf', self.cookies)
[docs] def test_request_access_with_wrong_xsrf_1(self):
self.login_user(self.USER)
r = self.post('/api/dataset/Dataset 1/users/{}/request'.format(self.USER),
data = {
"affiliation": 'none',
"country": "Sweden",
"newsletter": 0,
"_xsrf": 'Fnurken',
}
)
self.assertEqual(r.status_code, 403)
[docs] def test_request_access_with_wrong_xsrf_2(self):
self.login_user(self.USER)
self.assertIn('_xsrf', self.cookies)
self.assertIn('user', self.cookies)
self.assertIn('email', self.cookies)
r = self.post('/api/dataset/Dataset 1/users/{}/request'.format(self.USER),
data = {
"affiliation": 'none',
"country": "Sweden",
"newsletter": 0,
"_xsrf": 'Fnurken',
}
)
self.assertEqual(r.status_code, 403)
[docs] def test_request_access_correctly(self):
self.login_user(self.USER)
count = db.User.select().where(db.User.email==self.USER).count()
self.assertEqual(count, 0)
r = self.post('/api/dataset/Dataset 1/users/{}/request'.format(self.USER),
data = {
"affiliation": 'none',
"country": "Sweden",
"newsletter": 0,
"_xsrf": self.cookies['_xsrf'],
}
)
self.assertEqual(r.status_code, 200)
count = db.User.select().where(db.User.email==self.USER).count()
self.assertEqual(count, 1)
r = self.get('/api/users/datasets')
self.assertEqual(r.status_code, 200)
try:
json = r.json()
except ValueError:
self.fail("Can't parse JSON Data")
self.assertIn('data', json)
self.assertEqual(json['data'][0]['access'], False)
[docs]class TestAdminAccess(RequestTests):
[docs] def setUp(self):
self.newSession()
[docs] def tearDown(self):
self.destroySession()
[docs] def test_login_admin(self):
self.login_user('admin12')
self.assertIn('email', self.cookies)
self.assertIn('user', self.cookies)
[docs] def test_admin_list_users(self):
self.login_user('admin12')
self.assertHTTPCode('/api/dataset/Dataset%201/users_current', 200)
self.assertHTTPCode('/api/dataset/Dataset%201/users_pending', 200)
self.assertHTTPCode('/api/dataset/Dataset%202/users_current', 200)
self.assertHTTPCode('/api/dataset/Dataset%202/users_pending', 200)
[docs] def test_admin_list_users_only_own_project_1(self):
self.login_user('admin1')
self.assertHTTPCode('/api/dataset/Dataset%201/users_current', 200)
self.assertHTTPCode('/api/dataset/Dataset%201/users_pending', 200)
self.assertHTTPCode('/api/dataset/Dataset%202/users_current', 403)
self.assertHTTPCode('/api/dataset/Dataset%202/users_pending', 403)
[docs] def test_admin_list_users_only_own_project_2(self):
self.login_user('admin2')
self.assertHTTPCode('/api/dataset/Dataset%201/users_current', 403)
self.assertHTTPCode('/api/dataset/Dataset%201/users_pending', 403)
self.assertHTTPCode('/api/dataset/Dataset%202/users_current', 200)
self.assertHTTPCode('/api/dataset/Dataset%202/users_pending', 200)
[docs] def test_admin_list_users_get_data(self):
self.login_user('admin1')
u = self.get('/api/dataset/Dataset%201/users_pending')
try:
json = u.json()
except ValueError:
self.fail("Can't parse JSON Data")
self.assertIn('data', json)
user = json['data'][0]
self.assertIn('email', user)
self.assertIn('user', user)
self.assertIn('hasAccess', user)
self.assertEqual(user['hasAccess'], 0)
[docs] def test_admin_is_admin(self):
self.login_user('admin1')
r = self.get('/api/users/datasets')
self.assertEqual(r.status_code, 200)
try:
json = r.json()
except ValueError:
self.fail("Can't parse JSON Data")
self.assertIn('data', json)
self.assertEqual(json['data'][0]['access'], True)
self.assertEqual(json['data'][0]['isAdmin'], True)
[docs]class TestUserManagement(RequestTests):
[docs] def setUp(self):
self.newSession()
[docs] def tearDown(self):
self.destroySession()
if hasattr(self, '_email'):
al = (db.UserAccessLog.select()
.join(db.User)
.where(
( (db.UserAccessLog.action == 'access_granted')
| (db.UserAccessLog.action == 'access_revoked'))
& (db.User.email == self._email)
))
for a in al:
a.delete_instance()
[docs] def test_admin_approve_user(self):
self.login_user('admin1')
r = self.get('/api/dataset/Dataset%201/users_pending')
email = r.json()['data'][0]['email']
self._email = email
self.assertHTTPCode('/api/dataset/Dataset%201/users/{}/approve'.format(email), 405)
self.assertHTTPCode('/api/dataset/Dataset%201/users/{}/approve'.format(email), 403, 'POST')
self.assertHTTPCode('/api/dataset/Dataset%201/users/{}/approve'.format(email), 403, 'POST',
data = {'_xsrf': "The wrong thing"} )
self.assertHTTPCode('/api/dataset/Dataset%201/users/{}/approve'.format(email), 200, 'POST',
data = {'_xsrf': self.cookies['_xsrf']} )
r = self.get('/api/dataset/Dataset%201/users_current')
l = [u for u in r.json()['data'] if u['email'] == email]
self.assertEqual(len(l), 1)
[docs] def test_recently_approved_user_can_list_files(self):
self.login_user('admin1')
u = self.get('/api/dataset/Dataset%201/users_pending').json()['data'][0]
self._email = u['email']
self.newSession()
self.login_user(u['email'])
self.assertHTTPCode('/api/dataset/Dataset%201/files', 403)
self.newSession()
self.login_user('admin1')
self.assertHTTPCode('/api/dataset/Dataset%201/users/{}/approve'.format(u['email']), 200, 'POST',
data = {'_xsrf': self.cookies['_xsrf']} )
self.newSession()
self.login_user(u['email'])
self.assertHTTPCode('/api/dataset/Dataset%201/files', 200)
# The dataset should be listed on the dataset page as well
r = self.get('/api/users/datasets')
self.assertEqual(r.status_code, 200)
try:
json = r.json()
except ValueError:
self.fail("Can't parse JSON Data")
self.assertIn('data', json)
self.assertEqual(json['data'][0]['access'], True)
[docs] def test_recently_revoked_user_cant_list_files(self):
self.login_user('admin1')
u = self.get('/api/dataset/Dataset%201/users_current').json()['data'][0]
self._email = u['email']
self.newSession()
self.login_user(u['email'])
self.assertHTTPCode('/api/dataset/Dataset%201/files', 200)
self.newSession()
self.login_user('admin1')
self.assertHTTPCode('/api/dataset/Dataset%201/users/{}/revoke'.format(u['email']), 200, 'POST',
data = {'_xsrf': self.cookies['_xsrf']} )
self.newSession()
self.login_user(u['email'])
self.assertHTTPCode('/api/dataset/Dataset%201/files', 403)
[docs] def test_full_user_roundabout(self):
email = 'unlisted'
self._email = email
## Request access
self.assertHTTPCode('/api/dataset/Dataset%201/files', 403)
self.login_user(email)
self.assertHTTPCode('/api/dataset/Dataset%201/files', 403)
self.assertHTTPCode('/api/dataset/Dataset 1/users/{}/request'.format(email), 200, 'POST',
data = {
"affiliation": 'none',
"country": "Sweden",
"newsletter": 0,
"_xsrf": self.cookies['_xsrf'],
}
)
self.assertHTTPCode('/api/dataset/Dataset%201/files', 403)
## Approve user
self.newSession()
self.login_user('admin1')
# User is in pending queue
users = self.get('/api/dataset/Dataset%201/users_pending').json()['data']
u = [ x for x in users if x['email'] == email ]
self.assertEqual(len(u), 1)
self.assertHTTPCode('/api/dataset/Dataset%201/users/{}/approve'.format(email), 200, 'POST',
data = {'_xsrf': self.cookies['_xsrf']} )
# User is in approved queue
users = self.get('/api/dataset/Dataset%201/users_current').json()['data']
u = [ x for x in users if x['email'] == email ]
self.assertEqual(len(u), 1)
# User can log in and do stuff
self.newSession()
self.login_user(email)
self.assertHTTPCode('/api/dataset/Dataset%201/files', 200)
# Revoke user
self.newSession()
self.login_user('admin1')
self.assertHTTPCode('/api/dataset/Dataset%201/users/{}/revoke'.format(email), 200, 'POST',
data = {'_xsrf': self.cookies['_xsrf']} )
# User is not in approved queue
users = self.get('/api/dataset/Dataset%201/users_current').json()['data']
u = [ x for x in users if x['email'] == email ]
self.assertEqual(len(u), 0)
# User can log in but cant do stuff
self.newSession()
self.login_user(email)
self.assertHTTPCode('/api/dataset/Dataset%201/files', 403)
[docs]class TestLoggedInUser(RequestTests):
[docs] def setUp(self):
if not hasattr(self, '_email'):
self.newSession()
self.login_user('admin1')
u = self.get('/api/dataset/Dataset%201/users_current').json()['data'][0]
self._email = u['email']
self.newSession()
self.login_user(self._email)
[docs] def tearDown(self):
self.destroySession()
[docs] def testLoggedInFiles(self):
self.assertHTTPCode('/api/dataset/Dataset 1/files', 200)
[docs] def testLoggedInTempLinkGet(self):
self.assertHTTPCode('/api/dataset/Dataset 1/temporary_link', 405)
[docs] def testLoggedInTempLinkPost(self):
self.assertHTTPCode('/api/dataset/Dataset 1/temporary_link', 403, 'POST')
[docs] def testLoggedInTempLinkPostXSRF1(self):
self.assertHTTPCode('/api/dataset/Dataset 1/temporary_link', 403, 'POST',
data = { '_xsrf': "INCORRECT"})
[docs] def testLoggedInTempLinkPostXSRF2(self):
self.assertHTTPCode('/api/dataset/Dataset 1/temporary_link', 200, 'POST',
data = {'_xsrf': self.cookies['_xsrf']} )
if __name__ == '__main__':
unittest.main()