diff options
Diffstat (limited to 'macaroonbakery/tests/test_checkers.py')
-rw-r--r-- | macaroonbakery/tests/test_checkers.py | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/macaroonbakery/tests/test_checkers.py b/macaroonbakery/tests/test_checkers.py new file mode 100644 index 0000000..f552fa4 --- /dev/null +++ b/macaroonbakery/tests/test_checkers.py @@ -0,0 +1,356 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +from datetime import datetime, timedelta +from unittest import TestCase + +import six +import pytz +from pymacaroons import Macaroon, MACAROON_V2 + +import macaroonbakery.checkers as checkers + +# A frozen time for the tests. +NOW = datetime( + year=2006, month=1, day=2, hour=15, minute=4, second=5, microsecond=123) + + +class TestClock(): + def utcnow(self): + return pytz.UTC.localize(NOW) + + +class TestCheckers(TestCase): + def test_checkers(self): + + tests = [ + ('nothing in context, no extra checkers', [ + ('something', + 'caveat "something" not satisfied: caveat not recognized'), + ('', 'cannot parse caveat "": empty caveat'), + (' hello', 'cannot parse caveat " hello": caveat starts with' + ' space character'), + ], None), + ('one failed caveat', [ + ('t:a aval', None), + ('t:b bval', None), + ('t:a wrong', 'caveat "t:a wrong" not satisfied: wrong arg'), + ], None), + ('time from clock', [ + (checkers.time_before_caveat( + datetime.utcnow() + + timedelta(0, 1)).condition, + None), + (checkers.time_before_caveat(NOW).condition, + 'caveat "time-before 2006-01-02T15:04:05.000123Z" ' + 'not satisfied: macaroon has expired'), + (checkers.time_before_caveat(NOW - timedelta(0, 1)).condition, + 'caveat "time-before 2006-01-02T15:04:04.000123Z" ' + 'not satisfied: macaroon has expired'), + ('time-before bad-date', + 'caveat "time-before bad-date" not satisfied: ' + 'cannot parse "bad-date" as RFC 3339'), + (checkers.time_before_caveat(NOW).condition + " ", + 'caveat "time-before 2006-01-02T15:04:05.000123Z " ' + 'not satisfied: ' + 'cannot parse "2006-01-02T15:04:05.000123Z " as RFC 3339'), + ], lambda x: checkers.context_with_clock(ctx, TestClock())), + ('real time', [ + (checkers.time_before_caveat(datetime( + year=2010, month=1, day=1)).condition, + 'caveat "time-before 2010-01-01T00:00:00.000000Z" not ' + 'satisfied: macaroon has expired'), + (checkers.time_before_caveat(datetime( + year=3000, month=1, day=1)).condition, None), + ], None), + ('declared, no entries', [ + (checkers.declared_caveat('a', 'aval').condition, + 'caveat "declared a aval" not satisfied: got a=null, ' + 'expected "aval"'), + (checkers.COND_DECLARED, 'caveat "declared" not satisfied: ' + 'declared caveat has no value'), + ], None), + ('declared, some entries', [ + (checkers.declared_caveat('a', 'aval').condition, None), + (checkers.declared_caveat('b', 'bval').condition, None), + (checkers.declared_caveat('spc', ' a b').condition, None), + (checkers.declared_caveat('a', 'bval').condition, + 'caveat "declared a bval" not satisfied: ' + 'got a="aval", expected "bval"'), + (checkers.declared_caveat('a', ' aval').condition, + 'caveat "declared a aval" not satisfied: ' + 'got a="aval", expected " aval"'), + (checkers.declared_caveat('spc', 'a b').condition, + 'caveat "declared spc a b" not satisfied: ' + 'got spc=" a b", expected "a b"'), + (checkers.declared_caveat('', 'a b').condition, + 'caveat "error invalid caveat \'declared\' key """ ' + 'not satisfied: bad caveat'), + (checkers.declared_caveat('a b', 'a b').condition, + 'caveat "error invalid caveat \'declared\' key "a b"" ' + 'not satisfied: bad caveat'), + ], lambda x: checkers.context_with_declared(x, { + 'a': 'aval', + 'b': 'bval', + 'spc': ' a b'})), + ] + checker = checkers.Checker() + checker.namespace().register('testns', 't') + checker.register('a', 'testns', arg_checker(self, 't:a', 'aval')) + checker.register('b', 'testns', arg_checker(self, 't:b', 'bval')) + ctx = checkers.AuthContext() + for test in tests: + print(test[0]) + if test[2] is not None: + ctx1 = test[2](ctx) + else: + ctx1 = ctx + for check in test[1]: + err = checker.check_first_party_caveat(ctx1, check[0]) + if check[1] is not None: + self.assertEqual(err, check[1]) + else: + self.assertIsNone(err) + + def test_infer_declared(self): + tests = [ + ('no macaroons', [], {}, None), + ('single macaroon with one declaration', [ + [checkers.Caveat(condition='declared foo bar')] + ], {'foo': 'bar'}, None), + ('only one argument to declared', [ + [checkers.Caveat(condition='declared foo')] + ], {}, None), + ('spaces in value', [ + [checkers.Caveat(condition='declared foo bar bloggs')] + ], {'foo': 'bar bloggs'}, None), + ('attribute with declared prefix', [ + [checkers.Caveat(condition='declaredccf foo')] + ], {}, None), + ('several macaroons with different declares', [ + [ + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('b', 'bval') + ], [ + checkers.declared_caveat('c', 'cval'), + checkers.declared_caveat('d', 'dval') + ] + ], {'a': 'aval', 'b': 'bval', 'c': 'cval', 'd': 'dval'}, None), + ('duplicate values', [ + [ + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('b', 'bval') + ], [ + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('b', 'bval'), + checkers.declared_caveat('c', 'cval'), + checkers.declared_caveat('d', 'dval') + ] + ], {'a': 'aval', 'b': 'bval', 'c': 'cval', 'd': 'dval'}, None), + ('conflicting values', [ + [ + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('a', 'conflict'), + checkers.declared_caveat('b', 'bval') + ], [ + checkers.declared_caveat('a', 'conflict'), + checkers.declared_caveat('b', 'another conflict'), + checkers.declared_caveat('c', 'cval'), + checkers.declared_caveat('d', 'dval') + ] + ], {'c': 'cval', 'd': 'dval'}, None), + ('third party caveats ignored', [ + [checkers.Caveat(condition='declared a no conflict', + location='location')], + [checkers.declared_caveat('a', 'aval')] + ], {'a': 'aval'}, None), + ('unparseable caveats ignored', [ + [checkers.Caveat(condition=' bad')], + [checkers.declared_caveat('a', 'aval')] + ], {'a': 'aval'}, None), + ('infer with namespace', [ + [ + checkers.declared_caveat('a', 'aval'), + caveat_with_ns(checkers.declared_caveat('a', 'aval'), + 'testns'), + ] + ], {'a': 'aval'}, None), + ] + for test in tests: + uri_to_prefix = test[3] + if uri_to_prefix is None: + uri_to_prefix = {checkers.STD_NAMESPACE: ''} + ns = checkers.Namespace(uri_to_prefix) + print(test[0]) + ms = [] + for i, caveats in enumerate(test[1]): + m = Macaroon(key=None, identifier=six.int2byte(i), location='', + version=MACAROON_V2) + for cav in caveats: + cav = ns.resolve_caveat(cav) + if cav.location == '': + m.add_first_party_caveat(cav.condition) + else: + m.add_third_party_caveat(cav.location, None, + cav.condition) + ms.append(m) + self.assertEqual(checkers.infer_declared(ms), test[2]) + + def test_operations_checker(self): + tests = [ + ('all allowed', checkers.allow_caveat( + ['op1', 'op2', 'op4', 'op3']), + ['op1', 'op3', 'op2'], None), + ('none denied', checkers.deny_caveat(['op1', 'op2']), + ['op3', 'op4'], None), + ('one not allowed', checkers.allow_caveat(['op1', 'op2']), + ['op1', 'op3'], + 'caveat "allow op1 op2" not satisfied: op3 not allowed'), + ('one not denied', checkers.deny_caveat(['op1', 'op2']), + ['op4', 'op5', 'op2'], + 'caveat "deny op1 op2" not satisfied: op2 not allowed'), + ('no operations, allow caveat', checkers.allow_caveat(['op1']), + [], + 'caveat "allow op1" not satisfied: op1 not allowed'), + ('no operations, deny caveat', checkers.deny_caveat(['op1']), + [], None), + ('no operations, empty allow caveat', checkers.Caveat( + condition=checkers.COND_ALLOW), + [], 'caveat "allow" not satisfied: no operations allowed'), + ] + checker = checkers.Checker() + for test in tests: + print(test[0]) + ctx = checkers.context_with_operations(checkers.AuthContext(), + test[2]) + err = checker.check_first_party_caveat(ctx, test[1].condition) + if test[3] is None: + self.assertIsNone(err) + continue + self.assertEqual(err, test[3]) + + def test_operation_error_caveat(self): + tests = [ + ('empty allow', checkers.allow_caveat(None), + 'error no operations allowed'), + ('allow: invalid operation name', + checkers.allow_caveat(['op1', 'operation number 2']), + 'error invalid operation name "operation number 2"'), + ('deny: invalid operation name', + checkers.deny_caveat(['op1', 'operation number 2']), + 'error invalid operation name "operation number 2"') + ] + for test in tests: + print(test[0]) + self.assertEqual(test[1].condition, test[2]) + + def test_register_none_func_raise_exception(self): + checker = checkers.Checker() + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x', checkers.STD_NAMESPACE, None) + self.assertEqual(ctx.exception.args[0], + 'no check function registered for namespace std when ' + 'registering condition x') + + def test_register_no_registered_ns_exception(self): + checker = checkers.Checker() + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x', 'testns', lambda x: None) + self.assertEqual(ctx.exception.args[0], + 'no prefix registered for namespace testns when ' + 'registering condition x') + + def test_register_empty_prefix_condition_with_colon(self): + checker = checkers.Checker() + checker.namespace().register('testns', '') + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x:y', 'testns', lambda x: None) + self.assertEqual(ctx.exception.args[0], + 'caveat condition x:y in namespace testns contains a ' + 'colon but its prefix is empty') + + def test_register_twice_same_namespace(self): + checker = checkers.Checker() + checker.namespace().register('testns', '') + checker.register('x', 'testns', lambda x: None) + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x', 'testns', lambda x: None) + self.assertEqual(ctx.exception.args[0], + 'checker for x (namespace testns) already registered' + ' in namespace testns') + + def test_register_twice_different_namespace(self): + checker = checkers.Checker() + checker.namespace().register('testns', '') + checker.namespace().register('otherns', '') + checker.register('x', 'testns', lambda x: None) + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x', 'otherns', lambda x: None) + self.assertEqual(ctx.exception.args[0], + 'checker for x (namespace otherns) already registered' + ' in namespace testns') + + def test_checker_info(self): + checker = checkers.Checker(include_std_checkers=False) + checker.namespace().register('one', 't') + checker.namespace().register('two', 't') + checker.namespace().register('three', '') + checker.namespace().register('four', 's') + + class Called(object): + val = '' + + def register(name, ns): + def func(ctx, cond, arg): + Called.val = name + ' ' + ns + return None + + checker.register(name, ns, func) + + register('x', 'one') + register('y', 'one') + register('z', 'two') + register('a', 'two') + register('something', 'three') + register('other', 'three') + register('xxx', 'four') + + expect = [ + checkers.CheckerInfo(ns='four', name='xxx', prefix='s'), + checkers.CheckerInfo(ns='one', name='x', prefix='t'), + checkers.CheckerInfo(ns='one', name='y', prefix='t'), + checkers.CheckerInfo(ns='three', name='other', prefix=''), + checkers.CheckerInfo(ns='three', name='something', prefix=''), + checkers.CheckerInfo(ns='two', name='a', prefix='t'), + checkers.CheckerInfo(ns='two', name='z', prefix='t'), + ] + infos = checker.info() + self.assertEqual(len(infos), len(expect)) + new_infos = [] + for i, info in enumerate(infos): + Called.val = '' + info.check(None, '', '') + self.assertEqual(Called.val, expect[i].name + ' ' + + expect[i].ns) + new_infos.append(checkers.CheckerInfo(ns=info.ns, name=info.name, + prefix=info.prefix)) + self.assertEqual(new_infos, expect) + + +def caveat_with_ns(cav, ns): + return checkers.Caveat(location=cav.location, condition=cav.condition, + namespace=ns) + + +def arg_checker(test, expect_cond, check_arg): + ''' Returns a checker function that checks that the caveat condition is + check_arg. + ''' + + def func(ctx, cond, arg): + test.assertEqual(cond, expect_cond) + if arg != check_arg: + return 'wrong arg' + return None + + return func |