diff options
Diffstat (limited to 'macaroonbakery/tests/test_checker.py')
-rw-r--r-- | macaroonbakery/tests/test_checker.py | 443 |
1 files changed, 210 insertions, 233 deletions
diff --git a/macaroonbakery/tests/test_checker.py b/macaroonbakery/tests/test_checker.py index 06bf008..643c756 100644 --- a/macaroonbakery/tests/test_checker.py +++ b/macaroonbakery/tests/test_checker.py @@ -9,7 +9,7 @@ from datetime import timedelta from pymacaroons.verifier import Verifier, FirstPartyCaveatVerifierDelegate import pymacaroons -import macaroonbakery +import macaroonbakery as bakery import macaroonbakery.checkers as checkers from macaroonbakery.tests.common import test_context, epoch, test_checker @@ -22,13 +22,13 @@ class TestChecker(TestCase): locator = _DischargerLocator() ids = _IdService('ids', locator, self) auth = _OpAuthorizer( - {macaroonbakery.Op(entity='something', action='read'): - {macaroonbakery.EVERYONE}}) + {bakery.Op(entity='something', action='read'): + {bakery.EVERYONE}}) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) - auth_info = client.do(test_context, ts, - [macaroonbakery.Op(entity='something', - action='read')]) + auth_info = client.do(test_context, ts, [ + bakery.Op(entity='something', action='read'), + ]) self.assertEqual(len(self._discharges), 0) self.assertIsNotNone(auth_info) self.assertIsNone(auth_info.identity) @@ -37,25 +37,23 @@ class TestChecker(TestCase): def test_authorization_denied(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = macaroonbakery.ClosedAuthorizer() + auth = bakery.ClosedAuthorizer() ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - with self.assertRaises(macaroonbakery.PermissionDenied): - client.do(ctx, ts, [macaroonbakery.Op(entity='something', - action='read')]) + with self.assertRaises(bakery.PermissionDenied): + client.do(ctx, ts, [bakery.Op(entity='something', action='read')]) def test_authorize_with_authentication_required(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) auth = _OpAuthorizer( - {macaroonbakery.Op(entity='something', action='read'): {'bob'}}) + {bakery.Op(entity='something', action='read'): {'bob'}}) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - auth_info = client.do(ctx, ts, [macaroonbakery.Op(entity='something', - action='read')]) + auth_info = client.do(ctx, ts, [bakery.Op(entity='something', action='read')]) self.assertEqual(self._discharges, [_DischargeRecord(location='ids', user='bob')]) self.assertIsNotNone(auth_info) @@ -67,16 +65,16 @@ class TestChecker(TestCase): ids = _IdService('ids', locator, self) auth = _OpAuthorizer( { - macaroonbakery.Op(entity='something', action='read'): {'bob'}, - macaroonbakery.Op(entity='otherthing', action='read'): {'bob'} + bakery.Op(entity='something', action='read'): {'bob'}, + bakery.Op(entity='otherthing', action='read'): {'bob'} } ) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') client.do(ctx, ts, [ - macaroonbakery.Op(entity='something', action='read'), - macaroonbakery.Op(entity='otherthing', action='read') + bakery.Op(entity='something', action='read'), + bakery.Op(entity='otherthing', action='read') ]) self.assertEqual(self._discharges, [_DischargeRecord(location='ids', user='bob')]) @@ -85,159 +83,150 @@ class TestChecker(TestCase): locator = _DischargerLocator() ids = _IdService('ids', locator, self) auth = _OpAuthorizer( - {macaroonbakery.Op(entity='something', action='read'): {'bob'}}) + {bakery.Op(entity='something', action='read'): {'bob'}}) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m = client.discharged_capability( - ctx, ts, [macaroonbakery.Op(entity='something', action='read')]) + ctx, ts, [bakery.Op(entity='something', action='read')]) # Check that we can exercise the capability directly on the service # with no discharging required. - auth_info = ts.do(test_context, [m], - [macaroonbakery.Op(entity='something', - action='read')]) + auth_info = ts.do(test_context, [m], [ + bakery.Op(entity='something', action='read'), + ]) self.assertIsNotNone(auth_info) self.assertIsNone(auth_info.identity) self.assertEqual(len(auth_info.macaroons), 1) - self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, - m[0].identifier_bytes) + self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, m[0].identifier_bytes) def test_capability_multiple_entities(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'bob'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - macaroonbakery.Op(entity='e3', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'bob'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + bakery.Op(entity='e3', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m = client.discharged_capability(ctx, ts, [ - macaroonbakery.Op(entity='e1', - action='read'), - macaroonbakery.Op(entity='e2', - action='read'), - macaroonbakery.Op(entity='e3', - action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) self.assertEqual(self._discharges, [_DischargeRecord(location='ids', user='bob')]) # Check that we can exercise the capability directly on the service # with no discharging required. ts.do(test_context, [m], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read'), - macaroonbakery.Op(entity='e3', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) # Check that we can exercise the capability to act on a subset of # the operations. ts.do(test_context, [m], [ - macaroonbakery.Op(entity='e2', action='read'), - macaroonbakery.Op(entity='e3', action='read')] - ) + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) ts.do(test_context, [m], - [macaroonbakery.Op(entity='e3', action='read')]) + [bakery.Op(entity='e3', action='read')]) def test_multiple_capabilities(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) # Acquire two capabilities as different users and check # that we can combine them together to do both operations # at once. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - m1 = _Client(locator).discharged_capability(ctx, ts, - [macaroonbakery.Op( - entity='e1', - action='read')]) + m1 = _Client(locator).discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + ]) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m2 = _Client(locator).discharged_capability(ctx, ts, - [macaroonbakery.Op( + [bakery.Op( entity='e2', action='read')]) - self.assertEqual(self._discharges, - [ - _DischargeRecord(location='ids', user='alice'), - _DischargeRecord(location='ids', user='bob'), - ]) + self.assertEqual(self._discharges, [ + _DischargeRecord(location='ids', user='alice'), + _DischargeRecord(location='ids', user='bob'), + ]) auth_info = ts.do(test_context, [m1, m2], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) self.assertIsNotNone(auth_info) self.assertIsNone(auth_info.identity) self.assertEqual(len(auth_info.macaroons), 2) - self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, - m1[0].identifier_bytes) - self.assertEqual(auth_info.macaroons[1][0].identifier_bytes, - m2[0].identifier_bytes) + self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, m1[0].identifier_bytes) + self.assertEqual(auth_info.macaroons[1][0].identifier_bytes, m2[0].identifier_bytes) def test_combine_capabilities(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - macaroonbakery.Op(entity='e3', action='read'): {'bob', - 'alice'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + bakery.Op(entity='e3', action='read'): {'bob', 'alice'}, + }) ts = _Service('myservice', auth, ids, locator) # Acquire two capabilities as different users and check # that we can combine them together into a single capability # capable of both operations. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - m1 = _Client(locator).discharged_capability( - ctx, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e3', action='read')]) + m1 = _Client(locator).discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e3', action='read'), + ]) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m2 = _Client(locator).discharged_capability( - ctx, ts, [macaroonbakery.Op(entity='e2', action='read')]) + ctx, ts, [bakery.Op(entity='e2', action='read')]) m = ts.capability(test_context, [m1, m2], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read'), - macaroonbakery.Op(entity='e3', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read'), - macaroonbakery.Op(entity='e3', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) def test_partially_authorized_request(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) # Acquire a capability for e1 but rely on authentication to # authorize e2. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - m = _Client(locator).discharged_capability(ctx, ts, - [macaroonbakery.Op( - entity='e1', - action='read')]) + m = _Client(locator).discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + ]) client = _Client(locator) client.add_macaroon(ts, 'authz', m) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - client.discharged_capability( - ctx, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + client.discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) def test_auth_with_third_party_caveats(self): locator = _DischargerLocator() @@ -247,16 +236,15 @@ class TestChecker(TestCase): # when authorizing. def authorize_with_tp_discharge(ctx, id, op): if (id is not None and id.id() == 'bob' and - op == macaroonbakery.Op(entity='something', - action='read')): + op == bakery.Op(entity='something', action='read')): return True, [checkers.Caveat(condition='question', location='other third party')] return False, None - auth = macaroonbakery.AuthorizerFunc(authorize_with_tp_discharge) + auth = bakery.AuthorizerFunc(authorize_with_tp_discharge) ts = _Service('myservice', auth, ids, locator) - class _LocalDischargeChecker(macaroonbakery.ThirdPartyCaveatChecker): + class _LocalDischargeChecker(bakery.ThirdPartyCaveatChecker): def check_third_party_caveat(_, ctx, info): if info.condition != 'question': raise ValueError('third party condition not recognized') @@ -267,29 +255,25 @@ class TestChecker(TestCase): return [] locator['other third party'] = _Discharger( - key=macaroonbakery.generate_key(), + key=bakery.generate_key(), checker=_LocalDischargeChecker(), locator=locator, ) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - client.do(ctx, ts, [macaroonbakery.Op(entity='something', - action='read')]) + client.do(ctx, ts, [bakery.Op(entity='something', action='read')]) self.assertEqual(self._discharges, [ _DischargeRecord(location='ids', user='bob'), - _DischargeRecord(location='other third party', - user='bob') + _DischargeRecord(location='other third party', user='bob') ]) def test_capability_combines_first_party_caveats(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'} - } - ) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) # Acquire two capabilities as different users, add some first party @@ -297,12 +281,12 @@ class TestChecker(TestCase): # capable of both operations. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') m1 = _Client(locator).capability( - ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + ctx, ts, [bakery.Op(entity='e1', action='read')]) m1.macaroon.add_first_party_caveat('true 1') m1.macaroon.add_first_party_caveat('true 2') ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m2 = _Client(locator).capability( - ctx, ts, [macaroonbakery.Op(entity='e2', action='read')]) + ctx, ts, [bakery.Op(entity='e2', action='read')]) m2.macaroon.add_first_party_caveat('true 3') m2.macaroon.add_first_party_caveat('true 4') @@ -311,8 +295,9 @@ class TestChecker(TestCase): client.add_macaroon(ts, 'authz2', [m2.macaroon]) m = client.capability(test_context, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [ 'true 1', 'true 2', @@ -323,11 +308,10 @@ class TestChecker(TestCase): def test_first_party_caveat_squashing(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'alice'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'alice'}, + }) ts = _Service('myservice', auth, ids, locator) tests = [ ('duplicates removed', [ @@ -366,13 +350,13 @@ class TestChecker(TestCase): # Make a first macaroon with all the required first party caveats. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') m1 = _Client(locator).capability( - ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + ctx, ts, [bakery.Op(entity='e1', action='read')]) m1.add_caveats(test[1], None, None) # Make a second macaroon that's not used to check that it's # caveats are not added. m2 = _Client(locator).capability( - ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + ctx, ts, [bakery.Op(entity='e1', action='read')]) m2.add_caveat(checkers.Caveat( condition='true notused', namespace='testns'), None, None) client = _Client(locator) @@ -380,8 +364,7 @@ class TestChecker(TestCase): client.add_macaroon(ts, 'authz2', [m2.macaroon]) m3 = client.capability( - test_context, ts, [macaroonbakery.Op(entity='e1', - action='read')]) + test_context, ts, [bakery.Op(entity='e1', action='read')]) self.assertEqual( _macaroon_conditions(m3.macaroon.caveats, False), _resolve_caveats(m3.namespace, test[2])) @@ -389,11 +372,11 @@ class TestChecker(TestCase): def test_login_only(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = macaroonbakery.ClosedAuthorizer() + auth = bakery.ClosedAuthorizer() ts = _Service('myservice', auth, ids, locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP]) + auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP]) self.assertIsNotNone(auth_info) self.assertEqual(auth_info.identity.id(), 'bob') @@ -402,18 +385,17 @@ class TestChecker(TestCase): ids = _IdService('ids', locator, self) auth = _OpAuthorizer( { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, }) ts = _Service('myservice', auth, ids, locator) # Acquire a capability for e1 but rely on authentication to # authorize e2. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - m = _Client(locator).discharged_capability(ctx, ts, - [macaroonbakery.Op( - entity='e1', - action='read')]) + m = _Client(locator).discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + ]) client = _Client(locator) client.add_macaroon(ts, 'authz', m) @@ -423,24 +405,22 @@ class TestChecker(TestCase): with self.assertRaises(_DischargeRequiredError): client.do_any( ctx, ts, [ - macaroonbakery.LOGIN_OP, - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e1', action='read') + bakery.LOGIN_OP, + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e1', action='read') ] ) self.assertEqual(len(self._discharges), 0) # Log in as bob. - _, err = client.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + _, err = client.do(ctx, ts, [bakery.LOGIN_OP]) # All the previous actions should now be allowed. - auth_info, allowed = client.do_any( - ctx, ts, [ - macaroonbakery.LOGIN_OP, - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e1', action='read') - ] - ) + auth_info, allowed = client.do_any(ctx, ts, [ + bakery.LOGIN_OP, + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e1', action='read'), + ]) self.assertEqual(auth_info.identity.id(), 'bob') self.assertEqual(len(auth_info.macaroons), 2) self.assertEqual(allowed, [True, True, True]) @@ -448,123 +428,121 @@ class TestChecker(TestCase): def test_auth_with_identity_from_context(self): locator = _DischargerLocator() ids = _BasicAuthIdService() - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'sherlock'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'sherlock'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) # Check that we can perform the ops with basic auth in the # context. ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes') auth_info = _Client(locator).do( - ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + ctx, ts, [bakery.Op(entity='e1', action='read')]) self.assertEqual(auth_info.identity.id(), 'sherlock') self.assertEqual(len(auth_info.macaroons), 0) def test_auth_login_op_with_identity_from_context(self): locator = _DischargerLocator() ids = _BasicAuthIdService() - ts = _Service('myservice', macaroonbakery.ClosedAuthorizer(), - ids, locator) + ts = _Service('myservice', bakery.ClosedAuthorizer(), ids, locator) # Check that we can use LoginOp # when auth isn't granted through macaroons. ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes') - auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP]) + auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'sherlock') self.assertEqual(len(auth_info.macaroons), 0) def test_operation_allow_caveat(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'bob'}, - macaroonbakery.Op(entity='e1', action='write'): {'bob'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'bob'}, + bakery.Op(entity='e1', action='write'): {'bob'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - m = client.capability( - ctx, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e1', action='write'), - macaroonbakery.Op(entity='e2', action='read')]) + m = client.capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e1', action='write'), + bakery.Op(entity='e2', action='read'), + ]) # Sanity check that we can do a write. ts.do(test_context, [[m.macaroon]], - [macaroonbakery.Op(entity='e1', action='write')]) + [bakery.Op(entity='e1', action='write')]) m.add_caveat(checkers.allow_caveat(['read']), None, None) # A read operation should work. ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) # A write operation should fail # even though the original macaroon allowed it. with self.assertRaises(_DischargeRequiredError): ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='write')]) + bakery.Op(entity='e1', action='write'), + ]) def test_operation_deny_caveat(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'bob'}, - macaroonbakery.Op(entity='e1', action='write'): {'bob'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'bob'}, + bakery.Op(entity='e1', action='write'): {'bob'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - m = client.capability( - ctx, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e1', action='write'), - macaroonbakery.Op(entity='e2', action='read')]) + m = client.capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e1', action='write'), + bakery.Op(entity='e2', action='read'), + ]) # Sanity check that we can do a write. ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='write')]) + bakery.Op(entity='e1', action='write')]) m.add_caveat(checkers.deny_caveat(['write']), None, None) # A read operation should work. - ts.do( - test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + ts.do(test_context, [[m.macaroon]], [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) # A write operation should fail # even though the original macaroon allowed it. with self.assertRaises(_DischargeRequiredError): ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='write')]) + bakery.Op(entity='e1', action='write')]) def test_duplicate_login_macaroons(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = macaroonbakery.ClosedAuthorizer() + auth = bakery.ClosedAuthorizer() ts = _Service('myservice', auth, ids, locator) # Acquire a login macaroon for bob. client1 = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - auth_info = client1.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + auth_info = client1.do(ctx, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'bob') # Acquire a login macaroon for alice. client2 = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - auth_info = client2.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + auth_info = client2.do(ctx, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'alice') # Combine the two login macaroons into one client. @@ -576,32 +554,30 @@ class TestChecker(TestCase): # We should authenticate as bob (because macaroons are presented # ordered by "cookie" name) - auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP]) + auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'bob') self.assertEqual(len(auth_info.macaroons), 1) # Try them the other way around and we should authenticate as alice. client3 = _Client(locator) - client3.add_macaroon(ts, '1.alice', - client2._macaroons[ts.name()]['authn']) - client3.add_macaroon(ts, '2.bob', - client1._macaroons[ts.name()]['authn']) + client3.add_macaroon(ts, '1.alice', client2._macaroons[ts.name()]['authn']) + client3.add_macaroon(ts, '2.bob', client1._macaroons[ts.name()]['authn']) - auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP]) + auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'alice') self.assertEqual(len(auth_info.macaroons), 1) def test_macaroon_ops_fatal_error(self): # When we get a non-VerificationError error from the # opstore, we don't do any more verification. - checker = macaroonbakery.Checker( + checker = bakery.Checker( macaroon_opstore=_MacaroonStoreWithError()) m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2) - with self.assertRaises(ValueError): - checker.auth([m]).allow(test_context, [macaroonbakery.LOGIN_OP]) + with self.assertRaises(bakery.AuthInitError): + checker.auth([m]).allow(test_context, [bakery.LOGIN_OP]) -class _DischargerLocator(object): +class _DischargerLocator(bakery.ThirdPartyLocator): def __init__(self, dischargers=None): if dischargers is None: dischargers = {} @@ -611,9 +587,9 @@ class _DischargerLocator(object): d = self._dischargers.get(loc) if d is None: return None - return macaroonbakery.ThirdPartyInfo( + return bakery.ThirdPartyInfo( public_key=d._key.public_key, - version=macaroonbakery.LATEST_BAKERY_VERSION, + version=bakery.LATEST_VERSION, ) def __setitem__(self, key, item): @@ -626,25 +602,23 @@ class _DischargerLocator(object): return self._dischargers.get(key) -class _IdService(macaroonbakery.IdentityClient, - macaroonbakery.ThirdPartyCaveatChecker): +class _IdService(bakery.IdentityClient, + bakery.ThirdPartyCaveatChecker): def __init__(self, location, locator, test_class): self._location = location self._test = test_class - key = macaroonbakery.generate_key() + key = bakery.generate_key() self._discharger = _Discharger(key=key, checker=self, locator=locator) locator[location] = self._discharger def check_third_party_caveat(self, ctx, info): if info.condition != 'is-authenticated-user': - raise macaroonbakery.CaveatNotRecognizedError( - 'third party condition not ' - 'recognized') + raise bakery.CaveatNotRecognizedError( + 'third party condition not recognized') username = ctx.get(_DISCHARGE_USER_KEY, '') if username == '': - return macaroonbakery.ThirdPartyCaveatCheckFailed( - 'no current user') + raise bakery.ThirdPartyCaveatCheckFailed('no current user') self._test._discharges.append( _DischargeRecord(location=self._location, user=username)) return [checkers.declared_caveat('username', username)] @@ -656,8 +630,8 @@ class _IdService(macaroonbakery.IdentityClient, def declared_identity(self, ctx, declared): user = declared.get('username') if user is None: - raise macaroonbakery.IdentityError('no username declared') - return macaroonbakery.SimpleIdentity(user) + raise bakery.IdentityError('no username declared') + return bakery.SimpleIdentity(user) _DISCHARGE_USER_KEY = checkers.ContextKey('user-key') @@ -676,13 +650,17 @@ class _Discharger(object): self._checker = checker def discharge(self, ctx, cav, payload): - return macaroonbakery.discharge(ctx, key=self._key, id=cav.caveat_id, - caveat=payload, - checker=self._checker, - locator=self._locator) + return bakery.discharge( + ctx, + key=self._key, + id=cav.caveat_id, + caveat=payload, + checker=self._checker, + locator=self._locator, + ) -class _OpAuthorizer(macaroonbakery.Authorizer): +class _OpAuthorizer(bakery.Authorizer): '''Implements bakery.Authorizer by looking the operation up in the given map. If the username is in the associated list or the list contains "everyone", authorization is granted. @@ -694,7 +672,7 @@ class _OpAuthorizer(macaroonbakery.Authorizer): self._auth = auth def authorize(self, ctx, id, ops): - return macaroonbakery.ACLAuthorizer( + return bakery.ACLAuthorizer( allow_public=True, get_acl=lambda ctx, op: self._auth.get(op, [])).authorize( ctx, id, ops) @@ -705,7 +683,7 @@ class _MacaroonStore(object): ''' def __init__(self, key, locator): - self._root_key_store = macaroonbakery.MemoryKeyStore() + self._root_key_store = bakery.MemoryKeyStore() self._key = key self._locator = locator @@ -713,9 +691,9 @@ class _MacaroonStore(object): root_key, id = self._root_key_store.root_key() m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops} data = json.dumps(m_id) - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key=root_key, id=data, location='', - version=macaroonbakery.LATEST_BAKERY_VERSION, + version=bakery.LATEST_VERSION, namespace=namespace) m.add_caveats(caveats, self._key, self._locator) return m @@ -739,7 +717,7 @@ class _MacaroonStore(object): ok = v.verify(macaroon=ms[0], key=root_key, discharge_macaroons=ms[1:]) if not ok: - raise macaroonbakery.VerificationError('invalid signature') + raise bakery.VerificationError('invalid signature') conditions = [] for m in ms: cavs = m.first_party_caveats() @@ -747,7 +725,7 @@ class _MacaroonStore(object): conditions.append(cav.caveat_id_bytes.decode('utf-8')) ops = [] for op in m_id['ops']: - ops.append(macaroonbakery.Op(entity=op[0], action=op[1])) + ops.append(bakery.Op(entity=op[0], action=op[1])) return ops, conditions @@ -761,8 +739,8 @@ class _Service(object): def __init__(self, name, auth, idm, locator): self._name = name - self._store = _MacaroonStore(macaroonbakery.generate_key(), locator) - self._checker = macaroonbakery.Checker( + self._store = _MacaroonStore(bakery.generate_key(), locator) + self._checker = bakery.Checker( checker=test_checker(), authorizer=auth, identity_client=idm, @@ -774,7 +752,7 @@ class _Service(object): def do(self, ctx, ms, ops): try: authInfo = self._checker.auth(ms).allow(ctx, ops) - except macaroonbakery.DischargeRequiredError as exc: + except bakery.DischargeRequiredError as exc: self._discharge_required_error(exc) return authInfo @@ -784,13 +762,13 @@ class _Service(object): try: authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops) return authInfo, allowed - except macaroonbakery.DischargeRequiredError as exc: + except bakery.DischargeRequiredError as exc: self._discharge_required_error(exc) def capability(self, ctx, ms, ops): try: conds = self._checker.auth(ms).allow_capability(ctx, ops) - except macaroonbakery.DischargeRequiredError as exc: + except bakery.DischargeRequiredError as exc: self._discharge_required_error(exc) m = self._store.new_macaroon(None, self._checker.namespace(), ops) @@ -802,7 +780,7 @@ class _Service(object): m = self._store.new_macaroon(err.cavs(), self._checker.namespace(), err.ops()) name = 'authz' - if len(err.ops()) == 1 and err.ops()[0] == macaroonbakery.LOGIN_OP: + if len(err.ops()) == 1 and err.ops()[0] == bakery.LOGIN_OP: name = 'authn' raise _DischargeRequiredError(name=name, m=m) @@ -824,7 +802,7 @@ class _Client(object): max_retries = 3 def __init__(self, dischargers): - self._key = macaroonbakery.generate_key() + self._key = bakery.generate_key() self._macaroons = {} self._dischargers = dischargers @@ -894,26 +872,25 @@ class _Client(object): return ms def _discharge_all(self, ctx, m): - def get_discharge(ctx, cav, pay_load): + def get_discharge(cav, payload): d = self._dischargers.get(cav.location) if d is None: raise ValueError('third party discharger ' '{} not found'.format(cav.location)) - return d.discharge(ctx, cav, pay_load) + return d.discharge(ctx, cav, payload) - return macaroonbakery.discharge_all(ctx, m, get_discharge) + return bakery.discharge_all(m, get_discharge) -class _BasicAuthIdService(macaroonbakery.IdentityClient): +class _BasicAuthIdService(bakery.IdentityClient): def identity_from_context(self, ctx): user, pwd = _basic_auth_from_context(ctx) if user != 'sherlock' or pwd != 'holmes': return None, None - return macaroonbakery.SimpleIdentity(user), None + return bakery.SimpleIdentity(user), None def declared_identity(self, ctx, declared): - raise macaroonbakery.IdentityError('no identity declarations in basic ' - 'auth id service') + raise bakery.IdentityError('no identity declarations in basic auth id service') _BASIC_AUTH_KEY = checkers.ContextKey('user-key') |