diff options
Diffstat (limited to 'macaroonbakery/tests/test_checker.py')
-rw-r--r-- | macaroonbakery/tests/test_checker.py | 963 |
1 files changed, 963 insertions, 0 deletions
diff --git a/macaroonbakery/tests/test_checker.py b/macaroonbakery/tests/test_checker.py new file mode 100644 index 0000000..06bf008 --- /dev/null +++ b/macaroonbakery/tests/test_checker.py @@ -0,0 +1,963 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +import base64 +from collections import namedtuple +import json +from unittest import TestCase +from datetime import timedelta + +from pymacaroons.verifier import Verifier, FirstPartyCaveatVerifierDelegate +import pymacaroons + +import macaroonbakery +import macaroonbakery.checkers as checkers +from macaroonbakery.tests.common import test_context, epoch, test_checker + + +class TestChecker(TestCase): + def setUp(self): + self._discharges = [] + + def test_authorize_with_open_access_and_no_macaroons(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + {macaroonbakery.Op(entity='something', action='read'): + {macaroonbakery.EVERYONE}}) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + auth_info = client.do(test_context, ts, + [macaroonbakery.Op(entity='something', + action='read')]) + self.assertEqual(len(self._discharges), 0) + self.assertIsNotNone(auth_info) + self.assertIsNone(auth_info.identity) + self.assertEqual(len(auth_info.macaroons), 0) + + def test_authorization_denied(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = macaroonbakery.ClosedAuthorizer() + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + with self.assertRaises(macaroonbakery.PermissionDenied): + client.do(ctx, ts, [macaroonbakery.Op(entity='something', + action='read')]) + + def test_authorize_with_authentication_required(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + {macaroonbakery.Op(entity='something', action='read'): {'bob'}}) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + auth_info = client.do(ctx, ts, [macaroonbakery.Op(entity='something', + action='read')]) + self.assertEqual(self._discharges, + [_DischargeRecord(location='ids', user='bob')]) + self.assertIsNotNone(auth_info) + self.assertEqual(auth_info.identity.id(), 'bob') + self.assertEqual(len(auth_info.macaroons), 1) + + def test_authorize_multiple_ops(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='something', action='read'): {'bob'}, + macaroonbakery.Op(entity='otherthing', action='read'): {'bob'} + } + ) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + client.do(ctx, ts, [ + macaroonbakery.Op(entity='something', action='read'), + macaroonbakery.Op(entity='otherthing', action='read') + ]) + self.assertEqual(self._discharges, + [_DischargeRecord(location='ids', user='bob')]) + + def test_capability(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + {macaroonbakery.Op(entity='something', action='read'): {'bob'}}) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m = client.discharged_capability( + ctx, ts, [macaroonbakery.Op(entity='something', action='read')]) + # Check that we can exercise the capability directly on the service + # with no discharging required. + auth_info = ts.do(test_context, [m], + [macaroonbakery.Op(entity='something', + action='read')]) + self.assertIsNotNone(auth_info) + self.assertIsNone(auth_info.identity) + self.assertEqual(len(auth_info.macaroons), 1) + self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, + m[0].identifier_bytes) + + def test_capability_multiple_entities(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'bob'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + macaroonbakery.Op(entity='e3', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m = client.discharged_capability(ctx, ts, [ + macaroonbakery.Op(entity='e1', + action='read'), + macaroonbakery.Op(entity='e2', + action='read'), + macaroonbakery.Op(entity='e3', + action='read')]) + self.assertEqual(self._discharges, + [_DischargeRecord(location='ids', user='bob')]) + + # Check that we can exercise the capability directly on the service + # with no discharging required. + ts.do(test_context, [m], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read'), + macaroonbakery.Op(entity='e3', action='read')]) + + # Check that we can exercise the capability to act on a subset of + # the operations. + ts.do(test_context, [m], [ + macaroonbakery.Op(entity='e2', action='read'), + macaroonbakery.Op(entity='e3', action='read')] + ) + ts.do(test_context, [m], + [macaroonbakery.Op(entity='e3', action='read')]) + + def test_multiple_capabilities(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Acquire two capabilities as different users and check + # that we can combine them together to do both operations + # at once. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m1 = _Client(locator).discharged_capability(ctx, ts, + [macaroonbakery.Op( + entity='e1', + action='read')]) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m2 = _Client(locator).discharged_capability(ctx, ts, + [macaroonbakery.Op( + entity='e2', + action='read')]) + self.assertEqual(self._discharges, + [ + _DischargeRecord(location='ids', user='alice'), + _DischargeRecord(location='ids', user='bob'), + ]) + auth_info = ts.do(test_context, [m1, m2], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + self.assertIsNotNone(auth_info) + self.assertIsNone(auth_info.identity) + self.assertEqual(len(auth_info.macaroons), 2) + self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, + m1[0].identifier_bytes) + self.assertEqual(auth_info.macaroons[1][0].identifier_bytes, + m2[0].identifier_bytes) + + def test_combine_capabilities(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + macaroonbakery.Op(entity='e3', action='read'): {'bob', + 'alice'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Acquire two capabilities as different users and check + # that we can combine them together into a single capability + # capable of both operations. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m1 = _Client(locator).discharged_capability( + ctx, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e3', action='read')]) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m2 = _Client(locator).discharged_capability( + ctx, ts, [macaroonbakery.Op(entity='e2', action='read')]) + + m = ts.capability(test_context, [m1, m2], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read'), + macaroonbakery.Op(entity='e3', action='read')]) + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read'), + macaroonbakery.Op(entity='e3', action='read')]) + + def test_partially_authorized_request(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Acquire a capability for e1 but rely on authentication to + # authorize e2. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m = _Client(locator).discharged_capability(ctx, ts, + [macaroonbakery.Op( + entity='e1', + action='read')]) + client = _Client(locator) + client.add_macaroon(ts, 'authz', m) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + client.discharged_capability( + ctx, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + + def test_auth_with_third_party_caveats(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + + # We make an authorizer that requires a third party discharge + # when authorizing. + def authorize_with_tp_discharge(ctx, id, op): + if (id is not None and id.id() == 'bob' and + op == macaroonbakery.Op(entity='something', + action='read')): + return True, [checkers.Caveat(condition='question', + location='other third party')] + return False, None + + auth = macaroonbakery.AuthorizerFunc(authorize_with_tp_discharge) + ts = _Service('myservice', auth, ids, locator) + + class _LocalDischargeChecker(macaroonbakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(_, ctx, info): + if info.condition != 'question': + raise ValueError('third party condition not recognized') + self._discharges.append(_DischargeRecord( + location='other third party', + user=ctx.get(_DISCHARGE_USER_KEY) + )) + return [] + + locator['other third party'] = _Discharger( + key=macaroonbakery.generate_key(), + checker=_LocalDischargeChecker(), + locator=locator, + ) + client = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + client.do(ctx, ts, [macaroonbakery.Op(entity='something', + action='read')]) + self.assertEqual(self._discharges, [ + _DischargeRecord(location='ids', user='bob'), + _DischargeRecord(location='other third party', + user='bob') + ]) + + def test_capability_combines_first_party_caveats(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'} + } + ) + ts = _Service('myservice', auth, ids, locator) + + # Acquire two capabilities as different users, add some first party + # caveats that we can combine them together into a single capability + # capable of both operations. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m1 = _Client(locator).capability( + ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + m1.macaroon.add_first_party_caveat('true 1') + m1.macaroon.add_first_party_caveat('true 2') + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m2 = _Client(locator).capability( + ctx, ts, [macaroonbakery.Op(entity='e2', action='read')]) + m2.macaroon.add_first_party_caveat('true 3') + m2.macaroon.add_first_party_caveat('true 4') + + client = _Client(locator) + client.add_macaroon(ts, 'authz1', [m1.macaroon]) + client.add_macaroon(ts, 'authz2', [m2.macaroon]) + + m = client.capability(test_context, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [ + 'true 1', + 'true 2', + 'true 3', + 'true 4', + ]) + + def test_first_party_caveat_squashing(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'alice'}, + }) + ts = _Service('myservice', auth, ids, locator) + tests = [ + ('duplicates removed', [ + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.Caveat(condition='true 2', namespace='testns'), + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.Caveat(condition='true 3', namespace='testns'), + ], [ + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.Caveat(condition='true 2', namespace='testns'), + checkers.Caveat(condition='true 3', namespace='testns'), + ]), ('earliest time before', [ + checkers.time_before_caveat(epoch + timedelta(days=1)), + checkers.Caveat(condition='true 1', namespace='testns'), + checkers.time_before_caveat( + epoch + timedelta(days=0, hours=1)), + checkers.time_before_caveat(epoch + timedelta( + days=0, hours=0, minutes=5)), + ], [ + checkers.time_before_caveat(epoch + timedelta( + days=0, hours=0, minutes=5)), + checkers.Caveat(condition='true 1', namespace='testns'), + ]), ('operations and declared caveats removed', [ + checkers.deny_caveat(['foo']), + checkers.allow_caveat(['read', 'write']), + checkers.declared_caveat('username', 'bob'), + checkers.Caveat(condition='true 1', namespace='testns'), + ], [ + checkers.Caveat(condition='true 1', namespace='testns'), + ]) + ] + for test in tests: + print(test[0]) + + # Make a first macaroon with all the required first party caveats. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m1 = _Client(locator).capability( + ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + m1.add_caveats(test[1], None, None) + + # Make a second macaroon that's not used to check that it's + # caveats are not added. + m2 = _Client(locator).capability( + ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + m2.add_caveat(checkers.Caveat( + condition='true notused', namespace='testns'), None, None) + client = _Client(locator) + client.add_macaroon(ts, 'authz1', [m1.macaroon]) + client.add_macaroon(ts, 'authz2', [m2.macaroon]) + + m3 = client.capability( + test_context, ts, [macaroonbakery.Op(entity='e1', + action='read')]) + self.assertEqual( + _macaroon_conditions(m3.macaroon.caveats, False), + _resolve_caveats(m3.namespace, test[2])) + + def test_login_only(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = macaroonbakery.ClosedAuthorizer() + ts = _Service('myservice', auth, ids, locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP]) + self.assertIsNotNone(auth_info) + self.assertEqual(auth_info.identity.id(), 'bob') + + def test_allow_any(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'alice'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Acquire a capability for e1 but rely on authentication to + # authorize e2. + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + m = _Client(locator).discharged_capability(ctx, ts, + [macaroonbakery.Op( + entity='e1', + action='read')]) + + client = _Client(locator) + client.add_macaroon(ts, 'authz', m) + + self._discharges = [] + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + with self.assertRaises(_DischargeRequiredError): + client.do_any( + ctx, ts, [ + macaroonbakery.LOGIN_OP, + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e1', action='read') + ] + ) + self.assertEqual(len(self._discharges), 0) + + # Log in as bob. + _, err = client.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + + # All the previous actions should now be allowed. + auth_info, allowed = client.do_any( + ctx, ts, [ + macaroonbakery.LOGIN_OP, + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e1', action='read') + ] + ) + self.assertEqual(auth_info.identity.id(), 'bob') + self.assertEqual(len(auth_info.macaroons), 2) + self.assertEqual(allowed, [True, True, True]) + + def test_auth_with_identity_from_context(self): + locator = _DischargerLocator() + ids = _BasicAuthIdService() + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'sherlock'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + + # Check that we can perform the ops with basic auth in the + # context. + ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes') + auth_info = _Client(locator).do( + ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + self.assertEqual(auth_info.identity.id(), 'sherlock') + self.assertEqual(len(auth_info.macaroons), 0) + + def test_auth_login_op_with_identity_from_context(self): + locator = _DischargerLocator() + ids = _BasicAuthIdService() + ts = _Service('myservice', macaroonbakery.ClosedAuthorizer(), + ids, locator) + + # Check that we can use LoginOp + # when auth isn't granted through macaroons. + ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes') + auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'sherlock') + self.assertEqual(len(auth_info.macaroons), 0) + + def test_operation_allow_caveat(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'bob'}, + macaroonbakery.Op(entity='e1', action='write'): {'bob'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m = client.capability( + ctx, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e1', action='write'), + macaroonbakery.Op(entity='e2', action='read')]) + + # Sanity check that we can do a write. + ts.do(test_context, [[m.macaroon]], + [macaroonbakery.Op(entity='e1', action='write')]) + + m.add_caveat(checkers.allow_caveat(['read']), None, None) + + # A read operation should work. + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + + # A write operation should fail + # even though the original macaroon allowed it. + with self.assertRaises(_DischargeRequiredError): + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='write')]) + + def test_operation_deny_caveat(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = _OpAuthorizer( + { + macaroonbakery.Op(entity='e1', action='read'): {'bob'}, + macaroonbakery.Op(entity='e1', action='write'): {'bob'}, + macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + }) + ts = _Service('myservice', auth, ids, locator) + client = _Client(locator) + + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + m = client.capability( + ctx, ts, [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e1', action='write'), + macaroonbakery.Op(entity='e2', action='read')]) + + # Sanity check that we can do a write. + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='write')]) + + m.add_caveat(checkers.deny_caveat(['write']), None, None) + + # A read operation should work. + ts.do( + test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='read'), + macaroonbakery.Op(entity='e2', action='read')]) + + # A write operation should fail + # even though the original macaroon allowed it. + with self.assertRaises(_DischargeRequiredError): + ts.do(test_context, [[m.macaroon]], [ + macaroonbakery.Op(entity='e1', action='write')]) + + def test_duplicate_login_macaroons(self): + locator = _DischargerLocator() + ids = _IdService('ids', locator, self) + auth = macaroonbakery.ClosedAuthorizer() + ts = _Service('myservice', auth, ids, locator) + + # Acquire a login macaroon for bob. + client1 = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') + auth_info = client1.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'bob') + + # Acquire a login macaroon for alice. + client2 = _Client(locator) + ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') + auth_info = client2.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'alice') + + # Combine the two login macaroons into one client. + client3 = _Client(locator) + client3.add_macaroon(ts, '1.bob', + client1._macaroons[ts.name()]['authn']) + client3.add_macaroon(ts, '2.alice', + client2._macaroons[ts.name()]['authn']) + + # We should authenticate as bob (because macaroons are presented + # ordered by "cookie" name) + auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'bob') + self.assertEqual(len(auth_info.macaroons), 1) + + # Try them the other way around and we should authenticate as alice. + client3 = _Client(locator) + client3.add_macaroon(ts, '1.alice', + client2._macaroons[ts.name()]['authn']) + client3.add_macaroon(ts, '2.bob', + client1._macaroons[ts.name()]['authn']) + + auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP]) + self.assertEqual(auth_info.identity.id(), 'alice') + self.assertEqual(len(auth_info.macaroons), 1) + + def test_macaroon_ops_fatal_error(self): + # When we get a non-VerificationError error from the + # opstore, we don't do any more verification. + checker = macaroonbakery.Checker( + macaroon_opstore=_MacaroonStoreWithError()) + m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2) + with self.assertRaises(ValueError): + checker.auth([m]).allow(test_context, [macaroonbakery.LOGIN_OP]) + + +class _DischargerLocator(object): + def __init__(self, dischargers=None): + if dischargers is None: + dischargers = {} + self._dischargers = dischargers + + def third_party_info(self, loc): + d = self._dischargers.get(loc) + if d is None: + return None + return macaroonbakery.ThirdPartyInfo( + public_key=d._key.public_key, + version=macaroonbakery.LATEST_BAKERY_VERSION, + ) + + def __setitem__(self, key, item): + self._dischargers[key] = item + + def __getitem__(self, key): + return self._dischargers[key] + + def get(self, key): + return self._dischargers.get(key) + + +class _IdService(macaroonbakery.IdentityClient, + macaroonbakery.ThirdPartyCaveatChecker): + def __init__(self, location, locator, test_class): + self._location = location + self._test = test_class + key = macaroonbakery.generate_key() + self._discharger = _Discharger(key=key, checker=self, locator=locator) + locator[location] = self._discharger + + def check_third_party_caveat(self, ctx, info): + if info.condition != 'is-authenticated-user': + raise macaroonbakery.CaveatNotRecognizedError( + 'third party condition not ' + 'recognized') + + username = ctx.get(_DISCHARGE_USER_KEY, '') + if username == '': + return macaroonbakery.ThirdPartyCaveatCheckFailed( + 'no current user') + self._test._discharges.append( + _DischargeRecord(location=self._location, user=username)) + return [checkers.declared_caveat('username', username)] + + def identity_from_context(self, ctx): + return None, [checkers.Caveat(location=self._location, + condition='is-authenticated-user')] + + def declared_identity(self, ctx, declared): + user = declared.get('username') + if user is None: + raise macaroonbakery.IdentityError('no username declared') + return macaroonbakery.SimpleIdentity(user) + + +_DISCHARGE_USER_KEY = checkers.ContextKey('user-key') + +_DischargeRecord = namedtuple('_DISCHARGE_RECORD', ['location', 'user']) + + +class _Discharger(object): + ''' utility class that has a discharge function with the same signature of + get_discharge for discharge_all. + ''' + + def __init__(self, key, locator, checker): + self._key = key + self._locator = locator + self._checker = checker + + def discharge(self, ctx, cav, payload): + return macaroonbakery.discharge(ctx, key=self._key, id=cav.caveat_id, + caveat=payload, + checker=self._checker, + locator=self._locator) + + +class _OpAuthorizer(macaroonbakery.Authorizer): + '''Implements bakery.Authorizer by looking the operation + up in the given map. If the username is in the associated list + or the list contains "everyone", authorization is granted. + ''' + + def __init__(self, auth=None): + if auth is None: + auth = {} + self._auth = auth + + def authorize(self, ctx, id, ops): + return macaroonbakery.ACLAuthorizer( + allow_public=True, + get_acl=lambda ctx, op: self._auth.get(op, [])).authorize( + ctx, id, ops) + + +class _MacaroonStore(object): + ''' Stores root keys in memory and puts all operations in the macaroon id. + ''' + + def __init__(self, key, locator): + self._root_key_store = macaroonbakery.MemoryKeyStore() + self._key = key + self._locator = locator + + def new_macaroon(self, caveats, namespace, ops): + root_key, id = self._root_key_store.root_key() + m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops} + data = json.dumps(m_id) + m = macaroonbakery.Macaroon( + root_key=root_key, id=data, location='', + version=macaroonbakery.LATEST_BAKERY_VERSION, + namespace=namespace) + m.add_caveats(caveats, self._key, self._locator) + return m + + def macaroon_ops(self, ms): + if len(ms) == 0: + raise ValueError('no macaroons provided') + + m_id = json.loads(ms[0].identifier_bytes.decode('utf-8')) + root_key = self._root_key_store.get( + base64.urlsafe_b64decode(m_id['id'].encode('utf-8'))) + + v = Verifier() + + class NoValidationOnFirstPartyCaveat(FirstPartyCaveatVerifierDelegate): + def verify_first_party_caveat(self, verifier, caveat, signature): + return True + + v.first_party_caveat_verifier_delegate = \ + NoValidationOnFirstPartyCaveat() + ok = v.verify(macaroon=ms[0], key=root_key, + discharge_macaroons=ms[1:]) + if not ok: + raise macaroonbakery.VerificationError('invalid signature') + conditions = [] + for m in ms: + cavs = m.first_party_caveats() + for cav in cavs: + conditions.append(cav.caveat_id_bytes.decode('utf-8')) + ops = [] + for op in m_id['ops']: + ops.append(macaroonbakery.Op(entity=op[0], action=op[1])) + return ops, conditions + + +class _Service(object): + '''Represents a service that requires authorization. + + Clients can make requests to the service to perform operations + and may receive a macaroon to discharge if the authorization + process requires it. + ''' + + def __init__(self, name, auth, idm, locator): + self._name = name + self._store = _MacaroonStore(macaroonbakery.generate_key(), locator) + self._checker = macaroonbakery.Checker( + checker=test_checker(), + authorizer=auth, + identity_client=idm, + macaroon_opstore=self._store) + + def name(self): + return self._name + + def do(self, ctx, ms, ops): + try: + authInfo = self._checker.auth(ms).allow(ctx, ops) + except macaroonbakery.DischargeRequiredError as exc: + self._discharge_required_error(exc) + return authInfo + + def do_any(self, ctx, ms, ops): + # makes a request to the service to perform any of the given + # operations. It reports which operations have succeeded. + try: + authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops) + return authInfo, allowed + except macaroonbakery.DischargeRequiredError as exc: + self._discharge_required_error(exc) + + def capability(self, ctx, ms, ops): + try: + conds = self._checker.auth(ms).allow_capability(ctx, ops) + except macaroonbakery.DischargeRequiredError as exc: + self._discharge_required_error(exc) + + m = self._store.new_macaroon(None, self._checker.namespace(), ops) + for cond in conds: + m.macaroon.add_first_party_caveat(cond) + return m + + def _discharge_required_error(self, err): + m = self._store.new_macaroon(err.cavs(), self._checker.namespace(), + err.ops()) + name = 'authz' + if len(err.ops()) == 1 and err.ops()[0] == macaroonbakery.LOGIN_OP: + name = 'authn' + raise _DischargeRequiredError(name=name, m=m) + + +class _DischargeRequiredError(Exception): + def __init__(self, name, m): + Exception.__init__(self, 'discharge required') + self._name = name + self._m = m + + def m(self): + return self._m + + def name(self): + return self._name + + +class _Client(object): + max_retries = 3 + + def __init__(self, dischargers): + self._key = macaroonbakery.generate_key() + self._macaroons = {} + self._dischargers = dischargers + + def do(self, ctx, svc, ops): + class _AuthInfo: + authInfo = None + + def svc_do(ms): + _AuthInfo.authInfo = svc.do(ctx, ms, ops) + + self._do_func(ctx, svc, svc_do) + return _AuthInfo.authInfo + + def do_any(self, ctx, svc, ops): + return svc.do_any(ctx, self._request_macaroons(svc), ops) + + def capability(self, ctx, svc, ops): + # capability returns a capability macaroon for the given operations. + + class _M: + m = None + + def svc_capability(ms): + _M.m = svc.capability(ctx, ms, ops) + return + + self._do_func(ctx, svc, svc_capability) + return _M.m + + def discharged_capability(self, ctx, svc, ops): + m = self.capability(ctx, svc, ops) + return self._discharge_all(ctx, m) + + def _do_func(self, ctx, svc, f): + for i in range(0, self.max_retries): + try: + f(self._request_macaroons(svc)) + return + except _DischargeRequiredError as exc: + ms = self._discharge_all(ctx, exc.m()) + self.add_macaroon(svc, exc.name(), ms) + raise ValueError('discharge failed too many times') + + def _clear_macaroons(self, svc): + if svc is None: + self._macaroons = {} + return + if svc.name() in self._macaroons: + del self._macaroons[svc.name()] + + def add_macaroon(self, svc, name, m): + if svc.name() not in self._macaroons: + self._macaroons[svc.name()] = {} + self._macaroons[svc.name()][name] = m + + def _request_macaroons(self, svc): + mmap = self._macaroons.get(svc.name(), []) + # Put all the macaroons in the slice ordered by key + # so that we have deterministic behaviour in the tests. + names = [] + for name in mmap: + names.append(name) + names = sorted(names) + ms = [None] * len(names) + for i, name in enumerate(names): + ms[i] = mmap[name] + return ms + + def _discharge_all(self, ctx, m): + def get_discharge(ctx, cav, pay_load): + d = self._dischargers.get(cav.location) + if d is None: + raise ValueError('third party discharger ' + '{} not found'.format(cav.location)) + return d.discharge(ctx, cav, pay_load) + + return macaroonbakery.discharge_all(ctx, m, get_discharge) + + +class _BasicAuthIdService(macaroonbakery.IdentityClient): + def identity_from_context(self, ctx): + user, pwd = _basic_auth_from_context(ctx) + if user != 'sherlock' or pwd != 'holmes': + return None, None + return macaroonbakery.SimpleIdentity(user), None + + def declared_identity(self, ctx, declared): + raise macaroonbakery.IdentityError('no identity declarations in basic ' + 'auth id service') + + +_BASIC_AUTH_KEY = checkers.ContextKey('user-key') + + +class _BasicAuth(object): + def __init__(self, user, password): + self.user = user + self.password = password + + +def _context_with_basic_auth(ctx, user, password): + return ctx.with_value(_BASIC_AUTH_KEY, _BasicAuth(user, password)) + + +def _basic_auth_from_context(ctx): + auth = ctx.get(_BASIC_AUTH_KEY, _BasicAuth('', '')) + return auth.user, auth.password + + +def _macaroon_conditions(caveats, allow_third): + conds = [''] * len(caveats) + for i, cav in enumerate(caveats): + if cav.location is not None and cav.location != '': + if not allow_third: + raise ValueError('found unexpected third party caveat:' + ' {}'.format(cav.location)) + continue + conds[i] = cav.caveat_id.decode('utf-8') + return conds + + +def _resolve_caveats(ns, caveats): + conds = [''] * len(caveats) + for i, cav in enumerate(caveats): + if cav.location is not None and cav.location != '': + raise ValueError('found unexpected third party caveat') + conds[i] = ns.resolve_caveat(cav).condition + return conds + + +class _MacaroonStoreWithError(object): + def new_macaroon(self, caveats, ns, ops): + raise ValueError('some error') + + def macaroon_ops(self, ms): + raise ValueError('some error') |