import logging
import handlers
from handlers import BaseHandler
import tornado.auth
import urllib.parse
import base64
import uuid
import db
import peewee
[docs]class DeveloperLoginHandler(BaseHandler):
[docs] def get(self):
if not self.get_argument("user", False):
self.send_error(status_code=403)
elif not self.get_argument("email", False):
self.send_error(status_code=403)
self.set_secure_cookie('user', self.get_argument("user"))
self.set_secure_cookie('email', self.get_argument("email"))
self.set_secure_cookie('identity', self.get_argument("email"))
self.finish()
[docs]class DeveloperLogoutHandler(BaseHandler):
[docs] def get(self):
self.clear_all_cookies()
redirect = self.get_argument("next", '/')
self.redirect(redirect)
[docs]class ElixirLoginHandler(BaseHandler, tornado.auth.OAuth2Mixin):
_OAUTH_AUTHORIZE_URL = "https://login.elixir-czech.org/oidc/authorize"
_OAUTH_ACCESS_TOKEN_URL = "https://login.elixir-czech.org/oidc/token"
_OAUTH_USERINFO_ENDPOINT = "https://login.elixir-czech.org/oidc/userinfo"
_OAUTH_SETTINGS_KEY = 'elixir_oauth'
def _generate_state(self):
state = uuid.uuid4().hex
self.set_secure_cookie('state', state)
return state
def _check_state(self, state):
cookie_state = self.get_secure_cookie('state').decode('ascii')
return state == cookie_state
[docs] async def get(self):
if self.get_argument("code", False):
if not self._check_state(self.get_argument('state', 'N/A')):
self.set_user_msg("We're being MITM:ed or something ABORT!", "error")
self.redirect("/security_warning")
return
user_token = await self.get_user_token(self.get_argument('code'))
user = await self.get_user(user_token["access_token"])
self.set_secure_cookie('access_token', user_token["access_token"])
self.set_secure_cookie('user', user["name"])
self.set_secure_cookie('email', user["email"])
self.set_secure_cookie('identity', user["sub"])
redirect = self.get_secure_cookie("login_redirect")
self.clear_cookie("login_redirect")
if redirect is None:
redirect = '/'
self.redirect(redirect)
elif self.get_argument("error", False):
logging.error("Elixir error: {}".format( self.get_argument("error") ))
logging.error(" Description: {}".format( self.get_argument("error_description") ))
self.set_user_msg("Elixir Error: %s, %s" % (self.get_argument("error"),
self.get_argument("error_description")))
self.redirect("/error")
else:
self.set_secure_cookie('login_redirect', self.get_argument("next", '/'), 1)
state = self._generate_state()
self.authorize_redirect(
redirect_uri = self.settings['elixir_oauth']['redirect_uri'],
client_id = self.settings['elixir_oauth']['id'],
scope = ['openid', 'profile', 'email', 'bona_fide_status'],
response_type = 'code',
extra_params = {'state': state}
)
[docs] async def get_user(self, access_token):
http = self.get_auth_http_client()
response = await http.fetch(
self._OAUTH_USERINFO_ENDPOINT,
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': "Bearer {}".format(access_token),
}
)
if response.error:
logging.error("get_user error: {}".format(response))
return
return tornado.escape.json_decode( response.body )
[docs] async def get_user_token(self, code):
redirect_uri = self.settings['elixir_oauth']['redirect_uri']
http = self.get_auth_http_client()
body = urllib.parse.urlencode({
"redirect_uri": redirect_uri,
"code": code,
"grant_type": "authorization_code",
})
client_id = self.settings['elixir_oauth']['id']
secret = self.settings['elixir_oauth']['secret']
authorization = base64.b64encode(
bytes("{}:{}".format(client_id, secret),
'ascii' )
).decode('ascii')
response = await http.fetch(
self._OAUTH_ACCESS_TOKEN_URL,
method = "POST",
body = body,
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': "Basic {}".format(authorization),
},
)
if response.error:
logging.error("get_user_token error: {}".format(response))
return
return tornado.escape.json_decode( response.body )
[docs]class ElixirLogoutHandler(BaseHandler):
[docs] def get(self):
self.clear_all_cookies()
redirect = self.get_argument("next", '/')
self.redirect(redirect)