diff options
Diffstat (limited to 'macaroonbakery/tests')
-rw-r--r-- | macaroonbakery/tests/__init__.py | 2 | ||||
-rw-r--r-- | macaroonbakery/tests/common.py | 120 | ||||
-rw-r--r-- | macaroonbakery/tests/test_agent.py | 13 | ||||
-rw-r--r-- | macaroonbakery/tests/test_authorizer.py | 132 | ||||
-rw-r--r-- | macaroonbakery/tests/test_checker.py | 963 | ||||
-rw-r--r-- | macaroonbakery/tests/test_checkers.py | 356 | ||||
-rw-r--r-- | macaroonbakery/tests/test_codec.py | 164 | ||||
-rw-r--r-- | macaroonbakery/tests/test_discharge.py | 445 | ||||
-rw-r--r-- | macaroonbakery/tests/test_discharge_all.py | 170 | ||||
-rw-r--r-- | macaroonbakery/tests/test_keyring.py | 111 | ||||
-rw-r--r-- | macaroonbakery/tests/test_macaroon.py | 230 | ||||
-rw-r--r-- | macaroonbakery/tests/test_namespace.py | 15 | ||||
-rw-r--r-- | macaroonbakery/tests/test_oven.py | 125 | ||||
-rw-r--r-- | macaroonbakery/tests/test_store.py | 21 |
14 files changed, 2731 insertions, 136 deletions
diff --git a/macaroonbakery/tests/__init__.py b/macaroonbakery/tests/__init__.py index e69de29..46812ee 100644 --- a/macaroonbakery/tests/__init__.py +++ b/macaroonbakery/tests/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. diff --git a/macaroonbakery/tests/common.py b/macaroonbakery/tests/common.py new file mode 100644 index 0000000..2619127 --- /dev/null +++ b/macaroonbakery/tests/common.py @@ -0,0 +1,120 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +from datetime import datetime, timedelta + +import pytz + +import macaroonbakery +import macaroonbakery.checkers as checkers + + +class _StoppedClock(object): + def __init__(self, t): + self.t = t + + def utcnow(self): + return self.t + + +epoch = pytz.utc.localize( + datetime(year=1900, month=11, day=17, hour=19, minute=00, second=13)) +ages = epoch + timedelta(days=1) + +test_context = checkers.context_with_clock(checkers.AuthContext(), + _StoppedClock(epoch)) + + +def test_checker(): + c = checkers.Checker() + c.namespace().register('testns', '') + c.register('str', 'testns', str_check) + c.register('true', 'testns', true_check) + return c + + +_str_key = checkers.ContextKey('str_check') + + +def str_context(s): + return test_context.with_value(_str_key, s) + + +def str_check(ctx, cond, args): + expect = ctx[_str_key] + if args != expect: + return '{} doesn\'t match {}'.format(cond, expect) + return None + + +def true_check(ctx, cond, args): + # Always succeeds. + return None + + +class OneIdentity(macaroonbakery.IdentityClient): + '''An IdentityClient implementation that always returns a single identity + from declared_identity, allowing allow(LOGIN_OP) to work even when there + are no declaration caveats (this is mostly to support the legacy tests + which do their own checking of declaration caveats). + ''' + + def identity_from_context(self, ctx): + return None, None + + def declared_identity(self, ctx, declared): + return _NoOne() + + +class _NoOne(object): + def id(self): + return 'noone' + + def domain(self): + return '' + + +class ThirdPartyStrcmpChecker(macaroonbakery.ThirdPartyCaveatChecker): + def __init__(self, str): + self.str = str + + def check_third_party_caveat(self, ctx, cav_info): + condition = cav_info.condition + if isinstance(cav_info.condition, bytes): + condition = cav_info.condition.decode('utf-8') + if condition != self.str: + raise macaroonbakery.ThirdPartyCaveatCheckFailed( + '{} doesn\'t match {}'.format(condition, self.str)) + return [] + + +class ThirdPartyCheckerWithCaveats(macaroonbakery.ThirdPartyCaveatChecker): + def __init__(self, cavs=None): + if cavs is None: + cavs = [] + self.cavs = cavs + + def check_third_party_caveat(self, ctx, cav_info): + return self.cavs + + +class ThirdPartyCaveatCheckerEmpty(macaroonbakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(self, ctx, cav_info): + return [] + + +def new_bakery(location, locator=None): + # Returns a new Bakery instance using a new + # key pair, and registers the key with the given locator if provided. + # + # It uses test_checker to check first party caveats. + key = macaroonbakery.generate_key() + if locator is not None: + locator.add_info(location, + macaroonbakery.ThirdPartyInfo( + public_key=key.public_key, + version=macaroonbakery.LATEST_BAKERY_VERSION)) + return macaroonbakery.Bakery(key=key, + checker=test_checker(), + location=location, + identity_client=OneIdentity(), + locator=locator) diff --git a/macaroonbakery/tests/test_agent.py b/macaroonbakery/tests/test_agent.py index 86133fe..49134f5 100644 --- a/macaroonbakery/tests/test_agent.py +++ b/macaroonbakery/tests/test_agent.py @@ -1,6 +1,5 @@ # Copyright 2017 Canonical Ltd. # Licensed under the LGPLv3, see LICENCE file for details. - import base64 import json import os @@ -99,7 +98,7 @@ class TestAgents(TestCase): agent.load_agent_file(self.no_username_agent_filename) -agent_file = """ +agent_file = ''' { "key": { "public": "YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=", @@ -113,10 +112,10 @@ agent_file = """ "username": "user-2" }] } -""" +''' -bad_key_agent_file = """ +bad_key_agent_file = ''' { "key": { "public": "YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=", @@ -130,10 +129,10 @@ bad_key_agent_file = """ "username": "user-2" }] } -""" +''' -no_username_agent_file = """ +no_username_agent_file = ''' { "key": { "public": "YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=", @@ -146,4 +145,4 @@ no_username_agent_file = """ "username": "user-2" }] } -""" +''' diff --git a/macaroonbakery/tests/test_authorizer.py b/macaroonbakery/tests/test_authorizer.py new file mode 100644 index 0000000..da01974 --- /dev/null +++ b/macaroonbakery/tests/test_authorizer.py @@ -0,0 +1,132 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +from unittest import TestCase + +import macaroonbakery +import macaroonbakery.checkers as checkers + + +class TestAuthorizer(TestCase): + def test_authorize_func(self): + def f(ctx, identity, op): + self.assertEqual(identity.id(), 'bob') + if op.entity == 'a': + return False, None + elif op.entity == 'b': + return True, None + elif op.entity == 'c': + return True, [checkers.Caveat(location='somewhere', + condition='c')] + elif op.entity == 'd': + return True, [checkers.Caveat(location='somewhere', + condition='d')] + else: + self.fail('unexpected entity: ' + op.Entity) + + ops = [macaroonbakery.Op('a', 'x'), macaroonbakery.Op('b', 'x'), + macaroonbakery.Op('c', 'x'), macaroonbakery.Op('d', 'x')] + allowed, caveats = macaroonbakery.AuthorizerFunc(f).authorize( + checkers.AuthContext(), + macaroonbakery.SimpleIdentity('bob'), + ops + ) + self.assertEqual(allowed, [False, True, True, True]) + self.assertEqual(caveats, [ + checkers.Caveat(location='somewhere', condition='c'), + checkers.Caveat(location='somewhere', condition='d') + ]) + + def test_acl_authorizer(self): + ctx = checkers.AuthContext() + tests = [ + ('no ops, no problem', + macaroonbakery.ACLAuthorizer(allow_public=True, + get_acl=lambda x, y: []), None, [], + []), + ('identity that does not implement ACLIdentity; ' + 'user should be denied except for everyone group', + macaroonbakery.ACLAuthorizer(allow_public=True, + get_acl=lambda ctx, op: [ + macaroonbakery.EVERYONE] + if op.entity == 'a' else ['alice']), + SimplestIdentity('bob'), + [macaroonbakery.Op(entity='a', action='a'), + macaroonbakery.Op(entity='b', action='b')], + [True, False]), + ('identity that does not implement ACLIdentity with user == Id; ' + 'user should be denied except for everyone group', + macaroonbakery.ACLAuthorizer(allow_public=True, + get_acl=lambda ctx, op: [ + macaroonbakery.EVERYONE] if + op.entity == 'a' else ['bob']), + SimplestIdentity('bob'), + [macaroonbakery.Op(entity='a', action='a'), + macaroonbakery.Op(entity='b', action='b')], + [True, False]), + ('permission denied for everyone without AllowPublic', + macaroonbakery.ACLAuthorizer(allow_public=False, + get_acl=lambda x, y: [ + macaroonbakery.EVERYONE]), + SimplestIdentity('bob'), + [macaroonbakery.Op(entity='a', action='a')], + [False]), + ('permission granted to anyone with no identity with AllowPublic', + macaroonbakery.ACLAuthorizer(allow_public=True, + get_acl=lambda x, y: [ + macaroonbakery.EVERYONE]), + None, + [macaroonbakery.Op(entity='a', action='a')], + [True]) + ] + for test in tests: + allowed, caveats = test[1].authorize(ctx, test[2], test[3]) + self.assertEqual(len(caveats), 0) + self.assertEqual(allowed, test[4]) + + def test_context_wired_properly(self): + ctx = checkers.AuthContext({'a': 'aval'}) + + class Visited: + in_f = False + in_allow = False + in_get_acl = False + + def f(ctx, identity, op): + self.assertEqual(ctx.get('a'), 'aval') + Visited.in_f = True + return False, None + + macaroonbakery.AuthorizerFunc(f).authorize( + ctx, macaroonbakery.SimpleIdentity('bob'), ['op1'] + ) + self.assertTrue(Visited.in_f) + + class TestIdentity(SimplestIdentity, macaroonbakery.ACLIdentity): + def allow(other, ctx, acls): + self.assertEqual(ctx.get('a'), 'aval') + Visited.in_allow = True + return False + + def get_acl(ctx, acl): + self.assertEqual(ctx.get('a'), 'aval') + Visited.in_get_acl = True + return [] + + macaroonbakery.ACLAuthorizer(allow_public=False, + get_acl=get_acl).authorize( + ctx, TestIdentity('bob'), ['op1']) + self.assertTrue(Visited.in_get_acl) + self.assertTrue(Visited.in_allow) + + +class SimplestIdentity(macaroonbakery.Identity): + # SimplestIdentity implements Identity for a string. Unlike + # SimpleIdentity, it does not implement ACLIdentity. + def __init__(self, user): + self._identity = user + + def domain(self): + return '' + + def id(self): + return self._identity diff --git a/macaroonbakery/tests/test_checker.py b/macaroonbakery/tests/test_checker.py new file mode 100644 index 0000000..06bf008 --- /dev/null +++ b/macaroonbakery/tests/test_checker.py @@ -0,0 +1,963 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +import base64 +from collections import namedtuple +import json +from unittest import TestCase +from datetime import timedelta + +from pymacaroons.verifier import Verifier, FirstPartyCaveatVerifierDelegate +import pymacaroons + +import macaroonbakery +import macaroonbakery.checkers as checkers +from macaroonbakery.tests.common import test_context, epoch, test_checker + + +class TestChecker(TestCase): + def setUp(self): + self._discharges = [] + + def test_authorize_with_open_access_and_no_macaroons(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + {macaroonbakery.Op(entity='something', action='read'): + {macaroonbakery.EVERYONE}}) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + auth_info = client.do(test_context, ts, + [macaroonbakery.Op(entity='something', + action='read')]) + self.assertEqual(len(self._discharges), 0) + self.assertIsNotNone(auth_info) + self.assertIsNone(auth_info.identity) + self.assertEqual(len(auth_info.macaroons), 0) + + def test_authorization_denied(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = macaroonbakery.ClosedAuthorizer() + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + with self.assertRaises(macaroonbakery.PermissionDenied): + client.do(ctx, ts, [macaroonbakery.Op(entity='something', + action='read')]) + + def test_authorize_with_authentication_required(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + {macaroonbakery.Op(entity='something', action='read'): {'bob'}}) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + auth_info = client.do(ctx, ts, [macaroonbakery.Op(entity='something', + action='read')]) + self.assertEqual(self._discharges, + [_DischargeRecord(location='ids', user='bob')]) + self.assertIsNotNone(auth_info) + self.assertEqual(auth_info.identity.id(), 'bob') + self.assertEqual(len(auth_info.macaroons), 1) + + def test_authorize_multiple_ops(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='something', action='read'): {'bob'}, + macaroonbakery.Op(entity='otherthing', action='read'): {'bob'} + } + ) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + client.do(ctx, ts, [ + macaroonbakery.Op(entity='something', action='read'), + macaroonbakery.Op(entity='otherthing', action='read') + ]) + self.assertEqual(self._discharges, + [_DischargeRecord(location='ids', user='bob')]) + + def test_capability(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + {macaroonbakery.Op(entity='something', action='read'): {'bob'}}) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m = client.discharged_capability( + ctx, ts, [macaroonbakery.Op(entity='something', action='read')]) + # Check that we can exercise the capability directly on the service + # with no discharging required. + auth_info = ts.do(test_context, [m], + [macaroonbakery.Op(entity='something', + action='read')]) + self.assertIsNotNone(auth_info) + self.assertIsNone(auth_info.identity) + self.assertEqual(len(auth_info.macaroons), 1) + self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, + m[0].identifier_bytes) + + def test_capability_multiple_entities(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'bob'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + macaroonbakery.Op(entity='e3', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m = client.discharged_capability(ctx, ts, [ + macaroonbakery.Op(entity='e1', + action='read'), + macaroonbakery.Op(entity='e2', + action='read'), + macaroonbakery.Op(entity='e3', + action='read')]) + self.assertEqual(self._discharges, + [_DischargeRecord(location='ids', user='bob')]) + + # Check that we can exercise the capability directly on the service + # with no discharging required. + ts.do(test_context, [m], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read'), + macaroonbakery.Op(entity='e3', action='read')]) + + # Check that we can exercise the capability to act on a subset of + # the operations. + ts.do(test_context, [m], [ + macaroonbakery.Op(entity='e2', action='read'), + macaroonbakery.Op(entity='e3', action='read')] + ) + ts.do(test_context, [m], + [macaroonbakery.Op(entity='e3', action='read')]) + + def test_multiple_capabilities(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Acquire two capabilities as different users and check + # that we can combine them together to do both operations + # at once. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m1 = _Client(locator).discharged_capability(ctx, ts, + [macaroonbakery.Op( + entity='e1', + action='read')]) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m2 = _Client(locator).discharged_capability(ctx, ts, + [macaroonbakery.Op( + entity='e2', + action='read')]) + self.assertEqual(self._discharges, + [ + _DischargeRecord(location='ids', user='alice'), + _DischargeRecord(location='ids', user='bob'), + ]) + auth_info = ts.do(test_context, [m1, m2], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + self.assertIsNotNone(auth_info) + self.assertIsNone(auth_info.identity) + self.assertEqual(len(auth_info.macaroons), 2) + self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, + m1[0].identifier_bytes) + self.assertEqual(auth_info.macaroons[1][0].identifier_bytes, + m2[0].identifier_bytes) + + def test_combine_capabilities(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + macaroonbakery.Op(entity='e3', action='read'): {'bob', + 'alice'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Acquire two capabilities as different users and check + # that we can combine them together into a single capability + # capable of both operations. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m1 = _Client(locator).discharged_capability( + ctx, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e3', action='read')]) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m2 = _Client(locator).discharged_capability( + ctx, ts, [macaroonbakery.Op(entity='e2', action='read')]) + + m = ts.capability(test_context, [m1, m2], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read'), + macaroonbakery.Op(entity='e3', action='read')]) + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read'), + macaroonbakery.Op(entity='e3', action='read')]) + + def test_partially_authorized_request(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Acquire a capability for e1 but rely on authentication to + # authorize e2. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m = _Client(locator).discharged_capability(ctx, ts, + [macaroonbakery.Op( + entity='e1', + action='read')]) + client = _Client(locator) + client.add_macaroon(ts, 'authz', m) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + client.discharged_capability( + ctx, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + + def test_auth_with_third_party_caveats(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + + # We make an authorizer that requires a third party discharge + # when authorizing. + def authorize_with_tp_discharge(ctx, id, op): + if (id is not None and id.id() == 'bob' and + op == macaroonbakery.Op(entity='something', + action='read')): + return True, [checkers.Caveat(condition='question', + location='other third party')] + return False, None + + auth = macaroonbakery.AuthorizerFunc(authorize_with_tp_discharge) + ts = _Service('myservice', auth, ids, locator) + + class _LocalDischargeChecker(macaroonbakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(_, ctx, info): + if info.condition != 'question': + raise ValueError('third party condition not recognized') + self._discharges.append(_DischargeRecord( + location='other third party', + user=ctx.get(_DISCHARGE_USER_KEY) + )) + return [] + + locator['other third party'] = _Discharger( + key=macaroonbakery.generate_key(), + checker=_LocalDischargeChecker(), + locator=locator, + ) + client = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + client.do(ctx, ts, [macaroonbakery.Op(entity='something', + action='read')]) + self.assertEqual(self._discharges, [ + _DischargeRecord(location='ids', user='bob'), + _DischargeRecord(location='other third party', + user='bob') + ]) + + def test_capability_combines_first_party_caveats(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'} + } + ) + ts = _Service('myservice', auth, ids, locator) + + # Acquire two capabilities as different users, add some first party + # caveats that we can combine them together into a single capability + # capable of both operations. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m1 = _Client(locator).capability( + ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + m1.macaroon.add_first_party_caveat('true 1') + m1.macaroon.add_first_party_caveat('true 2') + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m2 = _Client(locator).capability( + ctx, ts, [macaroonbakery.Op(entity='e2', action='read')]) + m2.macaroon.add_first_party_caveat('true 3') + m2.macaroon.add_first_party_caveat('true 4') + + client = _Client(locator) + client.add_macaroon(ts, 'authz1', [m1.macaroon]) + client.add_macaroon(ts, 'authz2', [m2.macaroon]) + + m = client.capability(test_context, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [ + 'true 1', + 'true 2', + 'true 3', + 'true 4', + ]) + + def test_first_party_caveat_squashing(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'alice'}, + }) + ts = _Service('myservice', auth, ids, locator) + tests = [ + ('duplicates removed', [ + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.Caveat(condition='true 2', namespace='testns'), + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.Caveat(condition='true 3', namespace='testns'), + ], [ + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.Caveat(condition='true 2', namespace='testns'), + checkers.Caveat(condition='true 3', namespace='testns'), + ]), ('earliest time before', [ + checkers.time_before_caveat(epoch + timedelta(days=1)), + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.time_before_caveat( + epoch + timedelta(days=0, hours=1)), + checkers.time_before_caveat(epoch + timedelta( + days=0, hours=0, minutes=5)), + ], [ + checkers.time_before_caveat(epoch + timedelta( + days=0, hours=0, minutes=5)), + checkers.Caveat(condition='true 1', namespace='testns'), + ]), ('operations and declared caveats removed', [ + checkers.deny_caveat(['foo']), + checkers.allow_caveat(['read', 'write']), + checkers.declared_caveat('username', 'bob'), + checkers.Caveat(condition='true 1', namespace='testns'), + ], [ + checkers.Caveat(condition='true 1', namespace='testns'), + ]) + ] + for test in tests: + print(test[0]) + + # Make a first macaroon with all the required first party caveats. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m1 = _Client(locator).capability( + ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + m1.add_caveats(test[1], None, None) + + # Make a second macaroon that's not used to check that it's + # caveats are not added. + m2 = _Client(locator).capability( + ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + m2.add_caveat(checkers.Caveat( + condition='true notused', namespace='testns'), None, None) + client = _Client(locator) + client.add_macaroon(ts, 'authz1', [m1.macaroon]) + client.add_macaroon(ts, 'authz2', [m2.macaroon]) + + m3 = client.capability( + test_context, ts, [macaroonbakery.Op(entity='e1', + action='read')]) + self.assertEqual( + _macaroon_conditions(m3.macaroon.caveats, False), + _resolve_caveats(m3.namespace, test[2])) + + def test_login_only(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = macaroonbakery.ClosedAuthorizer() + ts = _Service('myservice', auth, ids, locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP]) + self.assertIsNotNone(auth_info) + self.assertEqual(auth_info.identity.id(), 'bob') + + def test_allow_any(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Acquire a capability for e1 but rely on authentication to + # authorize e2. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m = _Client(locator).discharged_capability(ctx, ts, + [macaroonbakery.Op( + entity='e1', + action='read')]) + + client = _Client(locator) + client.add_macaroon(ts, 'authz', m) + + self._discharges = [] + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + with self.assertRaises(_DischargeRequiredError): + client.do_any( + ctx, ts, [ + macaroonbakery.LOGIN_OP, + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e1', action='read') + ] + ) + self.assertEqual(len(self._discharges), 0) + + # Log in as bob. + _, err = client.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + + # All the previous actions should now be allowed. + auth_info, allowed = client.do_any( + ctx, ts, [ + macaroonbakery.LOGIN_OP, + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e1', action='read') + ] + ) + self.assertEqual(auth_info.identity.id(), 'bob') + self.assertEqual(len(auth_info.macaroons), 2) + self.assertEqual(allowed, [True, True, True]) + + def test_auth_with_identity_from_context(self): + locator = _DischargerLocator() + ids = _BasicAuthIdService() + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'sherlock'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Check that we can perform the ops with basic auth in the + # context. + ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes') + auth_info = _Client(locator).do( + ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + self.assertEqual(auth_info.identity.id(), 'sherlock') + self.assertEqual(len(auth_info.macaroons), 0) + + def test_auth_login_op_with_identity_from_context(self): + locator = _DischargerLocator() + ids = _BasicAuthIdService() + ts = _Service('myservice', macaroonbakery.ClosedAuthorizer(), + ids, locator) + + # Check that we can use LoginOp + # when auth isn't granted through macaroons. + ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes') + auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'sherlock') + self.assertEqual(len(auth_info.macaroons), 0) + + def test_operation_allow_caveat(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'bob'}, + macaroonbakery.Op(entity='e1', action='write'): {'bob'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m = client.capability( + ctx, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e1', action='write'), + macaroonbakery.Op(entity='e2', action='read')]) + + # Sanity check that we can do a write. + ts.do(test_context, [[m.macaroon]], + [macaroonbakery.Op(entity='e1', action='write')]) + + m.add_caveat(checkers.allow_caveat(['read']), None, None) + + # A read operation should work. + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + + # A write operation should fail + # even though the original macaroon allowed it. + with self.assertRaises(_DischargeRequiredError): + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='write')]) + + def test_operation_deny_caveat(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'bob'}, + macaroonbakery.Op(entity='e1', action='write'): {'bob'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m = client.capability( + ctx, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e1', action='write'), + macaroonbakery.Op(entity='e2', action='read')]) + + # Sanity check that we can do a write. + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='write')]) + + m.add_caveat(checkers.deny_caveat(['write']), None, None) + + # A read operation should work. + ts.do( + test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + + # A write operation should fail + # even though the original macaroon allowed it. + with self.assertRaises(_DischargeRequiredError): + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='write')]) + + def test_duplicate_login_macaroons(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = macaroonbakery.ClosedAuthorizer() + ts = _Service('myservice', auth, ids, locator) + + # Acquire a login macaroon for bob. + client1 = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + auth_info = client1.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'bob') + + # Acquire a login macaroon for alice. + client2 = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + auth_info = client2.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'alice') + + # Combine the two login macaroons into one client. + client3 = _Client(locator) + client3.add_macaroon(ts, '1.bob', + client1._macaroons[ts.name()]['authn']) + client3.add_macaroon(ts, '2.alice', + client2._macaroons[ts.name()]['authn']) + + # We should authenticate as bob (because macaroons are presented + # ordered by "cookie" name) + auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'bob') + self.assertEqual(len(auth_info.macaroons), 1) + + # Try them the other way around and we should authenticate as alice. + client3 = _Client(locator) + client3.add_macaroon(ts, '1.alice', + client2._macaroons[ts.name()]['authn']) + client3.add_macaroon(ts, '2.bob', + client1._macaroons[ts.name()]['authn']) + + auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'alice') + self.assertEqual(len(auth_info.macaroons), 1) + + def test_macaroon_ops_fatal_error(self): + # When we get a non-VerificationError error from the + # opstore, we don't do any more verification. + checker = macaroonbakery.Checker( + macaroon_opstore=_MacaroonStoreWithError()) + m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2) + with self.assertRaises(ValueError): + checker.auth([m]).allow(test_context, [macaroonbakery.LOGIN_OP]) + + +class _DischargerLocator(object): + def __init__(self, dischargers=None): + if dischargers is None: + dischargers = {} + self._dischargers = dischargers + + def third_party_info(self, loc): + d = self._dischargers.get(loc) + if d is None: + return None + return macaroonbakery.ThirdPartyInfo( + public_key=d._key.public_key, + version=macaroonbakery.LATEST_BAKERY_VERSION, + ) + + def __setitem__(self, key, item): + self._dischargers[key] = item + + def __getitem__(self, key): + return self._dischargers[key] + + def get(self, key): + return self._dischargers.get(key) + + +class _IdService(macaroonbakery.IdentityClient, + macaroonbakery.ThirdPartyCaveatChecker): + def __init__(self, location, locator, test_class): + self._location = location + self._test = test_class + key = macaroonbakery.generate_key() + self._discharger = _Discharger(key=key, checker=self, locator=locator) + locator[location] = self._discharger + + def check_third_party_caveat(self, ctx, info): + if info.condition != 'is-authenticated-user': + raise macaroonbakery.CaveatNotRecognizedError( + 'third party condition not ' + 'recognized') + + username = ctx.get(_DISCHARGE_USER_KEY, '') + if username == '': + return macaroonbakery.ThirdPartyCaveatCheckFailed( + 'no current user') + self._test._discharges.append( + _DischargeRecord(location=self._location, user=username)) + return [checkers.declared_caveat('username', username)] + + def identity_from_context(self, ctx): + return None, [checkers.Caveat(location=self._location, + condition='is-authenticated-user')] + + def declared_identity(self, ctx, declared): + user = declared.get('username') + if user is None: + raise macaroonbakery.IdentityError('no username declared') + return macaroonbakery.SimpleIdentity(user) + + +_DISCHARGE_USER_KEY = checkers.ContextKey('user-key') + +_DischargeRecord = namedtuple('_DISCHARGE_RECORD', ['location', 'user']) + + +class _Discharger(object): + ''' utility class that has a discharge function with the same signature of + get_discharge for discharge_all. + ''' + + def __init__(self, key, locator, checker): + self._key = key + self._locator = locator + self._checker = checker + + def discharge(self, ctx, cav, payload): + return macaroonbakery.discharge(ctx, key=self._key, id=cav.caveat_id, + caveat=payload, + checker=self._checker, + locator=self._locator) + + +class _OpAuthorizer(macaroonbakery.Authorizer): + '''Implements bakery.Authorizer by looking the operation + up in the given map. If the username is in the associated list + or the list contains "everyone", authorization is granted. + ''' + + def __init__(self, auth=None): + if auth is None: + auth = {} + self._auth = auth + + def authorize(self, ctx, id, ops): + return macaroonbakery.ACLAuthorizer( + allow_public=True, + get_acl=lambda ctx, op: self._auth.get(op, [])).authorize( + ctx, id, ops) + + +class _MacaroonStore(object): + ''' Stores root keys in memory and puts all operations in the macaroon id. + ''' + + def __init__(self, key, locator): + self._root_key_store = macaroonbakery.MemoryKeyStore() + self._key = key + self._locator = locator + + def new_macaroon(self, caveats, namespace, ops): + root_key, id = self._root_key_store.root_key() + m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops} + data = json.dumps(m_id) + m = macaroonbakery.Macaroon( + root_key=root_key, id=data, location='', + version=macaroonbakery.LATEST_BAKERY_VERSION, + namespace=namespace) + m.add_caveats(caveats, self._key, self._locator) + return m + + def macaroon_ops(self, ms): + if len(ms) == 0: + raise ValueError('no macaroons provided') + + m_id = json.loads(ms[0].identifier_bytes.decode('utf-8')) + root_key = self._root_key_store.get( + base64.urlsafe_b64decode(m_id['id'].encode('utf-8'))) + + v = Verifier() + + class NoValidationOnFirstPartyCaveat(FirstPartyCaveatVerifierDelegate): + def verify_first_party_caveat(self, verifier, caveat, signature): + return True + + v.first_party_caveat_verifier_delegate = \ + NoValidationOnFirstPartyCaveat() + ok = v.verify(macaroon=ms[0], key=root_key, + discharge_macaroons=ms[1:]) + if not ok: + raise macaroonbakery.VerificationError('invalid signature') + conditions = [] + for m in ms: + cavs = m.first_party_caveats() + for cav in cavs: + conditions.append(cav.caveat_id_bytes.decode('utf-8')) + ops = [] + for op in m_id['ops']: + ops.append(macaroonbakery.Op(entity=op[0], action=op[1])) + return ops, conditions + + +class _Service(object): + '''Represents a service that requires authorization. + + Clients can make requests to the service to perform operations + and may receive a macaroon to discharge if the authorization + process requires it. + ''' + + def __init__(self, name, auth, idm, locator): + self._name = name + self._store = _MacaroonStore(macaroonbakery.generate_key(), locator) + self._checker = macaroonbakery.Checker( + checker=test_checker(), + authorizer=auth, + identity_client=idm, + macaroon_opstore=self._store) + + def name(self): + return self._name + + def do(self, ctx, ms, ops): + try: + authInfo = self._checker.auth(ms).allow(ctx, ops) + except macaroonbakery.DischargeRequiredError as exc: + self._discharge_required_error(exc) + return authInfo + + def do_any(self, ctx, ms, ops): + # makes a request to the service to perform any of the given + # operations. It reports which operations have succeeded. + try: + authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops) + return authInfo, allowed + except macaroonbakery.DischargeRequiredError as exc: + self._discharge_required_error(exc) + + def capability(self, ctx, ms, ops): + try: + conds = self._checker.auth(ms).allow_capability(ctx, ops) + except macaroonbakery.DischargeRequiredError as exc: + self._discharge_required_error(exc) + + m = self._store.new_macaroon(None, self._checker.namespace(), ops) + for cond in conds: + m.macaroon.add_first_party_caveat(cond) + return m + + def _discharge_required_error(self, err): + m = self._store.new_macaroon(err.cavs(), self._checker.namespace(), + err.ops()) + name = 'authz' + if len(err.ops()) == 1 and err.ops()[0] == macaroonbakery.LOGIN_OP: + name = 'authn' + raise _DischargeRequiredError(name=name, m=m) + + +class _DischargeRequiredError(Exception): + def __init__(self, name, m): + Exception.__init__(self, 'discharge required') + self._name = name + self._m = m + + def m(self): + return self._m + + def name(self): + return self._name + + +class _Client(object): + max_retries = 3 + + def __init__(self, dischargers): + self._key = macaroonbakery.generate_key() + self._macaroons = {} + self._dischargers = dischargers + + def do(self, ctx, svc, ops): + class _AuthInfo: + authInfo = None + + def svc_do(ms): + _AuthInfo.authInfo = svc.do(ctx, ms, ops) + + self._do_func(ctx, svc, svc_do) + return _AuthInfo.authInfo + + def do_any(self, ctx, svc, ops): + return svc.do_any(ctx, self._request_macaroons(svc), ops) + + def capability(self, ctx, svc, ops): + # capability returns a capability macaroon for the given operations. + + class _M: + m = None + + def svc_capability(ms): + _M.m = svc.capability(ctx, ms, ops) + return + + self._do_func(ctx, svc, svc_capability) + return _M.m + + def discharged_capability(self, ctx, svc, ops): + m = self.capability(ctx, svc, ops) + return self._discharge_all(ctx, m) + + def _do_func(self, ctx, svc, f): + for i in range(0, self.max_retries): + try: + f(self._request_macaroons(svc)) + return + except _DischargeRequiredError as exc: + ms = self._discharge_all(ctx, exc.m()) + self.add_macaroon(svc, exc.name(), ms) + raise ValueError('discharge failed too many times') + + def _clear_macaroons(self, svc): + if svc is None: + self._macaroons = {} + return + if svc.name() in self._macaroons: + del self._macaroons[svc.name()] + + def add_macaroon(self, svc, name, m): + if svc.name() not in self._macaroons: + self._macaroons[svc.name()] = {} + self._macaroons[svc.name()][name] = m + + def _request_macaroons(self, svc): + mmap = self._macaroons.get(svc.name(), []) + # Put all the macaroons in the slice ordered by key + # so that we have deterministic behaviour in the tests. + names = [] + for name in mmap: + names.append(name) + names = sorted(names) + ms = [None] * len(names) + for i, name in enumerate(names): + ms[i] = mmap[name] + return ms + + def _discharge_all(self, ctx, m): + def get_discharge(ctx, cav, pay_load): + d = self._dischargers.get(cav.location) + if d is None: + raise ValueError('third party discharger ' + '{} not found'.format(cav.location)) + return d.discharge(ctx, cav, pay_load) + + return macaroonbakery.discharge_all(ctx, m, get_discharge) + + +class _BasicAuthIdService(macaroonbakery.IdentityClient): + def identity_from_context(self, ctx): + user, pwd = _basic_auth_from_context(ctx) + if user != 'sherlock' or pwd != 'holmes': + return None, None + return macaroonbakery.SimpleIdentity(user), None + + def declared_identity(self, ctx, declared): + raise macaroonbakery.IdentityError('no identity declarations in basic ' + 'auth id service') + + +_BASIC_AUTH_KEY = checkers.ContextKey('user-key') + + +class _BasicAuth(object): + def __init__(self, user, password): + self.user = user + self.password = password + + +def _context_with_basic_auth(ctx, user, password): + return ctx.with_value(_BASIC_AUTH_KEY, _BasicAuth(user, password)) + + +def _basic_auth_from_context(ctx): + auth = ctx.get(_BASIC_AUTH_KEY, _BasicAuth('', '')) + return auth.user, auth.password + + +def _macaroon_conditions(caveats, allow_third): + conds = [''] * len(caveats) + for i, cav in enumerate(caveats): + if cav.location is not None and cav.location != '': + if not allow_third: + raise ValueError('found unexpected third party caveat:' + ' {}'.format(cav.location)) + continue + conds[i] = cav.caveat_id.decode('utf-8') + return conds + + +def _resolve_caveats(ns, caveats): + conds = [''] * len(caveats) + for i, cav in enumerate(caveats): + if cav.location is not None and cav.location != '': + raise ValueError('found unexpected third party caveat') + conds[i] = ns.resolve_caveat(cav).condition + return conds + + +class _MacaroonStoreWithError(object): + def new_macaroon(self, caveats, ns, ops): + raise ValueError('some error') + + def macaroon_ops(self, ms): + raise ValueError('some error') diff --git a/macaroonbakery/tests/test_checkers.py b/macaroonbakery/tests/test_checkers.py new file mode 100644 index 0000000..f552fa4 --- /dev/null +++ b/macaroonbakery/tests/test_checkers.py @@ -0,0 +1,356 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +from datetime import datetime, timedelta +from unittest import TestCase + +import six +import pytz +from pymacaroons import Macaroon, MACAROON_V2 + +import macaroonbakery.checkers as checkers + +# A frozen time for the tests. +NOW = datetime( + year=2006, month=1, day=2, hour=15, minute=4, second=5, microsecond=123) + + +class TestClock(): + def utcnow(self): + return pytz.UTC.localize(NOW) + + +class TestCheckers(TestCase): + def test_checkers(self): + + tests = [ + ('nothing in context, no extra checkers', [ + ('something', + 'caveat "something" not satisfied: caveat not recognized'), + ('', 'cannot parse caveat "": empty caveat'), + (' hello', 'cannot parse caveat " hello": caveat starts with' + ' space character'), + ], None), + ('one failed caveat', [ + ('t:a aval', None), + ('t:b bval', None), + ('t:a wrong', 'caveat "t:a wrong" not satisfied: wrong arg'), + ], None), + ('time from clock', [ + (checkers.time_before_caveat( + datetime.utcnow() + + timedelta(0, 1)).condition, + None), + (checkers.time_before_caveat(NOW).condition, + 'caveat "time-before 2006-01-02T15:04:05.000123Z" ' + 'not satisfied: macaroon has expired'), + (checkers.time_before_caveat(NOW - timedelta(0, 1)).condition, + 'caveat "time-before 2006-01-02T15:04:04.000123Z" ' + 'not satisfied: macaroon has expired'), + ('time-before bad-date', + 'caveat "time-before bad-date" not satisfied: ' + 'cannot parse "bad-date" as RFC 3339'), + (checkers.time_before_caveat(NOW).condition + " ", + 'caveat "time-before 2006-01-02T15:04:05.000123Z " ' + 'not satisfied: ' + 'cannot parse "2006-01-02T15:04:05.000123Z " as RFC 3339'), + ], lambda x: checkers.context_with_clock(ctx, TestClock())), + ('real time', [ + (checkers.time_before_caveat(datetime( + year=2010, month=1, day=1)).condition, + 'caveat "time-before 2010-01-01T00:00:00.000000Z" not ' + 'satisfied: macaroon has expired'), + (checkers.time_before_caveat(datetime( + year=3000, month=1, day=1)).condition, None), + ], None), + ('declared, no entries', [ + (checkers.declared_caveat('a', 'aval').condition, + 'caveat "declared a aval" not satisfied: got a=null, ' + 'expected "aval"'), + (checkers.COND_DECLARED, 'caveat "declared" not satisfied: ' + 'declared caveat has no value'), + ], None), + ('declared, some entries', [ + (checkers.declared_caveat('a', 'aval').condition, None), + (checkers.declared_caveat('b', 'bval').condition, None), + (checkers.declared_caveat('spc', ' a b').condition, None), + (checkers.declared_caveat('a', 'bval').condition, + 'caveat "declared a bval" not satisfied: ' + 'got a="aval", expected "bval"'), + (checkers.declared_caveat('a', ' aval').condition, + 'caveat "declared a aval" not satisfied: ' + 'got a="aval", expected " aval"'), + (checkers.declared_caveat('spc', 'a b').condition, + 'caveat "declared spc a b" not satisfied: ' + 'got spc=" a b", expected "a b"'), + (checkers.declared_caveat('', 'a b').condition, + 'caveat "error invalid caveat \'declared\' key """ ' + 'not satisfied: bad caveat'), + (checkers.declared_caveat('a b', 'a b').condition, + 'caveat "error invalid caveat \'declared\' key "a b"" ' + 'not satisfied: bad caveat'), + ], lambda x: checkers.context_with_declared(x, { + 'a': 'aval', + 'b': 'bval', + 'spc': ' a b'})), + ] + checker = checkers.Checker() + checker.namespace().register('testns', 't') + checker.register('a', 'testns', arg_checker(self, 't:a', 'aval')) + checker.register('b', 'testns', arg_checker(self, 't:b', 'bval')) + ctx = checkers.AuthContext() + for test in tests: + print(test[0]) + if test[2] is not None: + ctx1 = test[2](ctx) + else: + ctx1 = ctx + for check in test[1]: + err = checker.check_first_party_caveat(ctx1, check[0]) + if check[1] is not None: + self.assertEqual(err, check[1]) + else: + self.assertIsNone(err) + + def test_infer_declared(self): + tests = [ + ('no macaroons', [], {}, None), + ('single macaroon with one declaration', [ + [checkers.Caveat(condition='declared foo bar')] + ], {'foo': 'bar'}, None), + ('only one argument to declared', [ + [checkers.Caveat(condition='declared foo')] + ], {}, None), + ('spaces in value', [ + [checkers.Caveat(condition='declared foo bar bloggs')] + ], {'foo': 'bar bloggs'}, None), + ('attribute with declared prefix', [ + [checkers.Caveat(condition='declaredccf foo')] + ], {}, None), + ('several macaroons with different declares', [ + [ + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('b', 'bval') + ], [ + checkers.declared_caveat('c', 'cval'), + checkers.declared_caveat('d', 'dval') + ] + ], {'a': 'aval', 'b': 'bval', 'c': 'cval', 'd': 'dval'}, None), + ('duplicate values', [ + [ + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('b', 'bval') + ], [ + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('b', 'bval'), + checkers.declared_caveat('c', 'cval'), + checkers.declared_caveat('d', 'dval') + ] + ], {'a': 'aval', 'b': 'bval', 'c': 'cval', 'd': 'dval'}, None), + ('conflicting values', [ + [ + checkers.declared_caveat('a', 'aval'), + checkers.declared_caveat('a', 'conflict'), + checkers.declared_caveat('b', 'bval') + ], [ + checkers.declared_caveat('a', 'conflict'), + checkers.declared_caveat('b', 'another conflict'), + checkers.declared_caveat('c', 'cval'), + checkers.declared_caveat('d', 'dval') + ] + ], {'c': 'cval', 'd': 'dval'}, None), + ('third party caveats ignored', [ + [checkers.Caveat(condition='declared a no conflict', + location='location')], + [checkers.declared_caveat('a', 'aval')] + ], {'a': 'aval'}, None), + ('unparseable caveats ignored', [ + [checkers.Caveat(condition=' bad')], + [checkers.declared_caveat('a', 'aval')] + ], {'a': 'aval'}, None), + ('infer with namespace', [ + [ + checkers.declared_caveat('a', 'aval'), + caveat_with_ns(checkers.declared_caveat('a', 'aval'), + 'testns'), + ] + ], {'a': 'aval'}, None), + ] + for test in tests: + uri_to_prefix = test[3] + if uri_to_prefix is None: + uri_to_prefix = {checkers.STD_NAMESPACE: ''} + ns = checkers.Namespace(uri_to_prefix) + print(test[0]) + ms = [] + for i, caveats in enumerate(test[1]): + m = Macaroon(key=None, identifier=six.int2byte(i), location='', + version=MACAROON_V2) + for cav in caveats: + cav = ns.resolve_caveat(cav) + if cav.location == '': + m.add_first_party_caveat(cav.condition) + else: + m.add_third_party_caveat(cav.location, None, + cav.condition) + ms.append(m) + self.assertEqual(checkers.infer_declared(ms), test[2]) + + def test_operations_checker(self): + tests = [ + ('all allowed', checkers.allow_caveat( + ['op1', 'op2', 'op4', 'op3']), + ['op1', 'op3', 'op2'], None), + ('none denied', checkers.deny_caveat(['op1', 'op2']), + ['op3', 'op4'], None), + ('one not allowed', checkers.allow_caveat(['op1', 'op2']), + ['op1', 'op3'], + 'caveat "allow op1 op2" not satisfied: op3 not allowed'), + ('one not denied', checkers.deny_caveat(['op1', 'op2']), + ['op4', 'op5', 'op2'], + 'caveat "deny op1 op2" not satisfied: op2 not allowed'), + ('no operations, allow caveat', checkers.allow_caveat(['op1']), + [], + 'caveat "allow op1" not satisfied: op1 not allowed'), + ('no operations, deny caveat', checkers.deny_caveat(['op1']), + [], None), + ('no operations, empty allow caveat', checkers.Caveat( + condition=checkers.COND_ALLOW), + [], 'caveat "allow" not satisfied: no operations allowed'), + ] + checker = checkers.Checker() + for test in tests: + print(test[0]) + ctx = checkers.context_with_operations(checkers.AuthContext(), + test[2]) + err = checker.check_first_party_caveat(ctx, test[1].condition) + if test[3] is None: + self.assertIsNone(err) + continue + self.assertEqual(err, test[3]) + + def test_operation_error_caveat(self): + tests = [ + ('empty allow', checkers.allow_caveat(None), + 'error no operations allowed'), + ('allow: invalid operation name', + checkers.allow_caveat(['op1', 'operation number 2']), + 'error invalid operation name "operation number 2"'), + ('deny: invalid operation name', + checkers.deny_caveat(['op1', 'operation number 2']), + 'error invalid operation name "operation number 2"') + ] + for test in tests: + print(test[0]) + self.assertEqual(test[1].condition, test[2]) + + def test_register_none_func_raise_exception(self): + checker = checkers.Checker() + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x', checkers.STD_NAMESPACE, None) + self.assertEqual(ctx.exception.args[0], + 'no check function registered for namespace std when ' + 'registering condition x') + + def test_register_no_registered_ns_exception(self): + checker = checkers.Checker() + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x', 'testns', lambda x: None) + self.assertEqual(ctx.exception.args[0], + 'no prefix registered for namespace testns when ' + 'registering condition x') + + def test_register_empty_prefix_condition_with_colon(self): + checker = checkers.Checker() + checker.namespace().register('testns', '') + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x:y', 'testns', lambda x: None) + self.assertEqual(ctx.exception.args[0], + 'caveat condition x:y in namespace testns contains a ' + 'colon but its prefix is empty') + + def test_register_twice_same_namespace(self): + checker = checkers.Checker() + checker.namespace().register('testns', '') + checker.register('x', 'testns', lambda x: None) + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x', 'testns', lambda x: None) + self.assertEqual(ctx.exception.args[0], + 'checker for x (namespace testns) already registered' + ' in namespace testns') + + def test_register_twice_different_namespace(self): + checker = checkers.Checker() + checker.namespace().register('testns', '') + checker.namespace().register('otherns', '') + checker.register('x', 'testns', lambda x: None) + with self.assertRaises(checkers.RegisterError) as ctx: + checker.register('x', 'otherns', lambda x: None) + self.assertEqual(ctx.exception.args[0], + 'checker for x (namespace otherns) already registered' + ' in namespace testns') + + def test_checker_info(self): + checker = checkers.Checker(include_std_checkers=False) + checker.namespace().register('one', 't') + checker.namespace().register('two', 't') + checker.namespace().register('three', '') + checker.namespace().register('four', 's') + + class Called(object): + val = '' + + def register(name, ns): + def func(ctx, cond, arg): + Called.val = name + ' ' + ns + return None + + checker.register(name, ns, func) + + register('x', 'one') + register('y', 'one') + register('z', 'two') + register('a', 'two') + register('something', 'three') + register('other', 'three') + register('xxx', 'four') + + expect = [ + checkers.CheckerInfo(ns='four', name='xxx', prefix='s'), + checkers.CheckerInfo(ns='one', name='x', prefix='t'), + checkers.CheckerInfo(ns='one', name='y', prefix='t'), + checkers.CheckerInfo(ns='three', name='other', prefix=''), + checkers.CheckerInfo(ns='three', name='something', prefix=''), + checkers.CheckerInfo(ns='two', name='a', prefix='t'), + checkers.CheckerInfo(ns='two', name='z', prefix='t'), + ] + infos = checker.info() + self.assertEqual(len(infos), len(expect)) + new_infos = [] + for i, info in enumerate(infos): + Called.val = '' + info.check(None, '', '') + self.assertEqual(Called.val, expect[i].name + ' ' + + expect[i].ns) + new_infos.append(checkers.CheckerInfo(ns=info.ns, name=info.name, + prefix=info.prefix)) + self.assertEqual(new_infos, expect) + + +def caveat_with_ns(cav, ns): + return checkers.Caveat(location=cav.location, condition=cav.condition, + namespace=ns) + + +def arg_checker(test, expect_cond, check_arg): + ''' Returns a checker function that checks that the caveat condition is + check_arg. + ''' + + def func(ctx, cond, arg): + test.assertEqual(cond, expect_cond) + if arg != check_arg: + return 'wrong arg' + return None + + return func diff --git a/macaroonbakery/tests/test_codec.py b/macaroonbakery/tests/test_codec.py index de1631c..6573266 100644 --- a/macaroonbakery/tests/test_codec.py +++ b/macaroonbakery/tests/test_codec.py @@ -1,95 +1,103 @@ # Copyright 2017 Canonical Ltd. # Licensed under the LGPLv3, see LICENCE file for details. +import base64 from unittest import TestCase -import base64 +import nacl.public import six -import nacl.utils -from nacl.public import PrivateKey -from nacl.encoding import Base64Encoder - -from macaroonbakery import bakery, codec, macaroon, namespace, utils +import macaroonbakery +from macaroonbakery import utils +from macaroonbakery import codec +import macaroonbakery.checkers as checkers class TestCodec(TestCase): def setUp(self): - self.fp_key = nacl.public.PrivateKey.generate() - self.tp_key = nacl.public.PrivateKey.generate() + self.fp_key = macaroonbakery.generate_key() + self.tp_key = macaroonbakery.generate_key() def test_v1_round_trip(self): - tp_info = bakery.ThirdPartyInfo(bakery.BAKERY_V1, - self.tp_key.public_key) - cid = codec.encode_caveat('is-authenticated-user', - b'a random string', - tp_info, - self.fp_key, - None) - - res = codec.decode_caveat(self.tp_key, cid) - self.assertEquals(res, macaroon.ThirdPartyCaveatInfo( + tp_info = macaroonbakery.ThirdPartyInfo( + version=macaroonbakery.BAKERY_V1, + public_key=self.tp_key.public_key) + cid = macaroonbakery.encode_caveat( + 'is-authenticated-user', + b'a random string', + tp_info, + self.fp_key, + None) + res = macaroonbakery.decode_caveat(self.tp_key, cid) + self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo( first_party_public_key=self.fp_key.public_key, root_key=b'a random string', condition='is-authenticated-user', caveat=cid, third_party_key_pair=self.tp_key, - version=bakery.BAKERY_V1, - ns=macaroon.legacy_namespace() + version=macaroonbakery.BAKERY_V1, + namespace=macaroonbakery.legacy_namespace() )) def test_v2_round_trip(self): - tp_info = bakery.ThirdPartyInfo(bakery.BAKERY_V2, - self.tp_key.public_key) - cid = codec.encode_caveat('is-authenticated-user', - b'a random string', - tp_info, - self.fp_key, - None) - res = codec.decode_caveat(self.tp_key, cid) - self.assertEquals(res, macaroon.ThirdPartyCaveatInfo( + tp_info = macaroonbakery.ThirdPartyInfo( + version=macaroonbakery.BAKERY_V2, + public_key=self.tp_key.public_key) + cid = macaroonbakery.encode_caveat( + 'is-authenticated-user', + b'a random string', + tp_info, + self.fp_key, + None) + res = macaroonbakery.decode_caveat(self.tp_key, cid) + self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo( first_party_public_key=self.fp_key.public_key, root_key=b'a random string', condition='is-authenticated-user', caveat=cid, third_party_key_pair=self.tp_key, - version=bakery.BAKERY_V2, - ns=macaroon.legacy_namespace() + version=macaroonbakery.BAKERY_V2, + namespace=macaroonbakery.legacy_namespace() )) def test_v3_round_trip(self): - tp_info = bakery.ThirdPartyInfo(bakery.BAKERY_V3, - self.tp_key.public_key) - ns = namespace.Namespace() + tp_info = macaroonbakery.ThirdPartyInfo( + version=macaroonbakery.BAKERY_V3, + public_key=self.tp_key.public_key) + ns = checkers.Namespace() ns.register('testns', 'x') - cid = codec.encode_caveat('is-authenticated-user', - b'a random string', - tp_info, - self.fp_key, - ns) - res = codec.decode_caveat(self.tp_key, cid) - self.assertEquals(res, macaroon.ThirdPartyCaveatInfo( + cid = macaroonbakery.encode_caveat( + 'is-authenticated-user', + b'a random string', + tp_info, + self.fp_key, + ns) + res = macaroonbakery.decode_caveat(self.tp_key, cid) + self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo( first_party_public_key=self.fp_key.public_key, root_key=b'a random string', condition='is-authenticated-user', caveat=cid, third_party_key_pair=self.tp_key, - version=bakery.BAKERY_V3, - ns=ns + version=macaroonbakery.BAKERY_V3, + namespace=ns )) def test_empty_caveat_id(self): - with self.assertRaises(ValueError) as context: - codec.decode_caveat(self.tp_key, b'') + with self.assertRaises(macaroonbakery.VerificationError) as context: + macaroonbakery.decode_caveat(self.tp_key, b'') self.assertTrue('empty third party caveat' in str(context.exception)) def test_decode_caveat_v1_from_go(self): - tp_key = PrivateKey(base64.b64decode( - 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')) - fp_key = PrivateKey(base64.b64decode( - 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')) - fp_key.encode(Base64Encoder) + tp_key = macaroonbakery.PrivateKey( + nacl.public.PrivateKey(base64.b64decode( + 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))) + fp_key = macaroonbakery.PrivateKey( + nacl.public.PrivateKey(base64.b64decode( + 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))) + root_key = base64.b64decode('vDxEmWZEkgiNEFlJ+8ruXe3qDSLf1H+o') # This caveat has been generated from the go code # to check the compatibilty + encrypted_cav = six.b( 'eyJUaGlyZFBhcnR5UHVibGljS2V5IjoiOFA3R1ZZc3BlWlN4c' '3hFdmJsSVFFSTFqdTBTSWl0WlIrRFdhWE40cmxocz0iLCJGaX' @@ -100,22 +108,25 @@ class TestCodec(TestCase): 'BORldUUExGdjVla1dWUjA4Uk1sbGJhc3c4VGdFbkhzM0laeVo' '0V2lEOHhRUWdjU3ljOHY4eUt4dEhxejVEczJOYmh1ZDJhUFdt' 'UTVMcVlNWitmZ2FNaTAxdE9DIn0=') - cav = codec.decode_caveat(tp_key, encrypted_cav) - self.assertEquals(cav, macaroon.ThirdPartyCaveatInfo( + cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav) + self.assertEquals(cav, macaroonbakery.ThirdPartyCaveatInfo( condition='caveat condition', first_party_public_key=fp_key.public_key, third_party_key_pair=tp_key, - root_key=b'random', + root_key=root_key, caveat=encrypted_cav, - version=bakery.BAKERY_V1, - ns=macaroon.legacy_namespace() + version=macaroonbakery.BAKERY_V1, + namespace=macaroonbakery.legacy_namespace() )) def test_decode_caveat_v2_from_go(self): - tp_key = PrivateKey(base64.b64decode( - 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')) - fp_key = PrivateKey(base64.b64decode( - 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')) + tp_key = macaroonbakery.PrivateKey(nacl.public.PrivateKey( + base64.b64decode( + 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))) + fp_key = macaroonbakery.PrivateKey( + nacl.public.PrivateKey(base64.b64decode( + 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))) + root_key = base64.b64decode('wh0HSM65wWHOIxoGjgJJOFvQKn2jJFhC') # This caveat has been generated from the go code # to check the compatibilty encrypted_cav = base64.urlsafe_b64decode( @@ -123,22 +134,25 @@ class TestCodec(TestCase): 'AvD-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGIHq9xGcHS9IZ' 'Lh0cL6D9qpeKI0mXmCPfnwRQDuVYC8y5gVWd-oCGZaj5TGtk3byp2Vnw6ojmt' 'sULDhY59YA_J_Y0ATkERO5T9ajoRWBxU2OXBoX6bImXA'))) - cav = codec.decode_caveat(tp_key, encrypted_cav) - self.assertEquals(cav, macaroon.ThirdPartyCaveatInfo( + cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav) + self.assertEqual(cav, macaroonbakery.ThirdPartyCaveatInfo( condition='third party condition', first_party_public_key=fp_key.public_key, third_party_key_pair=tp_key, - root_key=b'random', + root_key=root_key, caveat=encrypted_cav, - version=bakery.BAKERY_V2, - ns=macaroon.legacy_namespace() + version=macaroonbakery.BAKERY_V2, + namespace=macaroonbakery.legacy_namespace() )) def test_decode_caveat_v3_from_go(self): - tp_key = PrivateKey(base64.b64decode( - 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')) - fp_key = PrivateKey(base64.b64decode( - 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')) + tp_key = macaroonbakery.PrivateKey( + nacl.public.PrivateKey(base64.b64decode( + 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))) + fp_key = macaroonbakery.PrivateKey(nacl.public.PrivateKey( + base64.b64decode( + 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))) + root_key = base64.b64decode(b'oqOXI3/Mz/pKjCuFOt2eYxb7ndLq66GY') # This caveat has been generated from the go code # to check the compatibilty encrypted_cav = base64.urlsafe_b64decode( @@ -146,15 +160,15 @@ class TestCodec(TestCase): 'A_D-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGNLeFSkD2M-8A' 'EYvmgVH95GWu7T7caKxKhhOQFcEKgnXKJvYXxz1zin4cZc4Q6C7gVqA-J4_j3' '1LX4VKxymqG62UGPo78wOv0_fKjr3OI6PPJOYOQgBMclemlRF2'))) - cav = codec.decode_caveat(tp_key, encrypted_cav) - self.assertEquals(cav, macaroon.ThirdPartyCaveatInfo( + cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav) + self.assertEquals(cav, macaroonbakery.ThirdPartyCaveatInfo( condition='third party condition', first_party_public_key=fp_key.public_key, third_party_key_pair=tp_key, - root_key=b'random', + root_key=root_key, caveat=encrypted_cav, - version=bakery.BAKERY_V3, - ns=macaroon.legacy_namespace() + version=macaroonbakery.BAKERY_V3, + namespace=macaroonbakery.legacy_namespace() )) def test_encode_decode_varint(self): @@ -169,10 +183,10 @@ class TestCodec(TestCase): for test in tests: data = bytearray() expected = bytearray() - codec._encode_uvarint(test[0], data) + macaroonbakery.encode_uvarint(test[0], data) for v in test[1]: expected.append(v) self.assertEquals(data, expected) - val = codec._decode_uvarint(bytes(data)) + val = codec.decode_uvarint(bytes(data)) self.assertEquals(test[0], val[0]) self.assertEquals(len(test[1]), val[1]) diff --git a/macaroonbakery/tests/test_discharge.py b/macaroonbakery/tests/test_discharge.py new file mode 100644 index 0000000..6e2df6a --- /dev/null +++ b/macaroonbakery/tests/test_discharge.py @@ -0,0 +1,445 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +import unittest + +from pymacaroons import MACAROON_V1, Macaroon +from pymacaroons.exceptions import ( + MacaroonInvalidSignatureException, MacaroonUnmetCaveatException +) + +import macaroonbakery +import macaroonbakery.checkers as checkers +from macaroonbakery.tests import common + + +class TestDischarge(unittest.TestCase): + def test_single_service_first_party(self): + ''' Creates a single service with a macaroon with one first party + caveat. + It creates a request with this macaroon and checks that the service + can verify this macaroon as valid. + ''' + oc = common.new_bakery('bakerytest') + primary = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + common.ages, None, + [macaroonbakery.LOGIN_OP]) + self.assertEqual(primary.macaroon.location, 'bakerytest') + primary.add_caveat(checkers.Caveat(condition='str something', + namespace='testns'), + oc.oven.key, oc.oven.locator) + oc.checker.auth([[primary.macaroon]]).allow( + common.str_context('something'), [macaroonbakery.LOGIN_OP]) + + def test_macaroon_paper_fig6(self): + ''' Implements an example flow as described in the macaroons paper: + http://theory.stanford.edu/~ataly/Papers/macaroons.pdf + There are three services, ts, fs, bs: + ts is a store service which has deligated authority to a forum + service fs. + The forum service wants to require its users to be logged into to an + authentication service bs. + + The client obtains a macaroon from fs (minted by ts, with a third party + caveat addressed to bs). + The client obtains a discharge macaroon from bs to satisfy this caveat. + The target service verifies the original macaroon it delegated to fs + No direct contact between bs and ts is required + ''' + locator = macaroonbakery.ThirdPartyStore() + bs = common.new_bakery('bs-loc', locator) + ts = common.new_bakery('ts-loc', locator) + fs = common.new_bakery('fs-loc', locator) + + # ts creates a macaroon. + ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + common.ages, + None, [macaroonbakery.LOGIN_OP]) + + # ts somehow sends the macaroon to fs which adds a third party caveat + # to be discharged by bs. + ts_macaroon.add_caveat(checkers.Caveat(location='bs-loc', + condition='user==bob'), + fs.oven.key, fs.oven.locator) + + # client asks for a discharge macaroon for each third party caveat + def get_discharge(ctx, cav, payload): + self.assertEqual(cav.location, 'bs-loc') + return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker( + 'user==bob'), + bs.oven.locator) + + d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, + get_discharge) + + ts.checker.auth([d]).allow(common.test_context, + [macaroonbakery.LOGIN_OP]) + + def test_discharge_with_version1_macaroon(self): + locator = macaroonbakery.ThirdPartyStore() + bs = common.new_bakery('bs-loc', locator) + ts = common.new_bakery('ts-loc', locator) + + # ts creates a old-version macaroon. + ts_macaroon = ts.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages, + None, [macaroonbakery.LOGIN_OP]) + ts_macaroon.add_caveat(checkers.Caveat(condition='something', + location='bs-loc'), + ts.oven.key, ts.oven.locator) + + # client asks for a discharge macaroon for each third party caveat + + def get_discharge(ctx, cav, payload): + # Make sure that the caveat id really is old-style. + try: + cav.caveat_id_bytes.decode('utf-8') + except UnicodeDecodeError: + self.fail('caveat id is not utf-8') + return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker( + 'something'), + bs.oven.locator) + + d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, + get_discharge) + + ts.checker.auth([d]).allow(common.test_context, + [macaroonbakery.LOGIN_OP]) + + for m in d: + self.assertEqual(m.version, MACAROON_V1) + + def test_version1_macaroon_id(self): + # In the version 1 bakery, macaroon ids were hex-encoded with a + # hyphenated UUID suffix. + root_key_store = macaroonbakery.MemoryKeyStore() + b = macaroonbakery.Bakery(root_key_store=root_key_store, + identity_client=common.OneIdentity()) + key, id = root_key_store.root_key() + root_key_store.get(id) + m = Macaroon(key=key, version=MACAROON_V1, location='', + identifier=id + b'-deadl00f') + b.checker.auth([[m]]).allow(common.test_context, + [macaroonbakery.LOGIN_OP]) + + def test_macaroon_paper_fig6_fails_without_discharges(self): + ''' Runs a similar test as test_macaroon_paper_fig6 without the client + discharging the third party caveats. + ''' + locator = macaroonbakery.ThirdPartyStore() + ts = common.new_bakery('ts-loc', locator) + fs = common.new_bakery('fs-loc', locator) + common.new_bakery('as-loc', locator) + + # ts creates a macaroon. + ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + common.ages, None, + [macaroonbakery.LOGIN_OP]) + + # ts somehow sends the macaroon to fs which adds a third party + # caveat to be discharged by as. + ts_macaroon.add_caveat(checkers.Caveat(location='as-loc', + condition='user==bob'), + fs.oven.key, fs.oven.locator) + + # client makes request to ts + try: + ts.checker.auth([[ts_macaroon.macaroon]]).allow( + common.test_context, + macaroonbakery.LOGIN_OP + ) + self.fail('macaroon unmet should be raised') + except MacaroonUnmetCaveatException: + pass + + def test_macaroon_paper_fig6_fails_with_binding_on_tampered_sig(self): + ''' Runs a similar test as test_macaroon_paper_fig6 with the discharge + macaroon binding being done on a tampered signature. + ''' + locator = macaroonbakery.ThirdPartyStore() + bs = common.new_bakery('bs-loc', locator) + ts = common.new_bakery('ts-loc', locator) + + # ts creates a macaroon. + ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + common.ages, None, + [macaroonbakery.LOGIN_OP]) + # ts somehow sends the macaroon to fs which adds a third party caveat + # to be discharged by as. + ts_macaroon.add_caveat(checkers.Caveat(condition='user==bob', + location='bs-loc'), + ts.oven.key, ts.oven.locator) + + # client asks for a discharge macaroon for each third party caveat + def get_discharge(ctx, cav, payload): + self.assertEqual(cav.location, 'bs-loc') + return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker( + 'user==bob'), + bs.oven.locator) + + d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, + get_discharge) + # client has all the discharge macaroons. For each discharge macaroon + # bind it to our ts_macaroon and add it to our request. + tampered_macaroon = Macaroon() + for i, dm in enumerate(d[1:]): + d[i + 1] = tampered_macaroon.prepare_for_request(dm) + + # client makes request to ts. + with self.assertRaises(MacaroonInvalidSignatureException) as exc: + ts.checker.auth([d]).allow(common.test_context, + macaroonbakery.LOGIN_OP) + self.assertEqual('Signatures do not match', exc.exception.args[0]) + + def test_need_declared(self): + locator = macaroonbakery.ThirdPartyStore() + first_party = common.new_bakery('first', locator) + third_party = common.new_bakery('third', locator) + + # firstParty mints a macaroon with a third-party caveat addressed + # to thirdParty with a need-declared caveat. + m = first_party.oven.macaroon( + macaroonbakery.LATEST_BAKERY_VERSION, common.ages, [ + checkers.need_declared_caveat( + checkers.Caveat(location='third', condition='something'), + ['foo', 'bar'] + ) + ], [macaroonbakery.LOGIN_OP]) + + # The client asks for a discharge macaroon for each third party caveat. + def get_discharge(ctx, cav, payload): + return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, + third_party.oven.key, + common.ThirdPartyStrcmpChecker( + 'something'), + third_party.oven.locator) + + d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + + # The required declared attributes should have been added + # to the discharge macaroons. + declared = checkers.infer_declared(d, first_party.checker.namespace()) + self.assertEqual(declared, { + 'foo': '', + 'bar': '', + }) + + # Make sure the macaroons actually check out correctly + # when provided with the declared checker. + ctx = checkers.context_with_declared(common.test_context, declared) + first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + + # Try again when the third party does add a required declaration. + + # The client asks for a discharge macaroon for each third party caveat. + def get_discharge(ctx, cav, payload): + checker = common.ThirdPartyCheckerWithCaveats([ + checkers.declared_caveat('foo', 'a'), + checkers.declared_caveat('arble', 'b') + ]) + return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, + third_party.oven.key, + checker, + third_party.oven.locator) + + d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + + # One attribute should have been added, the other was already there. + declared = checkers.infer_declared(d, first_party.checker.namespace()) + self.assertEqual(declared, { + 'foo': 'a', + 'bar': '', + 'arble': 'b', + }) + + ctx = checkers.context_with_declared(common.test_context, declared) + first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + + # Try again, but this time pretend a client is sneakily trying + # to add another 'declared' attribute to alter the declarations. + + def get_discharge(ctx, cav, payload): + checker = common.ThirdPartyCheckerWithCaveats([ + checkers.declared_caveat('foo', 'a'), + checkers.declared_caveat('arble', 'b'), + ]) + + # Sneaky client adds a first party caveat. + m = macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, + third_party.oven.key, checker, + third_party.oven.locator) + m.add_caveat(checkers.declared_caveat('foo', 'c'), None, None) + return m + + d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + + declared = checkers.infer_declared(d, first_party.checker.namespace()) + self.assertEqual(declared, { + 'bar': '', + 'arble': 'b', + }) + + with self.assertRaises(macaroonbakery.AuthInitError) as exc: + first_party.checker.auth([d]).allow(common.test_context, + macaroonbakery.LOGIN_OP) + self.assertEqual('cannot authorize login macaroon: caveat ' + '"declared foo a" not satisfied: got foo=null, ' + 'expected "a"', exc.exception.args[0]) + + def test_discharge_two_need_declared(self): + locator = macaroonbakery.ThirdPartyStore() + first_party = common.new_bakery('first', locator) + third_party = common.new_bakery('third', locator) + + # first_party mints a macaroon with two third party caveats + # with overlapping attributes. + m = first_party.oven.macaroon( + macaroonbakery.LATEST_BAKERY_VERSION, + common.ages, [ + checkers.need_declared_caveat( + checkers.Caveat(location='third', condition='x'), + ['foo', 'bar']), + checkers.need_declared_caveat( + checkers.Caveat(location='third', condition='y'), + ['bar', 'baz']), + ], [macaroonbakery.LOGIN_OP]) + + # The client asks for a discharge macaroon for each third party caveat. + # Since no declarations are added by the discharger, + + def get_discharge(ctx, cav, payload): + return macaroonbakery.discharge( + ctx, cav.caveat_id_bytes, payload, third_party.oven.key, + common.ThirdPartyCaveatCheckerEmpty(), + third_party.oven.locator) + + d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + declared = checkers.infer_declared(d, first_party.checker.namespace()) + self.assertEqual(declared, { + 'foo': '', + 'bar': '', + 'baz': '', + }) + ctx = checkers.context_with_declared(common.test_context, declared) + first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + + # If they return conflicting values, the discharge fails. + # The client asks for a discharge macaroon for each third party caveat. + # Since no declarations are added by the discharger, + class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(self, ctx, cav_info): + if cav_info.condition == b'x': + return [checkers.declared_caveat('foo', 'fooval1')] + if cav_info.condition == b'y': + return [ + checkers.declared_caveat('foo', 'fooval2'), + checkers.declared_caveat('baz', 'bazval') + ] + raise common.ThirdPartyCaveatCheckFailed('not matched') + + def get_discharge(ctx, cav, payload): + return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, + third_party.oven.key, + ThirdPartyCaveatCheckerF(), + third_party.oven.locator) + + d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + + declared = checkers.infer_declared(d, first_party.checker.namespace()) + self.assertEqual(declared, { + 'bar': '', + 'baz': 'bazval', + }) + with self.assertRaises(macaroonbakery.AuthInitError) as exc: + first_party.checker.auth([d]).allow(common.test_context, + macaroonbakery.LOGIN_OP) + self.assertEqual('cannot authorize login macaroon: caveat "declared ' + 'foo fooval1" not satisfied: got foo=null, expected ' + '"fooval1"', exc.exception.args[0]) + + def test_discharge_macaroon_cannot_be_used_as_normal_macaroon(self): + locator = macaroonbakery.ThirdPartyStore() + first_party = common.new_bakery('first', locator) + third_party = common.new_bakery('third', locator) + + # First party mints a macaroon with a 3rd party caveat. + m = first_party.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + common.ages, [ + checkers.Caveat(location='third', + condition='true')], + [macaroonbakery.LOGIN_OP]) + + # Acquire the discharge macaroon, but don't bind it to the original. + class M: + unbound = None + + def get_discharge(ctx, cav, payload): + m = macaroonbakery.discharge( + ctx, cav.caveat_id_bytes, payload, third_party.oven.key, + common.ThirdPartyStrcmpChecker('true'), + third_party.oven.locator) + M.unbound = m.macaroon.copy() + return m + + macaroonbakery.discharge_all(common.test_context, m, get_discharge) + self.assertIsNotNone(M.unbound) + + # Make sure it cannot be used as a normal macaroon in the third party. + with self.assertRaises(macaroonbakery.AuthInitError) as exc: + third_party.checker.auth([[M.unbound]]).allow( + common.test_context, [macaroonbakery.LOGIN_OP]) + self.assertEqual('no operations found in macaroon', + exc.exception.args[0]) + + def test_third_party_discharge_macaroon_ids_are_small(self): + locator = macaroonbakery.ThirdPartyStore() + bakeries = { + 'ts-loc': common.new_bakery('ts-loc', locator), + 'as1-loc': common.new_bakery('as1-loc', locator), + 'as2-loc': common.new_bakery('as2-loc', locator), + } + ts = bakeries['ts-loc'] + + ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + common.ages, + None, [macaroonbakery.LOGIN_OP]) + ts_macaroon.add_caveat(checkers.Caveat(condition='something', + location='as1-loc'), + ts.oven.key, ts.oven.locator) + + class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker): + def __init__(self, loc): + self._loc = loc + + def check_third_party_caveat(self, ctx, info): + if self._loc == 'as1-loc': + return [checkers.Caveat(condition='something', + location='as2-loc')] + if self._loc == 'as2-loc': + return [] + raise common.ThirdPartyCaveatCheckFailed( + 'unknown location {}'.format(self._loc)) + + def get_discharge(ctx, cav, payload): + oven = bakeries[cav.location].oven + return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, + oven.key, + ThirdPartyCaveatCheckerF( + cav.location), + oven.locator) + + d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, + get_discharge) + ts.checker.auth([d]).allow(common.test_context, + [macaroonbakery.LOGIN_OP]) + + for i, m in enumerate(d): + for j, cav in enumerate(m.caveats): + if (cav.verification_key_id is not None and + len(cav.caveat_id) > 3): + self.fail('caveat id on caveat {} of macaroon {} ' + 'is too big ({})'.format(j, i, cav.id)) diff --git a/macaroonbakery/tests/test_discharge_all.py b/macaroonbakery/tests/test_discharge_all.py new file mode 100644 index 0000000..8da8823 --- /dev/null +++ b/macaroonbakery/tests/test_discharge_all.py @@ -0,0 +1,170 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +import unittest + +from pymacaroons.verifier import Verifier + +import macaroonbakery +import macaroonbakery.checkers as checkers +from macaroonbakery.tests import common + + +def always_ok(predicate): + return True + + +class TestDischargeAll(unittest.TestCase): + def test_discharge_all_no_discharges(self): + root_key = b'root key' + m = macaroonbakery.Macaroon( + root_key=root_key, id=b'id0', location='loc0', + version=macaroonbakery.LATEST_BAKERY_VERSION, + namespace=common.test_checker().namespace()) + ms = macaroonbakery.discharge_all( + common.test_context, m, no_discharge(self)) + self.assertEqual(len(ms), 1) + self.assertEqual(ms[0], m.macaroon) + v = Verifier() + v.satisfy_general(always_ok) + v.verify(m.macaroon, root_key, None) + + def test_discharge_all_many_discharges(self): + root_key = b'root key' + m0 = macaroonbakery.Macaroon( + root_key=root_key, id=b'id0', location='loc0', + version=macaroonbakery.LATEST_BAKERY_VERSION) + + class State(object): + total_required = 40 + id = 1 + + def add_caveats(m): + for i in range(0, 1): + if State.total_required == 0: + break + cid = 'id{}'.format(State.id) + m.macaroon.add_third_party_caveat( + location='somewhere', + key='root key {}'.format(cid).encode('utf-8'), + key_id=cid.encode('utf-8')) + State.id += 1 + State.total_required -= 1 + + add_caveats(m0) + + def get_discharge(_, cav, payload): + self.assertEqual(payload, None) + m = macaroonbakery.Macaroon( + root_key='root key {}'.format( + cav.caveat_id.decode('utf-8')).encode('utf-8'), + id=cav.caveat_id, location='', + version=macaroonbakery.LATEST_BAKERY_VERSION) + + add_caveats(m) + return m + + ms = macaroonbakery.discharge_all( + common.test_context, m0, get_discharge) + + self.assertEqual(len(ms), 41) + + v = Verifier() + v.satisfy_general(always_ok) + v.verify(ms[0], root_key, ms[1:]) + + def test_discharge_all_many_discharges_with_real_third_party_caveats(self): + # This is the same flow as TestDischargeAllManyDischarges except that + # we're using actual third party caveats as added by + # Macaroon.add_caveat and we use a larger number of caveats + # so that caveat ids will need to get larger. + locator = macaroonbakery.ThirdPartyStore() + bakeries = {} + total_discharges_required = 40 + + class M: + bakery_id = 0 + still_required = total_discharges_required + + def add_bakery(): + M.bakery_id += 1 + loc = 'loc{}'.format(M.bakery_id) + bakeries[loc] = common.new_bakery(loc, locator) + return loc + + ts = common.new_bakery('ts-loc', locator) + + def checker(_, ci): + caveats = [] + if ci.condition != 'something': + self.fail('unexpected condition') + for i in range(0, 2): + if M.still_required <= 0: + break + caveats.append(checkers.Caveat(location=add_bakery(), + condition='something')) + M.still_required -= 1 + return caveats + + root_key = b'root key' + m0 = macaroonbakery.Macaroon( + root_key=root_key, id=b'id0', location='ts-loc', + version=macaroonbakery.LATEST_BAKERY_VERSION) + + m0.add_caveat(checkers. Caveat(location=add_bakery(), + condition='something'), + ts.oven.key, locator) + + # We've added a caveat (the first) so one less caveat is required. + M.still_required -= 1 + + class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(self, ctx, info): + return checker(ctx, info) + + def get_discharge(ctx, cav, payload): + return macaroonbakery.discharge( + ctx, cav.caveat_id, payload, + bakeries[cav.location].oven.key, + ThirdPartyCaveatCheckerF(), locator) + + ms = macaroonbakery.discharge_all(common.test_context, m0, + get_discharge) + + self.assertEqual(len(ms), total_discharges_required + 1) + + v = Verifier() + v.satisfy_general(always_ok) + v.verify(ms[0], root_key, ms[1:]) + + def test_discharge_all_local_discharge(self): + oc = common.new_bakery('ts', None) + client_key = macaroonbakery.generate_key() + m = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, common.ages, + [ + macaroonbakery.local_third_party_caveat( + client_key.public_key, + macaroonbakery.LATEST_BAKERY_VERSION) + ], [macaroonbakery.LOGIN_OP]) + ms = macaroonbakery.discharge_all( + common.test_context, m, no_discharge(self), client_key) + oc.checker.auth([ms]).allow(common.test_context, + [macaroonbakery.LOGIN_OP]) + + def test_discharge_all_local_discharge_version1(self): + oc = common.new_bakery('ts', None) + client_key = macaroonbakery.generate_key() + m = oc.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages, [ + macaroonbakery.local_third_party_caveat( + client_key.public_key, macaroonbakery.BAKERY_V1) + ], [macaroonbakery.LOGIN_OP]) + ms = macaroonbakery.discharge_all( + common.test_context, m, no_discharge(self), client_key) + oc.checker.auth([ms]).allow(common.test_context, + [macaroonbakery.LOGIN_OP]) + + +def no_discharge(test): + def get_discharge(ctx, cav, payload): + test.fail("get_discharge called unexpectedly") + + return get_discharge diff --git a/macaroonbakery/tests/test_keyring.py b/macaroonbakery/tests/test_keyring.py new file mode 100644 index 0000000..351b144 --- /dev/null +++ b/macaroonbakery/tests/test_keyring.py @@ -0,0 +1,111 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +import unittest + +from httmock import urlmatch, HTTMock + +import macaroonbakery +from macaroonbakery import httpbakery + + +class TestKeyRing(unittest.TestCase): + + def test_cache_fetch(self): + key = macaroonbakery.generate_key() + + @urlmatch(path='.*/discharge/info') + def discharge_info(url, request): + return { + 'status_code': 200, + 'content': { + 'Version': macaroonbakery.LATEST_BAKERY_VERSION, + 'PublicKey': key.public_key.encode().decode('utf-8') + } + } + + expectInfo = macaroonbakery.ThirdPartyInfo( + public_key=key.public_key, + version=macaroonbakery.LATEST_BAKERY_VERSION + ) + kr = httpbakery.ThirdPartyLocator(allow_insecure=True) + with HTTMock(discharge_info): + info = kr.third_party_info('http://0.1.2.3/') + self.assertEqual(info, expectInfo) + + def test_cache_norefetch(self): + key = macaroonbakery.generate_key() + + @urlmatch(path='.*/discharge/info') + def discharge_info(url, request): + return { + 'status_code': 200, + 'content': { + 'Version': macaroonbakery.LATEST_BAKERY_VERSION, + 'PublicKey': key.public_key.encode().decode('utf-8') + } + } + + expectInfo = macaroonbakery.ThirdPartyInfo( + public_key=key.public_key, + version=macaroonbakery.LATEST_BAKERY_VERSION + ) + kr = httpbakery.ThirdPartyLocator(allow_insecure=True) + with HTTMock(discharge_info): + info = kr.third_party_info('http://0.1.2.3/') + self.assertEqual(info, expectInfo) + info = kr.third_party_info('http://0.1.2.3/') + self.assertEqual(info, expectInfo) + + def test_cache_fetch_no_version(self): + key = macaroonbakery.generate_key() + + @urlmatch(path='.*/discharge/info') + def discharge_info(url, request): + return { + 'status_code': 200, + 'content': { + 'PublicKey': key.public_key.encode().decode('utf-8') + } + } + + expectInfo = macaroonbakery.ThirdPartyInfo( + public_key=key.public_key, + version=macaroonbakery.BAKERY_V1 + ) + kr = httpbakery.ThirdPartyLocator(allow_insecure=True) + with HTTMock(discharge_info): + info = kr.third_party_info('http://0.1.2.3/') + self.assertEqual(info, expectInfo) + + def test_allow_insecure(self): + kr = httpbakery.ThirdPartyLocator() + with self.assertRaises(macaroonbakery.error.ThirdPartyInfoNotFound): + kr.third_party_info('http://0.1.2.3/') + + def test_fallback(self): + key = macaroonbakery.generate_key() + + @urlmatch(path='.*/discharge/info') + def discharge_info(url, request): + return { + 'status_code': 404, + } + + @urlmatch(path='.*/publickey') + def public_key(url, request): + return { + 'status_code': 200, + 'content': { + 'PublicKey': key.public_key.encode().decode('utf-8') + } + } + + expectInfo = macaroonbakery.ThirdPartyInfo( + public_key=key.public_key, + version=macaroonbakery.BAKERY_V1 + ) + kr = httpbakery.ThirdPartyLocator(allow_insecure=True) + with HTTMock(discharge_info): + with HTTMock(public_key): + info = kr.third_party_info('http://0.1.2.3/') + self.assertEqual(info, expectInfo) diff --git a/macaroonbakery/tests/test_macaroon.py b/macaroonbakery/tests/test_macaroon.py index afc7d52..7e77e2b 100644 --- a/macaroonbakery/tests/test_macaroon.py +++ b/macaroonbakery/tests/test_macaroon.py @@ -1,64 +1,202 @@ # Copyright 2017 Canonical Ltd. # Licensed under the LGPLv3, see LICENCE file for details. - +import json from unittest import TestCase import six +import pymacaroons +from pymacaroons import serializers -import nacl.utils - -from macaroonbakery import bakery, macaroon, checkers, codec +import macaroonbakery +import macaroonbakery.checkers as checkers +from macaroonbakery.tests import common class TestMacaroon(TestCase): def test_new_macaroon(self): - m = macaroon.Macaroon(b'rootkey', - b'some id', - 'here', - bakery.LATEST_BAKERY_VERSION) + m = macaroonbakery.Macaroon( + b'rootkey', + b'some id', + 'here', + macaroonbakery.LATEST_BAKERY_VERSION) self.assertIsNotNone(m) - self.assertEquals(m._macaroon.identifier, 'some id') + self.assertEquals(m._macaroon.identifier, b'some id') self.assertEquals(m._macaroon.location, 'here') - self.assertEquals(m.version, macaroon.macaroon_version( - bakery.LATEST_BAKERY_VERSION)) + self.assertEquals(m.version, macaroonbakery.LATEST_BAKERY_VERSION) def test_add_first_party_caveat(self): - m = macaroon.Macaroon('rootkey', - 'some id', - 'here', - bakery.LATEST_BAKERY_VERSION) - m = m.add_caveat(checkers.Caveat('test_condition')) + m = macaroonbakery.Macaroon('rootkey', 'some id', 'here', + macaroonbakery.LATEST_BAKERY_VERSION) + m.add_caveat(checkers.Caveat('test_condition')) caveats = m.first_party_caveats() self.assertEquals(len(caveats), 1) - self.assertEquals(caveats[0].caveat_id, 'test_condition') + self.assertEquals(caveats[0].caveat_id, b'test_condition') def test_add_third_party_caveat(self): - m = macaroon.Macaroon('rootkey', - 'some id', - 'here', - bakery.LATEST_BAKERY_VERSION) - loc = macaroon.ThirdPartyLocator() - fp_key = nacl.public.PrivateKey.generate() - tp_key = nacl.public.PrivateKey.generate() - - loc.add_info('test_location', - bakery.ThirdPartyInfo( - bakery.BAKERY_V1, - tp_key.public_key)) - m = m.add_caveat(checkers.Caveat(condition='test_condition', - location='test_location'), - fp_key, loc) - - tp_cav = m.third_party_caveats() - self.assertEquals(len(tp_cav), 1) - self.assertEquals(tp_cav[0].location, 'test_location') - cav = codec.decode_caveat(tp_key, six.b(tp_cav[0].caveat_id)) - self.assertEquals(cav, macaroon.ThirdPartyCaveatInfo( - condition='test_condition', - first_party_public_key=fp_key.public_key, - third_party_key_pair=tp_key, - root_key='random', - caveat=six.b(tp_cav[0].caveat_id), - version=bakery.BAKERY_V1, - ns=macaroon.legacy_namespace() - )) + locator = macaroonbakery.ThirdPartyStore() + bs = common.new_bakery('bs-loc', locator) + + lbv = six.int2byte(macaroonbakery.LATEST_BAKERY_VERSION) + tests = [ + ('no existing id', b'', [], lbv + six.int2byte(0)), + ('several existing ids', b'', [ + lbv + six.int2byte(0), + lbv + six.int2byte(1), + lbv + six.int2byte(2) + ], lbv + six.int2byte(3)), + ('with base id', lbv + six.int2byte(0), [lbv + six.int2byte(0)], + lbv + six.int2byte(0) + six.int2byte(0)), + ('with base id and existing id', lbv + six.int2byte(0), [ + lbv + six.int2byte(0) + six.int2byte(0) + ], lbv + six.int2byte(0) + six.int2byte(1)) + ] + + for test in tests: + print('test ', test[0]) + m = macaroonbakery.Macaroon( + root_key=b'root key', id=b'id', + location='location', + version=macaroonbakery.LATEST_BAKERY_VERSION) + for id in test[2]: + m.macaroon.add_third_party_caveat(key=None, key_id=id, + location='') + m._caveat_id_prefix = test[1] + m.add_caveat(checkers.Caveat(location='bs-loc', + condition='something'), + bs.oven.key, locator) + self.assertEqual(m.macaroon.caveats[len(test[2])].caveat_id, + test[3]) + + def test_marshal_json_latest_version(self): + locator = macaroonbakery.ThirdPartyStore() + bs = common.new_bakery('bs-loc', locator) + ns = checkers.Namespace({ + 'testns': 'x', + 'otherns': 'y', + }) + m = macaroonbakery.Macaroon( + root_key=b'root key', id=b'id', + location='location', + version=macaroonbakery.LATEST_BAKERY_VERSION, + namespace=ns) + m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'), + bs.oven.key, locator) + data = m.serialize_json() + m1 = macaroonbakery.Macaroon.deserialize_json(data) + # Just check the signature and version - we're not interested in fully + # checking the macaroon marshaling here. + self.assertEqual(m1.macaroon.signature, m.macaroon.signature) + self.assertEqual(m1.macaroon.version, m.macaroon.version) + self.assertEqual(len(m1.macaroon.caveats), 1) + self.assertEqual(m1.namespace, m.namespace) + self.assertEqual(m1._caveat_data, m._caveat_data) + + # test with the encoder, decoder + data = json.dumps(m, cls=macaroonbakery.MacaroonJSONEncoder) + m1 = json.loads(data, cls=macaroonbakery.MacaroonJSONDecoder) + self.assertEqual(m1.macaroon.signature, m.macaroon.signature) + self.assertEqual(m1.macaroon.version, m.macaroon.version) + self.assertEqual(len(m1.macaroon.caveats), 1) + self.assertEqual(m1.namespace, m.namespace) + self.assertEqual(m1._caveat_data, m._caveat_data) + + def test_json_version1(self): + self._test_json_with_version(macaroonbakery.BAKERY_V1) + + def test_json_version2(self): + self._test_json_with_version(macaroonbakery.BAKERY_V2) + + def _test_json_with_version(self, version): + locator = macaroonbakery.ThirdPartyStore() + bs = common.new_bakery('bs-loc', locator) + + ns = checkers.Namespace({ + 'testns': 'x', + }) + + m = macaroonbakery.Macaroon( + root_key=b'root key', id=b'id', + location='location', version=version, + namespace=ns) + m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'), + bs.oven.key, locator) + + # Sanity check that no external caveat data has been added. + self.assertEqual(len(m._caveat_data), 0) + + data = json.dumps(m, cls=macaroonbakery.MacaroonJSONEncoder) + m1 = json.loads(data, cls=macaroonbakery.MacaroonJSONDecoder) + + # Just check the signature and version - we're not interested in fully + # checking the macaroon marshaling here. + self.assertEqual(m1.macaroon.signature, m.macaroon.signature) + self.assertEqual(m1.macaroon.version, + macaroonbakery.macaroon_version(version)) + self.assertEqual(len(m1.macaroon.caveats), 1) + + # Namespace information has been thrown away. + self.assertEqual(m1.namespace, macaroonbakery.legacy_namespace()) + + self.assertEqual(len(m1._caveat_data), 0) + + def test_json_unknown_version(self): + m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2) + with self.assertRaises(ValueError) as exc: + json.loads(json.dumps({ + 'm': m.serialize(serializer=serializers.JsonSerializer()), + 'v': macaroonbakery.LATEST_BAKERY_VERSION + 1 + }), cls=macaroonbakery.MacaroonJSONDecoder) + self.assertEqual('unknow bakery version 4', exc.exception.args[0]) + + def test_json_inconsistent_version(self): + m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V1) + with self.assertRaises(ValueError) as exc: + json.loads(json.dumps({ + 'm': json.loads(m.serialize( + serializer=serializers.JsonSerializer())), + 'v': macaroonbakery.LATEST_BAKERY_VERSION + }), cls=macaroonbakery.MacaroonJSONDecoder) + self.assertEqual('underlying macaroon has inconsistent version; ' + 'got 1 want 2', exc.exception.args[0]) + + def test_clone(self): + locator = macaroonbakery.ThirdPartyStore() + bs = common.new_bakery("bs-loc", locator) + ns = checkers.Namespace({ + "testns": "x", + }) + m = macaroonbakery.Macaroon( + root_key=b'root key', id=b'id', + location='location', + version=macaroonbakery.LATEST_BAKERY_VERSION, + namespace=ns) + m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'), + bs.oven.key, locator) + m1 = m.copy() + self.assertEqual(len(m.macaroon.caveats), 1) + self.assertEqual(len(m1.macaroon.caveats), 1) + self.assertEqual(m._caveat_data, m1._caveat_data) + m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'), + bs.oven.key, locator) + self.assertEqual(len(m.macaroon.caveats), 2) + self.assertEqual(len(m1.macaroon.caveats), 1) + self.assertNotEqual(m._caveat_data, m1._caveat_data) + + def test_json_deserialize_from_go(self): + ns = checkers.Namespace() + ns.register("someuri", "x") + m = macaroonbakery.Macaroon( + root_key=b'rootkey', id=b'some id', location='here', + version=macaroonbakery.LATEST_BAKERY_VERSION, namespace=ns) + m.add_caveat(checkers.Caveat(condition='something', + namespace='someuri')) + data = '{"m":{"c":[{"i":"x:something"}],"l":"here","i":"some id",' \ + '"s64":"c8edRIupArSrY-WZfa62pgZFD8VjDgqho9U2PlADe-E"},"v":3,' \ + '"ns":"someuri:x"}' + m_go = macaroonbakery.Macaroon.deserialize_json(data) + + self.assertEqual(m.macaroon.signature_bytes, + m_go.macaroon.signature_bytes) + self.assertEqual(m.macaroon.version, m_go.macaroon.version) + self.assertEqual(len(m_go.macaroon.caveats), 1) + self.assertEqual(m.namespace, m_go.namespace) diff --git a/macaroonbakery/tests/test_namespace.py b/macaroonbakery/tests/test_namespace.py index 24eda29..2f04bb3 100644 --- a/macaroonbakery/tests/test_namespace.py +++ b/macaroonbakery/tests/test_namespace.py @@ -1,9 +1,8 @@ # Copyright 2017 Canonical Ltd. # Licensed under the LGPLv3, see LICENCE file for details. - from unittest import TestCase -from macaroonbakery import namespace +import macaroonbakery.checkers as checkers class TestNamespace(TestCase): @@ -23,17 +22,17 @@ class TestNamespace(TestCase): }, b'a:one a1:two') ] for test in tests: - ns = namespace.Namespace(test[1]) - data = ns.serialize() + ns = checkers.Namespace(test[1]) + data = ns.serialize_text() self.assertEquals(data, test[2]) self.assertEquals(str(ns), test[2].decode('utf-8')) # Check that it can be deserialize to the same thing: - ns1 = namespace.deserialize_namespace(data) + ns1 = checkers.deserialize_namespace(data) self.assertEquals(ns1, ns) def test_register(self): - ns = namespace.Namespace(None) + ns = checkers.Namespace(None) ns.register('testns', 't') prefix = ns.resolve('testns') self.assertEquals(prefix, 't') @@ -48,11 +47,11 @@ class TestNamespace(TestCase): self.assertEquals(prefix, 'o') def test_register_bad_uri(self): - ns = namespace.Namespace(None) + ns = checkers.Namespace(None) with self.assertRaises(KeyError): ns.register('', 'x') def test_register_bad_prefix(self): - ns = namespace.Namespace(None) + ns = checkers.Namespace(None) with self.assertRaises(ValueError): ns.register('std', 'x:1') diff --git a/macaroonbakery/tests/test_oven.py b/macaroonbakery/tests/test_oven.py new file mode 100644 index 0000000..2976e94 --- /dev/null +++ b/macaroonbakery/tests/test_oven.py @@ -0,0 +1,125 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +from unittest import TestCase + +import copy +from datetime import datetime, timedelta + +import macaroonbakery + +EPOCH = datetime(1900, 11, 17, 19, 00, 13, 0, None) +AGES = EPOCH + timedelta(days=10) + + +class TestOven(TestCase): + def test_canonical_ops(self): + canonical_ops_tests = ( + ('empty array', [], []), + ('one element', [macaroonbakery.Op('a', 'a')], + [macaroonbakery.Op('a', 'a')]), + ('all in order', + [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'), + macaroonbakery.Op('c', 'c')], + [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'), + macaroonbakery.Op('c', 'c')]), + ('out of order', + [macaroonbakery.Op('c', 'c'), macaroonbakery.Op('a', 'b'), + macaroonbakery.Op('a', 'a')], + [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'), + macaroonbakery.Op('c', 'c')]), + ('with duplicates', + [macaroonbakery.Op('c', 'c'), macaroonbakery.Op('a', 'b'), + macaroonbakery.Op('a', 'a'), macaroonbakery.Op('c', 'a'), + macaroonbakery.Op('c', 'b'), macaroonbakery.Op('c', 'c'), + macaroonbakery.Op('a', 'a')], + [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'), + macaroonbakery.Op('c', 'a'), macaroonbakery.Op('c', 'b'), + macaroonbakery.Op('c', 'c')]), + ('make sure we\'ve got the fields right', + [macaroonbakery.Op(entity='read', action='two'), + macaroonbakery.Op(entity='read', action='one'), + macaroonbakery.Op(entity='write', action='one')], + [macaroonbakery.Op(entity='read', action='one'), + macaroonbakery.Op(entity='read', action='two'), + macaroonbakery.Op(entity='write', action='one')]) + ) + for about, ops, expected in canonical_ops_tests: + new_ops = copy.copy(ops) + canonical_ops = macaroonbakery.canonical_ops(new_ops) + self.assertEquals(canonical_ops, expected) + # Verify that the original array isn't changed. + self.assertEquals(new_ops, ops) + + def test_multiple_ops(self): + test_oven = macaroonbakery.Oven( + ops_store=macaroonbakery.MemoryOpsStore()) + ops = [macaroonbakery.Op('one', 'read'), + macaroonbakery.Op('one', 'write'), + macaroonbakery.Op('two', 'read')] + m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, + None, ops) + got_ops, conds = test_oven.macaroon_ops([m.macaroon]) + self.assertEquals(len(conds), 1) # time-before caveat. + self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops) + + def test_multiple_ops_in_id(self): + test_oven = macaroonbakery.Oven() + ops = [macaroonbakery.Op('one', 'read'), + macaroonbakery.Op('one', 'write'), + macaroonbakery.Op('two', 'read')] + m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, + None, ops) + got_ops, conds = test_oven.macaroon_ops([m.macaroon]) + self.assertEquals(len(conds), 1) # time-before caveat. + self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops) + + def test_multiple_ops_in_id_with_version1(self): + test_oven = macaroonbakery.Oven() + ops = [macaroonbakery.Op('one', 'read'), + macaroonbakery.Op('one', 'write'), + macaroonbakery.Op('two', 'read')] + m = test_oven.macaroon(macaroonbakery.BAKERY_V1, AGES, None, ops) + got_ops, conds = test_oven.macaroon_ops([m.macaroon]) + self.assertEquals(len(conds), 1) # time-before caveat. + self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops) + + def test_huge_number_of_ops_gives_small_macaroon(self): + test_oven = macaroonbakery.Oven( + ops_store=macaroonbakery.MemoryOpsStore()) + ops = [] + for i in range(30000): + ops.append(macaroonbakery.Op(entity='entity{}'.format(i), + action='action{}'.format(i))) + + m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, + None, ops) + got_ops, conds = test_oven.macaroon_ops([m.macaroon]) + self.assertEquals(len(conds), 1) # time-before caveat. + self.assertEquals(macaroonbakery.canonical_ops(got_ops), + macaroonbakery.canonical_ops(ops)) + + data = m.serialize_json() + self.assertLess(len(data), 300) + + def test_ops_stored_only_once(self): + st = macaroonbakery.MemoryOpsStore() + test_oven = macaroonbakery.Oven(ops_store=st) + + ops = [macaroonbakery.Op('one', 'read'), + macaroonbakery.Op('one', 'write'), + macaroonbakery.Op('two', 'read')] + + m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, + None, ops) + got_ops, conds = test_oven.macaroon_ops([m.macaroon]) + self.assertEquals(macaroonbakery.canonical_ops(got_ops), + macaroonbakery.canonical_ops(ops)) + + # Make another macaroon containing the same ops in a different order. + ops = [macaroonbakery.Op('one', 'write'), + macaroonbakery.Op('one', 'read'), + macaroonbakery.Op('one', 'read'), + macaroonbakery.Op('two', 'read')] + test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, None, + ops) + self.assertEquals(len(st._store), 1) diff --git a/macaroonbakery/tests/test_store.py b/macaroonbakery/tests/test_store.py new file mode 100644 index 0000000..7bcc4c2 --- /dev/null +++ b/macaroonbakery/tests/test_store.py @@ -0,0 +1,21 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +from unittest import TestCase + +import macaroonbakery + + +class TestOven(TestCase): + def test_mem_store(self): + st = macaroonbakery.MemoryKeyStore() + + key, id = st.root_key() + self.assertEqual(len(key), 24) + self.assertEqual(id.decode('utf-8'), '0') + + key1, id1 = st.root_key() + self.assertEqual(key1, key) + self.assertEqual(id1, id) + + key2 = st.get(id) + self.assertEqual(key2, key) |