diff options
Diffstat (limited to 'macaroonbakery/tests/test_discharge.py')
-rw-r--r-- | macaroonbakery/tests/test_discharge.py | 274 |
1 files changed, 152 insertions, 122 deletions
diff --git a/macaroonbakery/tests/test_discharge.py b/macaroonbakery/tests/test_discharge.py index 6e2df6a..433483a 100644 --- a/macaroonbakery/tests/test_discharge.py +++ b/macaroonbakery/tests/test_discharge.py @@ -3,11 +3,8 @@ import unittest from pymacaroons import MACAROON_V1, Macaroon -from pymacaroons.exceptions import ( - MacaroonInvalidSignatureException, MacaroonUnmetCaveatException -) -import macaroonbakery +import macaroonbakery as bakery import macaroonbakery.checkers as checkers from macaroonbakery.tests import common @@ -20,15 +17,15 @@ class TestDischarge(unittest.TestCase): can verify this macaroon as valid. ''' oc = common.new_bakery('bakerytest') - primary = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + primary = oc.oven.macaroon(bakery.LATEST_VERSION, common.ages, None, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) self.assertEqual(primary.macaroon.location, 'bakerytest') primary.add_caveat(checkers.Caveat(condition='str something', namespace='testns'), oc.oven.key, oc.oven.locator) oc.checker.auth([[primary.macaroon]]).allow( - common.str_context('something'), [macaroonbakery.LOGIN_OP]) + common.str_context('something'), [bakery.LOGIN_OP]) def test_macaroon_paper_fig6(self): ''' Implements an example flow as described in the macaroons paper: @@ -45,15 +42,15 @@ class TestDischarge(unittest.TestCase): The target service verifies the original macaroon it delegated to fs No direct contact between bs and ts is required ''' - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) ts = common.new_bakery('ts-loc', locator) fs = common.new_bakery('fs-loc', locator) # ts creates a macaroon. - ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION, common.ages, - None, [macaroonbakery.LOGIN_OP]) + None, [bakery.LOGIN_OP]) # ts somehow sends the macaroon to fs which adds a third party caveat # to be discharged by bs. @@ -62,51 +59,55 @@ class TestDischarge(unittest.TestCase): fs.oven.key, fs.oven.locator) # client asks for a discharge macaroon for each third party caveat - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): self.assertEqual(cav.location, 'bs-loc') - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - bs.oven.key, - common.ThirdPartyStrcmpChecker( - 'user==bob'), - bs.oven.locator) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker('user==bob'), + bs.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, - get_discharge) + d = bakery.discharge_all(ts_macaroon, get_discharge) ts.checker.auth([d]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) def test_discharge_with_version1_macaroon(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) ts = common.new_bakery('ts-loc', locator) # ts creates a old-version macaroon. - ts_macaroon = ts.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages, - None, [macaroonbakery.LOGIN_OP]) + ts_macaroon = ts.oven.macaroon(bakery.VERSION_1, common.ages, + None, [bakery.LOGIN_OP]) ts_macaroon.add_caveat(checkers.Caveat(condition='something', location='bs-loc'), ts.oven.key, ts.oven.locator) # client asks for a discharge macaroon for each third party caveat - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): # Make sure that the caveat id really is old-style. try: cav.caveat_id_bytes.decode('utf-8') except UnicodeDecodeError: self.fail('caveat id is not utf-8') - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - bs.oven.key, - common.ThirdPartyStrcmpChecker( - 'something'), - bs.oven.locator) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker('something'), + bs.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, - get_discharge) + d = bakery.discharge_all(ts_macaroon, get_discharge) ts.checker.auth([d]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) for m in d: self.assertEqual(m.version, MACAROON_V1) @@ -114,29 +115,31 @@ class TestDischarge(unittest.TestCase): def test_version1_macaroon_id(self): # In the version 1 bakery, macaroon ids were hex-encoded with a # hyphenated UUID suffix. - root_key_store = macaroonbakery.MemoryKeyStore() - b = macaroonbakery.Bakery(root_key_store=root_key_store, - identity_client=common.OneIdentity()) + root_key_store = bakery.MemoryKeyStore() + b = bakery.Bakery( + root_key_store=root_key_store, + identity_client=common.OneIdentity(), + ) key, id = root_key_store.root_key() root_key_store.get(id) m = Macaroon(key=key, version=MACAROON_V1, location='', identifier=id + b'-deadl00f') b.checker.auth([[m]]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) def test_macaroon_paper_fig6_fails_without_discharges(self): ''' Runs a similar test as test_macaroon_paper_fig6 without the client discharging the third party caveats. ''' - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() ts = common.new_bakery('ts-loc', locator) fs = common.new_bakery('fs-loc', locator) common.new_bakery('as-loc', locator) # ts creates a macaroon. - ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION, common.ages, None, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) # ts somehow sends the macaroon to fs which adds a third party # caveat to be discharged by as. @@ -148,24 +151,24 @@ class TestDischarge(unittest.TestCase): try: ts.checker.auth([[ts_macaroon.macaroon]]).allow( common.test_context, - macaroonbakery.LOGIN_OP + bakery.LOGIN_OP ) self.fail('macaroon unmet should be raised') - except MacaroonUnmetCaveatException: + except bakery.VerificationError: pass def test_macaroon_paper_fig6_fails_with_binding_on_tampered_sig(self): ''' Runs a similar test as test_macaroon_paper_fig6 with the discharge macaroon binding being done on a tampered signature. ''' - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) ts = common.new_bakery('ts-loc', locator) # ts creates a macaroon. - ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION, common.ages, None, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) # ts somehow sends the macaroon to fs which adds a third party caveat # to be discharged by as. ts_macaroon.add_caveat(checkers.Caveat(condition='user==bob', @@ -173,16 +176,18 @@ class TestDischarge(unittest.TestCase): ts.oven.key, ts.oven.locator) # client asks for a discharge macaroon for each third party caveat - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): self.assertEqual(cav.location, 'bs-loc') - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - bs.oven.key, - common.ThirdPartyStrcmpChecker( - 'user==bob'), - bs.oven.locator) - - d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, - get_discharge) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker('user==bob'), + bs.oven.locator, + ) + + d = bakery.discharge_all(ts_macaroon, get_discharge) # client has all the discharge macaroons. For each discharge macaroon # bind it to our ts_macaroon and add it to our request. tampered_macaroon = Macaroon() @@ -190,35 +195,39 @@ class TestDischarge(unittest.TestCase): d[i + 1] = tampered_macaroon.prepare_for_request(dm) # client makes request to ts. - with self.assertRaises(MacaroonInvalidSignatureException) as exc: + with self.assertRaises(bakery.VerificationError) as exc: ts.checker.auth([d]).allow(common.test_context, - macaroonbakery.LOGIN_OP) - self.assertEqual('Signatures do not match', exc.exception.args[0]) + bakery.LOGIN_OP) + self.assertEqual('verification failed: Signatures do not match', + exc.exception.args[0]) def test_need_declared(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() first_party = common.new_bakery('first', locator) third_party = common.new_bakery('third', locator) # firstParty mints a macaroon with a third-party caveat addressed # to thirdParty with a need-declared caveat. m = first_party.oven.macaroon( - macaroonbakery.LATEST_BAKERY_VERSION, common.ages, [ + bakery.LATEST_VERSION, common.ages, [ checkers.need_declared_caveat( checkers.Caveat(location='third', condition='something'), ['foo', 'bar'] ) - ], [macaroonbakery.LOGIN_OP]) + ], [bakery.LOGIN_OP]) # The client asks for a discharge macaroon for each third party caveat. - def get_discharge(ctx, cav, payload): - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - third_party.oven.key, - common.ThirdPartyStrcmpChecker( - 'something'), - third_party.oven.locator) + def get_discharge(cav, payload): + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, + common.ThirdPartyStrcmpChecker('something'), + third_party.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) # The required declared attributes should have been added # to the discharge macaroons. @@ -231,22 +240,26 @@ class TestDischarge(unittest.TestCase): # Make sure the macaroons actually check out correctly # when provided with the declared checker. ctx = checkers.context_with_declared(common.test_context, declared) - first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP]) # Try again when the third party does add a required declaration. # The client asks for a discharge macaroon for each third party caveat. - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): checker = common.ThirdPartyCheckerWithCaveats([ checkers.declared_caveat('foo', 'a'), checkers.declared_caveat('arble', 'b') ]) - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - third_party.oven.key, - checker, - third_party.oven.locator) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, + checker, + third_party.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) # One attribute should have been added, the other was already there. declared = checkers.infer_declared(d, first_party.checker.namespace()) @@ -257,25 +270,28 @@ class TestDischarge(unittest.TestCase): }) ctx = checkers.context_with_declared(common.test_context, declared) - first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP]) # Try again, but this time pretend a client is sneakily trying # to add another 'declared' attribute to alter the declarations. - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): checker = common.ThirdPartyCheckerWithCaveats([ checkers.declared_caveat('foo', 'a'), checkers.declared_caveat('arble', 'b'), ]) # Sneaky client adds a first party caveat. - m = macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - third_party.oven.key, checker, - third_party.oven.locator) + m = bakery.discharge( + common.test_context, cav.caveat_id_bytes, + payload, + third_party.oven.key, checker, + third_party.oven.locator, + ) m.add_caveat(checkers.declared_caveat('foo', 'c'), None, None) return m - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) declared = checkers.infer_declared(d, first_party.checker.namespace()) self.assertEqual(declared, { @@ -283,22 +299,22 @@ class TestDischarge(unittest.TestCase): 'arble': 'b', }) - with self.assertRaises(macaroonbakery.AuthInitError) as exc: + with self.assertRaises(bakery.AuthInitError) as exc: first_party.checker.auth([d]).allow(common.test_context, - macaroonbakery.LOGIN_OP) + bakery.LOGIN_OP) self.assertEqual('cannot authorize login macaroon: caveat ' '"declared foo a" not satisfied: got foo=null, ' 'expected "a"', exc.exception.args[0]) def test_discharge_two_need_declared(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() first_party = common.new_bakery('first', locator) third_party = common.new_bakery('third', locator) # first_party mints a macaroon with two third party caveats # with overlapping attributes. m = first_party.oven.macaroon( - macaroonbakery.LATEST_BAKERY_VERSION, + bakery.LATEST_VERSION, common.ages, [ checkers.need_declared_caveat( checkers.Caveat(location='third', condition='x'), @@ -306,18 +322,22 @@ class TestDischarge(unittest.TestCase): checkers.need_declared_caveat( checkers.Caveat(location='third', condition='y'), ['bar', 'baz']), - ], [macaroonbakery.LOGIN_OP]) + ], [bakery.LOGIN_OP]) # The client asks for a discharge macaroon for each third party caveat. # Since no declarations are added by the discharger, - def get_discharge(ctx, cav, payload): - return macaroonbakery.discharge( - ctx, cav.caveat_id_bytes, payload, third_party.oven.key, + def get_discharge(cav, payload): + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, common.ThirdPartyCaveatCheckerEmpty(), - third_party.oven.locator) + third_party.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) declared = checkers.infer_declared(d, first_party.checker.namespace()) self.assertEqual(declared, { 'foo': '', @@ -325,12 +345,12 @@ class TestDischarge(unittest.TestCase): 'baz': '', }) ctx = checkers.context_with_declared(common.test_context, declared) - first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP]) # If they return conflicting values, the discharge fails. # The client asks for a discharge macaroon for each third party caveat. # Since no declarations are added by the discharger, - class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker): + class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker): def check_third_party_caveat(self, ctx, cav_info): if cav_info.condition == b'x': return [checkers.declared_caveat('foo', 'fooval1')] @@ -341,62 +361,70 @@ class TestDischarge(unittest.TestCase): ] raise common.ThirdPartyCaveatCheckFailed('not matched') - def get_discharge(ctx, cav, payload): - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - third_party.oven.key, - ThirdPartyCaveatCheckerF(), - third_party.oven.locator) + def get_discharge(cav, payload): + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, + ThirdPartyCaveatCheckerF(), + third_party.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) declared = checkers.infer_declared(d, first_party.checker.namespace()) self.assertEqual(declared, { 'bar': '', 'baz': 'bazval', }) - with self.assertRaises(macaroonbakery.AuthInitError) as exc: + with self.assertRaises(bakery.AuthInitError) as exc: first_party.checker.auth([d]).allow(common.test_context, - macaroonbakery.LOGIN_OP) + bakery.LOGIN_OP) self.assertEqual('cannot authorize login macaroon: caveat "declared ' 'foo fooval1" not satisfied: got foo=null, expected ' '"fooval1"', exc.exception.args[0]) def test_discharge_macaroon_cannot_be_used_as_normal_macaroon(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() first_party = common.new_bakery('first', locator) third_party = common.new_bakery('third', locator) # First party mints a macaroon with a 3rd party caveat. - m = first_party.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + m = first_party.oven.macaroon(bakery.LATEST_VERSION, common.ages, [ checkers.Caveat(location='third', condition='true')], - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) # Acquire the discharge macaroon, but don't bind it to the original. class M: unbound = None - def get_discharge(ctx, cav, payload): - m = macaroonbakery.discharge( - ctx, cav.caveat_id_bytes, payload, third_party.oven.key, + def get_discharge(cav, payload): + m = bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, common.ThirdPartyStrcmpChecker('true'), - third_party.oven.locator) + third_party.oven.locator, + ) M.unbound = m.macaroon.copy() return m - macaroonbakery.discharge_all(common.test_context, m, get_discharge) + bakery.discharge_all(m, get_discharge) self.assertIsNotNone(M.unbound) # Make sure it cannot be used as a normal macaroon in the third party. - with self.assertRaises(macaroonbakery.AuthInitError) as exc: + with self.assertRaises(bakery.VerificationError) as exc: third_party.checker.auth([[M.unbound]]).allow( - common.test_context, [macaroonbakery.LOGIN_OP]) + common.test_context, [bakery.LOGIN_OP]) self.assertEqual('no operations found in macaroon', exc.exception.args[0]) def test_third_party_discharge_macaroon_ids_are_small(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bakeries = { 'ts-loc': common.new_bakery('ts-loc', locator), 'as1-loc': common.new_bakery('as1-loc', locator), @@ -404,14 +432,14 @@ class TestDischarge(unittest.TestCase): } ts = bakeries['ts-loc'] - ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION, common.ages, - None, [macaroonbakery.LOGIN_OP]) + None, [bakery.LOGIN_OP]) ts_macaroon.add_caveat(checkers.Caveat(condition='something', location='as1-loc'), ts.oven.key, ts.oven.locator) - class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker): + class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker): def __init__(self, loc): self._loc = loc @@ -424,18 +452,20 @@ class TestDischarge(unittest.TestCase): raise common.ThirdPartyCaveatCheckFailed( 'unknown location {}'.format(self._loc)) - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): oven = bakeries[cav.location].oven - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - oven.key, - ThirdPartyCaveatCheckerF( - cav.location), - oven.locator) - - d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, - get_discharge) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + oven.key, + ThirdPartyCaveatCheckerF(cav.location), + oven.locator, + ) + + d = bakery.discharge_all(ts_macaroon, get_discharge) ts.checker.auth([d]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) for i, m in enumerate(d): for j, cav in enumerate(m.caveats): |