Source code for koi.auth

# -*- coding: utf-8 -*-
# Copyright 2016 Open Permissions Platform Coalition
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed under the License is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
# 

"""
Provides a auth_required decorator to check a request is authenticated,
and an authorised decorator to check a request is authorised
"""
import functools
import inspect
import logging

from tornado import gen
from tornado.concurrent import Future

from .exceptions import HTTPError


[docs]def auth_required(validator): """Decorate a RequestHandler or method to require that a request is authenticated If decorating a coroutine make sure coroutine decorator is first. eg.:: class Handler(tornado.web.RequestHandler): @auth_required(validator) @coroutine def get(self): pass :param validator: a coroutine that will validate the token and return True/False """ def _auth_required_decorator(handler): if inspect.isclass(handler): return _wrap_class(handler, validator) return _auth_required(handler, validator) return _auth_required_decorator
[docs]def auth_optional(validator): """Decorate a RequestHandler or method to accept optional authentication token If decorating a coroutine make sure coroutine decorator is first. eg.:: class Handler(tornado.web.RequestHandler): @auth_required(validator) @coroutine def get(self): pass :param validator: a coroutine that will validate the token and return True/False """ def _auth_optional_decorator(handler): if inspect.isclass(handler): return _wrap_class(handler, validator) return _auth_optional(handler, validator) return _auth_optional_decorator
def _wrap_class(request_handler, validator): """Decorate each HTTP verb method to check if the request is authenticated :param request_handler: a tornado.web.RequestHandler instance """ METHODS = ['get', 'post', 'put', 'head', 'options', 'delete', 'patch'] for name in METHODS: method = getattr(request_handler, name) setattr(request_handler, name, _auth_required(method, validator)) return request_handler def _get_token(request): """ Gets authentication token from request header Will raise 401 error if token not found :return token: an authorization token. """ token = request.headers.get('Authorization') if not token: message = 'Token not in Authorization header' logging.warning(message) raise HTTPError(401, message) return token def _get_optional_token(request): """ Gets authentication token from request header Returns None if token not found :return token: an authorization token. """ token = request.headers.get('Authorization') if not token: return None else: return token def _auth_required(method, validator): """Decorate a HTTP verb method and check the request is authenticated :param method: a tornado.web.RequestHandler method :param validator: a token validation coroutine, that should return True/False depending if token is or is not valid """ @gen.coroutine @functools.wraps(method) def wrapper(self, *args, **kwargs): token = _get_token(self.request) valid_token = yield validator(token) if not valid_token: message = 'Invalid token: {}'.format(token) logging.warning(message) raise HTTPError(401, message) result = method(self, *args, **kwargs) if isinstance(result, Future): result = yield result raise gen.Return(result) return wrapper def _auth_optional(method, validator): """Decorate a HTTP verb method and look for optional authentication :param method: a tornado.web.RequestHandler method :param validator: a token validation coroutine, that should return True/False depending if token is or is not valid """ @gen.coroutine @functools.wraps(method) def wrapper(self, *args, **kwargs): token = _get_optional_token(self.request) if token is not None: valid_token = yield validator(token) if not valid_token: message = 'Invalid token: {}'.format(token) logging.warning(message) raise HTTPError(401, message) result = method(self, *args, **kwargs) if isinstance(result, Future): result = yield result raise gen.Return(result) return wrapper
[docs]def authorized(validator): """Decorate a RequestHandler or method to require that a request is authorized If decorating a coroutine make sure coroutine decorator is first. eg.:: class Handler(tornado.web.RequestHandler): @authorized(validator) @coroutine def get(self): pass :param validator: a coroutine that will authorize the user associated with the token and return True/False """ def _authorized_decorator(method): @gen.coroutine def wrapper(self, *args, **kwargs): token = _get_token(self.request) authorized = yield validator(token, **kwargs) if not authorized: message = 'Token is not authorised for this action: {}'.format(token) logging.warning(message) raise HTTPError(403, message) result = method(self, *args, **kwargs) if isinstance(result, Future): result = yield result raise gen.Return(result) return wrapper return _authorized_decorator