summaryrefslogtreecommitdiff
path: root/macaroonbakery/tests/test_discharge.py
diff options
context:
space:
mode:
Diffstat (limited to 'macaroonbakery/tests/test_discharge.py')
-rw-r--r--macaroonbakery/tests/test_discharge.py445
1 files changed, 445 insertions, 0 deletions
diff --git a/macaroonbakery/tests/test_discharge.py b/macaroonbakery/tests/test_discharge.py
new file mode 100644
index 0000000..6e2df6a
--- /dev/null
+++ b/macaroonbakery/tests/test_discharge.py
@@ -0,0 +1,445 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+import unittest
+
+from pymacaroons import MACAROON_V1, Macaroon
+from pymacaroons.exceptions import (
+ MacaroonInvalidSignatureException, MacaroonUnmetCaveatException
+)
+
+import macaroonbakery
+import macaroonbakery.checkers as checkers
+from macaroonbakery.tests import common
+
+
+class TestDischarge(unittest.TestCase):
+ def test_single_service_first_party(self):
+ ''' Creates a single service with a macaroon with one first party
+ caveat.
+ It creates a request with this macaroon and checks that the service
+ can verify this macaroon as valid.
+ '''
+ oc = common.new_bakery('bakerytest')
+ primary = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, None,
+ [macaroonbakery.LOGIN_OP])
+ self.assertEqual(primary.macaroon.location, 'bakerytest')
+ primary.add_caveat(checkers.Caveat(condition='str something',
+ namespace='testns'),
+ oc.oven.key, oc.oven.locator)
+ oc.checker.auth([[primary.macaroon]]).allow(
+ common.str_context('something'), [macaroonbakery.LOGIN_OP])
+
+ def test_macaroon_paper_fig6(self):
+ ''' Implements an example flow as described in the macaroons paper:
+ http://theory.stanford.edu/~ataly/Papers/macaroons.pdf
+ There are three services, ts, fs, bs:
+ ts is a store service which has deligated authority to a forum
+ service fs.
+ The forum service wants to require its users to be logged into to an
+ authentication service bs.
+
+ The client obtains a macaroon from fs (minted by ts, with a third party
+ caveat addressed to bs).
+ The client obtains a discharge macaroon from bs to satisfy this caveat.
+ The target service verifies the original macaroon it delegated to fs
+ No direct contact between bs and ts is required
+ '''
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+ ts = common.new_bakery('ts-loc', locator)
+ fs = common.new_bakery('fs-loc', locator)
+
+ # ts creates a macaroon.
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages,
+ None, [macaroonbakery.LOGIN_OP])
+
+ # ts somehow sends the macaroon to fs which adds a third party caveat
+ # to be discharged by bs.
+ ts_macaroon.add_caveat(checkers.Caveat(location='bs-loc',
+ condition='user==bob'),
+ fs.oven.key, fs.oven.locator)
+
+ # client asks for a discharge macaroon for each third party caveat
+ def get_discharge(ctx, cav, payload):
+ self.assertEqual(cav.location, 'bs-loc')
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker(
+ 'user==bob'),
+ bs.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
+ get_discharge)
+
+ ts.checker.auth([d]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ def test_discharge_with_version1_macaroon(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+ ts = common.new_bakery('ts-loc', locator)
+
+ # ts creates a old-version macaroon.
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages,
+ None, [macaroonbakery.LOGIN_OP])
+ ts_macaroon.add_caveat(checkers.Caveat(condition='something',
+ location='bs-loc'),
+ ts.oven.key, ts.oven.locator)
+
+ # client asks for a discharge macaroon for each third party caveat
+
+ def get_discharge(ctx, cav, payload):
+ # Make sure that the caveat id really is old-style.
+ try:
+ cav.caveat_id_bytes.decode('utf-8')
+ except UnicodeDecodeError:
+ self.fail('caveat id is not utf-8')
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker(
+ 'something'),
+ bs.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
+ get_discharge)
+
+ ts.checker.auth([d]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ for m in d:
+ self.assertEqual(m.version, MACAROON_V1)
+
+ def test_version1_macaroon_id(self):
+ # In the version 1 bakery, macaroon ids were hex-encoded with a
+ # hyphenated UUID suffix.
+ root_key_store = macaroonbakery.MemoryKeyStore()
+ b = macaroonbakery.Bakery(root_key_store=root_key_store,
+ identity_client=common.OneIdentity())
+ key, id = root_key_store.root_key()
+ root_key_store.get(id)
+ m = Macaroon(key=key, version=MACAROON_V1, location='',
+ identifier=id + b'-deadl00f')
+ b.checker.auth([[m]]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ def test_macaroon_paper_fig6_fails_without_discharges(self):
+ ''' Runs a similar test as test_macaroon_paper_fig6 without the client
+ discharging the third party caveats.
+ '''
+ locator = macaroonbakery.ThirdPartyStore()
+ ts = common.new_bakery('ts-loc', locator)
+ fs = common.new_bakery('fs-loc', locator)
+ common.new_bakery('as-loc', locator)
+
+ # ts creates a macaroon.
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, None,
+ [macaroonbakery.LOGIN_OP])
+
+ # ts somehow sends the macaroon to fs which adds a third party
+ # caveat to be discharged by as.
+ ts_macaroon.add_caveat(checkers.Caveat(location='as-loc',
+ condition='user==bob'),
+ fs.oven.key, fs.oven.locator)
+
+ # client makes request to ts
+ try:
+ ts.checker.auth([[ts_macaroon.macaroon]]).allow(
+ common.test_context,
+ macaroonbakery.LOGIN_OP
+ )
+ self.fail('macaroon unmet should be raised')
+ except MacaroonUnmetCaveatException:
+ pass
+
+ def test_macaroon_paper_fig6_fails_with_binding_on_tampered_sig(self):
+ ''' Runs a similar test as test_macaroon_paper_fig6 with the discharge
+ macaroon binding being done on a tampered signature.
+ '''
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+ ts = common.new_bakery('ts-loc', locator)
+
+ # ts creates a macaroon.
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, None,
+ [macaroonbakery.LOGIN_OP])
+ # ts somehow sends the macaroon to fs which adds a third party caveat
+ # to be discharged by as.
+ ts_macaroon.add_caveat(checkers.Caveat(condition='user==bob',
+ location='bs-loc'),
+ ts.oven.key, ts.oven.locator)
+
+ # client asks for a discharge macaroon for each third party caveat
+ def get_discharge(ctx, cav, payload):
+ self.assertEqual(cav.location, 'bs-loc')
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker(
+ 'user==bob'),
+ bs.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
+ get_discharge)
+ # client has all the discharge macaroons. For each discharge macaroon
+ # bind it to our ts_macaroon and add it to our request.
+ tampered_macaroon = Macaroon()
+ for i, dm in enumerate(d[1:]):
+ d[i + 1] = tampered_macaroon.prepare_for_request(dm)
+
+ # client makes request to ts.
+ with self.assertRaises(MacaroonInvalidSignatureException) as exc:
+ ts.checker.auth([d]).allow(common.test_context,
+ macaroonbakery.LOGIN_OP)
+ self.assertEqual('Signatures do not match', exc.exception.args[0])
+
+ def test_need_declared(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ first_party = common.new_bakery('first', locator)
+ third_party = common.new_bakery('third', locator)
+
+ # firstParty mints a macaroon with a third-party caveat addressed
+ # to thirdParty with a need-declared caveat.
+ m = first_party.oven.macaroon(
+ macaroonbakery.LATEST_BAKERY_VERSION, common.ages, [
+ checkers.need_declared_caveat(
+ checkers.Caveat(location='third', condition='something'),
+ ['foo', 'bar']
+ )
+ ], [macaroonbakery.LOGIN_OP])
+
+ # The client asks for a discharge macaroon for each third party caveat.
+ def get_discharge(ctx, cav, payload):
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ third_party.oven.key,
+ common.ThirdPartyStrcmpChecker(
+ 'something'),
+ third_party.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+
+ # The required declared attributes should have been added
+ # to the discharge macaroons.
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'foo': '',
+ 'bar': '',
+ })
+
+ # Make sure the macaroons actually check out correctly
+ # when provided with the declared checker.
+ ctx = checkers.context_with_declared(common.test_context, declared)
+ first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+
+ # Try again when the third party does add a required declaration.
+
+ # The client asks for a discharge macaroon for each third party caveat.
+ def get_discharge(ctx, cav, payload):
+ checker = common.ThirdPartyCheckerWithCaveats([
+ checkers.declared_caveat('foo', 'a'),
+ checkers.declared_caveat('arble', 'b')
+ ])
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ third_party.oven.key,
+ checker,
+ third_party.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+
+ # One attribute should have been added, the other was already there.
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'foo': 'a',
+ 'bar': '',
+ 'arble': 'b',
+ })
+
+ ctx = checkers.context_with_declared(common.test_context, declared)
+ first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+
+ # Try again, but this time pretend a client is sneakily trying
+ # to add another 'declared' attribute to alter the declarations.
+
+ def get_discharge(ctx, cav, payload):
+ checker = common.ThirdPartyCheckerWithCaveats([
+ checkers.declared_caveat('foo', 'a'),
+ checkers.declared_caveat('arble', 'b'),
+ ])
+
+ # Sneaky client adds a first party caveat.
+ m = macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ third_party.oven.key, checker,
+ third_party.oven.locator)
+ m.add_caveat(checkers.declared_caveat('foo', 'c'), None, None)
+ return m
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'bar': '',
+ 'arble': 'b',
+ })
+
+ with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ first_party.checker.auth([d]).allow(common.test_context,
+ macaroonbakery.LOGIN_OP)
+ self.assertEqual('cannot authorize login macaroon: caveat '
+ '"declared foo a" not satisfied: got foo=null, '
+ 'expected "a"', exc.exception.args[0])
+
+ def test_discharge_two_need_declared(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ first_party = common.new_bakery('first', locator)
+ third_party = common.new_bakery('third', locator)
+
+ # first_party mints a macaroon with two third party caveats
+ # with overlapping attributes.
+ m = first_party.oven.macaroon(
+ macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, [
+ checkers.need_declared_caveat(
+ checkers.Caveat(location='third', condition='x'),
+ ['foo', 'bar']),
+ checkers.need_declared_caveat(
+ checkers.Caveat(location='third', condition='y'),
+ ['bar', 'baz']),
+ ], [macaroonbakery.LOGIN_OP])
+
+ # The client asks for a discharge macaroon for each third party caveat.
+ # Since no declarations are added by the discharger,
+
+ def get_discharge(ctx, cav, payload):
+ return macaroonbakery.discharge(
+ ctx, cav.caveat_id_bytes, payload, third_party.oven.key,
+ common.ThirdPartyCaveatCheckerEmpty(),
+ third_party.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'foo': '',
+ 'bar': '',
+ 'baz': '',
+ })
+ ctx = checkers.context_with_declared(common.test_context, declared)
+ first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+
+ # If they return conflicting values, the discharge fails.
+ # The client asks for a discharge macaroon for each third party caveat.
+ # Since no declarations are added by the discharger,
+ class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(self, ctx, cav_info):
+ if cav_info.condition == b'x':
+ return [checkers.declared_caveat('foo', 'fooval1')]
+ if cav_info.condition == b'y':
+ return [
+ checkers.declared_caveat('foo', 'fooval2'),
+ checkers.declared_caveat('baz', 'bazval')
+ ]
+ raise common.ThirdPartyCaveatCheckFailed('not matched')
+
+ def get_discharge(ctx, cav, payload):
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ third_party.oven.key,
+ ThirdPartyCaveatCheckerF(),
+ third_party.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'bar': '',
+ 'baz': 'bazval',
+ })
+ with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ first_party.checker.auth([d]).allow(common.test_context,
+ macaroonbakery.LOGIN_OP)
+ self.assertEqual('cannot authorize login macaroon: caveat "declared '
+ 'foo fooval1" not satisfied: got foo=null, expected '
+ '"fooval1"', exc.exception.args[0])
+
+ def test_discharge_macaroon_cannot_be_used_as_normal_macaroon(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ first_party = common.new_bakery('first', locator)
+ third_party = common.new_bakery('third', locator)
+
+ # First party mints a macaroon with a 3rd party caveat.
+ m = first_party.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, [
+ checkers.Caveat(location='third',
+ condition='true')],
+ [macaroonbakery.LOGIN_OP])
+
+ # Acquire the discharge macaroon, but don't bind it to the original.
+ class M:
+ unbound = None
+
+ def get_discharge(ctx, cav, payload):
+ m = macaroonbakery.discharge(
+ ctx, cav.caveat_id_bytes, payload, third_party.oven.key,
+ common.ThirdPartyStrcmpChecker('true'),
+ third_party.oven.locator)
+ M.unbound = m.macaroon.copy()
+ return m
+
+ macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ self.assertIsNotNone(M.unbound)
+
+ # Make sure it cannot be used as a normal macaroon in the third party.
+ with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ third_party.checker.auth([[M.unbound]]).allow(
+ common.test_context, [macaroonbakery.LOGIN_OP])
+ self.assertEqual('no operations found in macaroon',
+ exc.exception.args[0])
+
+ def test_third_party_discharge_macaroon_ids_are_small(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ bakeries = {
+ 'ts-loc': common.new_bakery('ts-loc', locator),
+ 'as1-loc': common.new_bakery('as1-loc', locator),
+ 'as2-loc': common.new_bakery('as2-loc', locator),
+ }
+ ts = bakeries['ts-loc']
+
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages,
+ None, [macaroonbakery.LOGIN_OP])
+ ts_macaroon.add_caveat(checkers.Caveat(condition='something',
+ location='as1-loc'),
+ ts.oven.key, ts.oven.locator)
+
+ class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ def __init__(self, loc):
+ self._loc = loc
+
+ def check_third_party_caveat(self, ctx, info):
+ if self._loc == 'as1-loc':
+ return [checkers.Caveat(condition='something',
+ location='as2-loc')]
+ if self._loc == 'as2-loc':
+ return []
+ raise common.ThirdPartyCaveatCheckFailed(
+ 'unknown location {}'.format(self._loc))
+
+ def get_discharge(ctx, cav, payload):
+ oven = bakeries[cav.location].oven
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ oven.key,
+ ThirdPartyCaveatCheckerF(
+ cav.location),
+ oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
+ get_discharge)
+ ts.checker.auth([d]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ for i, m in enumerate(d):
+ for j, cav in enumerate(m.caveats):
+ if (cav.verification_key_id is not None and
+ len(cav.caveat_id) > 3):
+ self.fail('caveat id on caveat {} of macaroon {} '
+ 'is too big ({})'.format(j, i, cav.id))