diff options
Diffstat (limited to 'macaroonbakery/checkers/_checkers.py')
-rw-r--r-- | macaroonbakery/checkers/_checkers.py | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/macaroonbakery/checkers/_checkers.py b/macaroonbakery/checkers/_checkers.py new file mode 100644 index 0000000..71cb56f --- /dev/null +++ b/macaroonbakery/checkers/_checkers.py @@ -0,0 +1,246 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +import abc +from collections import namedtuple +from datetime import datetime + +import pyrfc3339 +import pytz +from ._caveat import parse_caveat +from ._conditions import ( + COND_ALLOW, + COND_DECLARED, + COND_DENY, + COND_ERROR, + COND_TIME_BEFORE, + STD_NAMESPACE, +) +from ._declared import DECLARED_KEY +from ._namespace import Namespace +from ._operation import OP_KEY +from ._time import TIME_KEY +from ._utils import condition_with_prefix + + +class RegisterError(Exception): + '''Raised when a condition cannot be registered with a Checker.''' + pass + + +class FirstPartyCaveatChecker(object): + '''Used to check first party caveats for validity with respect to + information in the provided context. + + If the caveat kind was not recognised, the checker should return + ErrCaveatNotRecognized. + ''' + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def check_first_party_caveat(self, ctx, caveat): + ''' Checks that the given caveat condition is valid with respect to + the given context information. + :param ctx: an Auth context + :param caveat a string + ''' + raise NotImplementedError('check_first_party_caveat method must be ' + 'defined in subclass') + + def namespace(self): + ''' Returns the namespace associated with the caveat checker. + ''' + raise NotImplementedError('namespace method must be ' + 'defined in subclass') + + +class Checker(FirstPartyCaveatChecker): + ''' Holds a set of checkers for first party caveats. + ''' + + def __init__(self, namespace=None, include_std_checkers=True): + if namespace is None: + namespace = Namespace() + self._namespace = namespace + self._checkers = {} + if include_std_checkers: + self.register_std() + + def check_first_party_caveat(self, ctx, cav): + ''' Checks the caveat against all registered caveat conditions. + :return: error message string if any or None + ''' + try: + cond, arg = parse_caveat(cav) + except ValueError as ex: + # If we can't parse it, perhaps it's in some other format, + # return a not-recognised error. + return 'cannot parse caveat "{}": {}'.format(cav, ex.args[0]) + checker = self._checkers.get(cond) + if checker is None: + return 'caveat "{}" not satisfied: caveat not recognized'.format( + cav) + err = checker.check(ctx, cond, arg) + if err is not None: + return 'caveat "{}" not satisfied: {}'.format(cav, err) + + def namespace(self): + ''' Returns the namespace associated with the Checker. + ''' + return self._namespace + + def info(self): + ''' Returns information on all the registered checkers. + + Sorted by namespace and then name + :returns a list of CheckerInfo + ''' + return sorted(self._checkers.values(), key=lambda x: (x.ns, x.name)) + + def register(self, cond, uri, check): + ''' Registers the given condition(string) in the given namespace + uri (string) to be checked with the given check function. + The check function checks a caveat by passing an auth context, a cond + parameter(string) that holds the caveat condition including any + namespace prefix and an arg parameter(string) that hold any additional + caveat argument text. It will return any error as string otherwise + None. + + It will raise a ValueError if the namespace is not registered or + if the condition has already been registered. + ''' + if check is None: + raise RegisterError( + 'no check function registered for namespace {} when ' + 'registering condition {}'.format(uri, cond)) + + prefix = self._namespace.resolve(uri) + if prefix is None: + raise RegisterError('no prefix registered for namespace {} when ' + 'registering condition {}'.format(uri, cond)) + + if prefix == '' and cond.find(':') >= 0: + raise RegisterError( + 'caveat condition {} in namespace {} contains a colon but its' + ' prefix is empty'.format(cond, uri)) + + full_cond = condition_with_prefix(prefix, cond) + info = self._checkers.get(full_cond) + if info is not None: + raise RegisterError( + 'checker for {} (namespace {}) already registered in ' + 'namespace {}'.format(full_cond, uri, info.ns)) + self._checkers[full_cond] = CheckerInfo( + check=check, + ns=uri, + name=cond, + prefix=prefix) + + def register_std(self): + ''' Registers all the standard checkers in the given checker. + + If not present already, the standard checkers schema (STD_NAMESPACE) is + added to the checker's namespace with an empty prefix. + ''' + self._namespace.register(STD_NAMESPACE, '') + for cond in _ALL_CHECKERS: + self.register(cond, STD_NAMESPACE, _ALL_CHECKERS[cond]) + + +class CheckerInfo(namedtuple('CheckInfo', 'prefix name ns check')): + '''CheckerInfo holds information on a registered checker. + ''' + __slots__ = () + + def __new__(cls, prefix, name, ns, check=None): + ''' + :param check holds the actual checker function which takes an auth + context and a condition and arg string as arguments. + :param prefix holds the prefix for the checker condition as string. + :param name holds the name of the checker condition as string. + :param ns holds the namespace URI for the checker's schema as + Namespace. + ''' + return super(CheckerInfo, cls).__new__(cls, prefix, name, ns, check) + + +def _check_time_before(ctx, cond, arg): + clock = ctx.get(TIME_KEY) + if clock is None: + now = pytz.UTC.localize(datetime.utcnow()) + else: + now = clock.utcnow() + + try: + if pyrfc3339.parse(arg) <= now: + return 'macaroon has expired' + except ValueError: + return 'cannot parse "{}" as RFC 3339'.format(arg) + return None + + +def _check_declared(ctx, cond, arg): + parts = arg.split(' ', 1) + if len(parts) != 2: + return 'declared caveat has no value' + attrs = ctx.get(DECLARED_KEY, {}) + val = attrs.get(parts[0]) + if val is None: + return 'got {}=null, expected "{}"'.format(parts[0], parts[1]) + + if val != parts[1]: + return 'got {}="{}", expected "{}"'.format(parts[0], val, parts[1]) + return None + + +def _check_error(ctx, cond, arg): + return 'bad caveat' + + +def _check_allow(ctx, cond, arg): + return _check_operations(ctx, True, arg) + + +def _check_deny(ctx, cond, arg): + return _check_operations(ctx, False, arg) + + +def _check_operations(ctx, need_ops, arg): + ''' Checks an allow or a deny caveat. The need_ops parameter specifies + whether we require all the operations in the caveat to be declared in + the context. + ''' + ctx_ops = ctx.get(OP_KEY, []) + if len(ctx_ops) == 0: + if need_ops: + f = arg.split() + if len(f) == 0: + return 'no operations allowed' + return '{} not allowed'.format(f[0]) + return None + + fields = arg.split() + for op in ctx_ops: + err = _check_op(op, need_ops, fields) + if err is not None: + return err + return None + + +def _check_op(ctx_op, need_op, fields): + found = False + for op in fields: + if op == ctx_op: + found = True + break + if found != need_op: + return '{} not allowed'.format(ctx_op) + return None + + +_ALL_CHECKERS = { + COND_TIME_BEFORE: _check_time_before, + COND_DECLARED: _check_declared, + COND_ERROR: _check_error, + COND_ALLOW: _check_allow, + COND_DENY: _check_deny, +} |