# Copyright 2017 Canonical Ltd. # Licensed under the LGPLv3, see LICENCE file for details. from datetime import datetime, timedelta from unittest import TestCase import macaroonbakery.checkers as checkers import pytz import six from pymacaroons import MACAROON_V2, Macaroon # A frozen time for the tests. NOW = datetime( year=2006, month=1, day=2, hour=15, minute=4, second=5, microsecond=123) class TestClock(): def utcnow(self): return pytz.UTC.localize(NOW) class TestCheckers(TestCase): def test_checkers(self): tests = [ ('nothing in context, no extra checkers', [ ('something', 'caveat "something" not satisfied: caveat not recognized'), ('', 'cannot parse caveat "": empty caveat'), (' hello', 'cannot parse caveat " hello": caveat starts with' ' space character'), ], None), ('one failed caveat', [ ('t:a aval', None), ('t:b bval', None), ('t:a wrong', 'caveat "t:a wrong" not satisfied: wrong arg'), ], None), ('time from clock', [ (checkers.time_before_caveat( datetime.utcnow() + timedelta(0, 1)).condition, None), (checkers.time_before_caveat(NOW).condition, 'caveat "time-before 2006-01-02T15:04:05.000123Z" ' 'not satisfied: macaroon has expired'), (checkers.time_before_caveat(NOW - timedelta(0, 1)).condition, 'caveat "time-before 2006-01-02T15:04:04.000123Z" ' 'not satisfied: macaroon has expired'), ('time-before bad-date', 'caveat "time-before bad-date" not satisfied: ' 'cannot parse "bad-date" as RFC 3339'), (checkers.time_before_caveat(NOW).condition + " ", 'caveat "time-before 2006-01-02T15:04:05.000123Z " ' 'not satisfied: ' 'cannot parse "2006-01-02T15:04:05.000123Z " as RFC 3339'), ], lambda x: checkers.context_with_clock(ctx, TestClock())), ('real time', [ (checkers.time_before_caveat(datetime( year=2010, month=1, day=1)).condition, 'caveat "time-before 2010-01-01T00:00:00.000000Z" not ' 'satisfied: macaroon has expired'), (checkers.time_before_caveat(datetime( year=3000, month=1, day=1)).condition, None), ], None), ('declared, no entries', [ (checkers.declared_caveat('a', 'aval').condition, 'caveat "declared a aval" not satisfied: got a=null, ' 'expected "aval"'), (checkers.COND_DECLARED, 'caveat "declared" not satisfied: ' 'declared caveat has no value'), ], None), ('declared, some entries', [ (checkers.declared_caveat('a', 'aval').condition, None), (checkers.declared_caveat('b', 'bval').condition, None), (checkers.declared_caveat('spc', ' a b').condition, None), (checkers.declared_caveat('a', 'bval').condition, 'caveat "declared a bval" not satisfied: ' 'got a="aval", expected "bval"'), (checkers.declared_caveat('a', ' aval').condition, 'caveat "declared a aval" not satisfied: ' 'got a="aval", expected " aval"'), (checkers.declared_caveat('spc', 'a b').condition, 'caveat "declared spc a b" not satisfied: ' 'got spc=" a b", expected "a b"'), (checkers.declared_caveat('', 'a b').condition, 'caveat "error invalid caveat \'declared\' key """ ' 'not satisfied: bad caveat'), (checkers.declared_caveat('a b', 'a b').condition, 'caveat "error invalid caveat \'declared\' key "a b"" ' 'not satisfied: bad caveat'), ], lambda x: checkers.context_with_declared(x, { 'a': 'aval', 'b': 'bval', 'spc': ' a b'})), ] checker = checkers.Checker() checker.namespace().register('testns', 't') checker.register('a', 'testns', arg_checker(self, 't:a', 'aval')) checker.register('b', 'testns', arg_checker(self, 't:b', 'bval')) ctx = checkers.AuthContext() for test in tests: print(test[0]) if test[2] is not None: ctx1 = test[2](ctx) else: ctx1 = ctx for check in test[1]: err = checker.check_first_party_caveat(ctx1, check[0]) if check[1] is not None: self.assertEqual(err, check[1]) else: self.assertIsNone(err) def test_infer_declared(self): tests = [ ('no macaroons', [], {}, None), ('single macaroon with one declaration', [ [checkers.Caveat(condition='declared foo bar')] ], {'foo': 'bar'}, None), ('only one argument to declared', [ [checkers.Caveat(condition='declared foo')] ], {}, None), ('spaces in value', [ [checkers.Caveat(condition='declared foo bar bloggs')] ], {'foo': 'bar bloggs'}, None), ('attribute with declared prefix', [ [checkers.Caveat(condition='declaredccf foo')] ], {}, None), ('several macaroons with different declares', [ [ checkers.declared_caveat('a', 'aval'), checkers.declared_caveat('b', 'bval') ], [ checkers.declared_caveat('c', 'cval'), checkers.declared_caveat('d', 'dval') ] ], {'a': 'aval', 'b': 'bval', 'c': 'cval', 'd': 'dval'}, None), ('duplicate values', [ [ checkers.declared_caveat('a', 'aval'), checkers.declared_caveat('a', 'aval'), checkers.declared_caveat('b', 'bval') ], [ checkers.declared_caveat('a', 'aval'), checkers.declared_caveat('b', 'bval'), checkers.declared_caveat('c', 'cval'), checkers.declared_caveat('d', 'dval') ] ], {'a': 'aval', 'b': 'bval', 'c': 'cval', 'd': 'dval'}, None), ('conflicting values', [ [ checkers.declared_caveat('a', 'aval'), checkers.declared_caveat('a', 'conflict'), checkers.declared_caveat('b', 'bval') ], [ checkers.declared_caveat('a', 'conflict'), checkers.declared_caveat('b', 'another conflict'), checkers.declared_caveat('c', 'cval'), checkers.declared_caveat('d', 'dval') ] ], {'c': 'cval', 'd': 'dval'}, None), ('third party caveats ignored', [ [checkers.Caveat(condition='declared a no conflict', location='location')], [checkers.declared_caveat('a', 'aval')] ], {'a': 'aval'}, None), ('unparseable caveats ignored', [ [checkers.Caveat(condition=' bad')], [checkers.declared_caveat('a', 'aval')] ], {'a': 'aval'}, None), ('infer with namespace', [ [ checkers.declared_caveat('a', 'aval'), caveat_with_ns(checkers.declared_caveat('a', 'aval'), 'testns'), ] ], {'a': 'aval'}, None), ] for test in tests: uri_to_prefix = test[3] if uri_to_prefix is None: uri_to_prefix = {checkers.STD_NAMESPACE: ''} ns = checkers.Namespace(uri_to_prefix) print(test[0]) ms = [] for i, caveats in enumerate(test[1]): m = Macaroon(key=None, identifier=six.int2byte(i), location='', version=MACAROON_V2) for cav in caveats: cav = ns.resolve_caveat(cav) if cav.location == '': m.add_first_party_caveat(cav.condition) else: m.add_third_party_caveat(cav.location, None, cav.condition) ms.append(m) self.assertEqual(checkers.infer_declared(ms), test[2]) def test_operations_checker(self): tests = [ ('all allowed', checkers.allow_caveat( ['op1', 'op2', 'op4', 'op3']), ['op1', 'op3', 'op2'], None), ('none denied', checkers.deny_caveat(['op1', 'op2']), ['op3', 'op4'], None), ('one not allowed', checkers.allow_caveat(['op1', 'op2']), ['op1', 'op3'], 'caveat "allow op1 op2" not satisfied: op3 not allowed'), ('one not denied', checkers.deny_caveat(['op1', 'op2']), ['op4', 'op5', 'op2'], 'caveat "deny op1 op2" not satisfied: op2 not allowed'), ('no operations, allow caveat', checkers.allow_caveat(['op1']), [], 'caveat "allow op1" not satisfied: op1 not allowed'), ('no operations, deny caveat', checkers.deny_caveat(['op1']), [], None), ('no operations, empty allow caveat', checkers.Caveat( condition=checkers.COND_ALLOW), [], 'caveat "allow" not satisfied: no operations allowed'), ] checker = checkers.Checker() for test in tests: print(test[0]) ctx = checkers.context_with_operations(checkers.AuthContext(), test[2]) err = checker.check_first_party_caveat(ctx, test[1].condition) if test[3] is None: self.assertIsNone(err) continue self.assertEqual(err, test[3]) def test_operation_error_caveat(self): tests = [ ('empty allow', checkers.allow_caveat(None), 'error no operations allowed'), ('allow: invalid operation name', checkers.allow_caveat(['op1', 'operation number 2']), 'error invalid operation name "operation number 2"'), ('deny: invalid operation name', checkers.deny_caveat(['op1', 'operation number 2']), 'error invalid operation name "operation number 2"') ] for test in tests: print(test[0]) self.assertEqual(test[1].condition, test[2]) def test_register_none_func_raise_exception(self): checker = checkers.Checker() with self.assertRaises(checkers.RegisterError) as ctx: checker.register('x', checkers.STD_NAMESPACE, None) self.assertEqual(ctx.exception.args[0], 'no check function registered for namespace std when ' 'registering condition x') def test_register_no_registered_ns_exception(self): checker = checkers.Checker() with self.assertRaises(checkers.RegisterError) as ctx: checker.register('x', 'testns', lambda x: None) self.assertEqual(ctx.exception.args[0], 'no prefix registered for namespace testns when ' 'registering condition x') def test_register_empty_prefix_condition_with_colon(self): checker = checkers.Checker() checker.namespace().register('testns', '') with self.assertRaises(checkers.RegisterError) as ctx: checker.register('x:y', 'testns', lambda x: None) self.assertEqual(ctx.exception.args[0], 'caveat condition x:y in namespace testns contains a ' 'colon but its prefix is empty') def test_register_twice_same_namespace(self): checker = checkers.Checker() checker.namespace().register('testns', '') checker.register('x', 'testns', lambda x: None) with self.assertRaises(checkers.RegisterError) as ctx: checker.register('x', 'testns', lambda x: None) self.assertEqual(ctx.exception.args[0], 'checker for x (namespace testns) already registered' ' in namespace testns') def test_register_twice_different_namespace(self): checker = checkers.Checker() checker.namespace().register('testns', '') checker.namespace().register('otherns', '') checker.register('x', 'testns', lambda x: None) with self.assertRaises(checkers.RegisterError) as ctx: checker.register('x', 'otherns', lambda x: None) self.assertEqual(ctx.exception.args[0], 'checker for x (namespace otherns) already registered' ' in namespace testns') def test_checker_info(self): checker = checkers.Checker(include_std_checkers=False) checker.namespace().register('one', 't') checker.namespace().register('two', 't') checker.namespace().register('three', '') checker.namespace().register('four', 's') class Called(object): val = '' def register(name, ns): def func(ctx, cond, arg): Called.val = name + ' ' + ns return None checker.register(name, ns, func) register('x', 'one') register('y', 'one') register('z', 'two') register('a', 'two') register('something', 'three') register('other', 'three') register('xxx', 'four') expect = [ checkers.CheckerInfo(ns='four', name='xxx', prefix='s'), checkers.CheckerInfo(ns='one', name='x', prefix='t'), checkers.CheckerInfo(ns='one', name='y', prefix='t'), checkers.CheckerInfo(ns='three', name='other', prefix=''), checkers.CheckerInfo(ns='three', name='something', prefix=''), checkers.CheckerInfo(ns='two', name='a', prefix='t'), checkers.CheckerInfo(ns='two', name='z', prefix='t'), ] infos = checker.info() self.assertEqual(len(infos), len(expect)) new_infos = [] for i, info in enumerate(infos): Called.val = '' info.check(None, '', '') self.assertEqual(Called.val, expect[i].name + ' ' + expect[i].ns) new_infos.append(checkers.CheckerInfo(ns=info.ns, name=info.name, prefix=info.prefix)) self.assertEqual(new_infos, expect) def caveat_with_ns(cav, ns): return checkers.Caveat(location=cav.location, condition=cav.condition, namespace=ns) def arg_checker(test, expect_cond, check_arg): ''' Returns a checker function that checks that the caveat condition is check_arg. ''' def func(ctx, cond, arg): test.assertEqual(cond, expect_cond) if arg != check_arg: return 'wrong arg' return None return func