summaryrefslogtreecommitdiff
path: root/macaroonbakery/tests/test_discharge.py
diff options
context:
space:
mode:
Diffstat (limited to 'macaroonbakery/tests/test_discharge.py')
-rw-r--r--macaroonbakery/tests/test_discharge.py274
1 files changed, 152 insertions, 122 deletions
diff --git a/macaroonbakery/tests/test_discharge.py b/macaroonbakery/tests/test_discharge.py
index 6e2df6a..433483a 100644
--- a/macaroonbakery/tests/test_discharge.py
+++ b/macaroonbakery/tests/test_discharge.py
@@ -3,11 +3,8 @@
import unittest
from pymacaroons import MACAROON_V1, Macaroon
-from pymacaroons.exceptions import (
- MacaroonInvalidSignatureException, MacaroonUnmetCaveatException
-)
-import macaroonbakery
+import macaroonbakery as bakery
import macaroonbakery.checkers as checkers
from macaroonbakery.tests import common
@@ -20,15 +17,15 @@ class TestDischarge(unittest.TestCase):
can verify this macaroon as valid.
'''
oc = common.new_bakery('bakerytest')
- primary = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ primary = oc.oven.macaroon(bakery.LATEST_VERSION,
common.ages, None,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
self.assertEqual(primary.macaroon.location, 'bakerytest')
primary.add_caveat(checkers.Caveat(condition='str something',
namespace='testns'),
oc.oven.key, oc.oven.locator)
oc.checker.auth([[primary.macaroon]]).allow(
- common.str_context('something'), [macaroonbakery.LOGIN_OP])
+ common.str_context('something'), [bakery.LOGIN_OP])
def test_macaroon_paper_fig6(self):
''' Implements an example flow as described in the macaroons paper:
@@ -45,15 +42,15 @@ class TestDischarge(unittest.TestCase):
The target service verifies the original macaroon it delegated to fs
No direct contact between bs and ts is required
'''
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
ts = common.new_bakery('ts-loc', locator)
fs = common.new_bakery('fs-loc', locator)
# ts creates a macaroon.
- ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION,
common.ages,
- None, [macaroonbakery.LOGIN_OP])
+ None, [bakery.LOGIN_OP])
# ts somehow sends the macaroon to fs which adds a third party caveat
# to be discharged by bs.
@@ -62,51 +59,55 @@ class TestDischarge(unittest.TestCase):
fs.oven.key, fs.oven.locator)
# client asks for a discharge macaroon for each third party caveat
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
self.assertEqual(cav.location, 'bs-loc')
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- bs.oven.key,
- common.ThirdPartyStrcmpChecker(
- 'user==bob'),
- bs.oven.locator)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker('user==bob'),
+ bs.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
- get_discharge)
+ d = bakery.discharge_all(ts_macaroon, get_discharge)
ts.checker.auth([d]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
def test_discharge_with_version1_macaroon(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
ts = common.new_bakery('ts-loc', locator)
# ts creates a old-version macaroon.
- ts_macaroon = ts.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages,
- None, [macaroonbakery.LOGIN_OP])
+ ts_macaroon = ts.oven.macaroon(bakery.VERSION_1, common.ages,
+ None, [bakery.LOGIN_OP])
ts_macaroon.add_caveat(checkers.Caveat(condition='something',
location='bs-loc'),
ts.oven.key, ts.oven.locator)
# client asks for a discharge macaroon for each third party caveat
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
# Make sure that the caveat id really is old-style.
try:
cav.caveat_id_bytes.decode('utf-8')
except UnicodeDecodeError:
self.fail('caveat id is not utf-8')
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- bs.oven.key,
- common.ThirdPartyStrcmpChecker(
- 'something'),
- bs.oven.locator)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker('something'),
+ bs.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
- get_discharge)
+ d = bakery.discharge_all(ts_macaroon, get_discharge)
ts.checker.auth([d]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
for m in d:
self.assertEqual(m.version, MACAROON_V1)
@@ -114,29 +115,31 @@ class TestDischarge(unittest.TestCase):
def test_version1_macaroon_id(self):
# In the version 1 bakery, macaroon ids were hex-encoded with a
# hyphenated UUID suffix.
- root_key_store = macaroonbakery.MemoryKeyStore()
- b = macaroonbakery.Bakery(root_key_store=root_key_store,
- identity_client=common.OneIdentity())
+ root_key_store = bakery.MemoryKeyStore()
+ b = bakery.Bakery(
+ root_key_store=root_key_store,
+ identity_client=common.OneIdentity(),
+ )
key, id = root_key_store.root_key()
root_key_store.get(id)
m = Macaroon(key=key, version=MACAROON_V1, location='',
identifier=id + b'-deadl00f')
b.checker.auth([[m]]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
def test_macaroon_paper_fig6_fails_without_discharges(self):
''' Runs a similar test as test_macaroon_paper_fig6 without the client
discharging the third party caveats.
'''
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
ts = common.new_bakery('ts-loc', locator)
fs = common.new_bakery('fs-loc', locator)
common.new_bakery('as-loc', locator)
# ts creates a macaroon.
- ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION,
common.ages, None,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
# ts somehow sends the macaroon to fs which adds a third party
# caveat to be discharged by as.
@@ -148,24 +151,24 @@ class TestDischarge(unittest.TestCase):
try:
ts.checker.auth([[ts_macaroon.macaroon]]).allow(
common.test_context,
- macaroonbakery.LOGIN_OP
+ bakery.LOGIN_OP
)
self.fail('macaroon unmet should be raised')
- except MacaroonUnmetCaveatException:
+ except bakery.VerificationError:
pass
def test_macaroon_paper_fig6_fails_with_binding_on_tampered_sig(self):
''' Runs a similar test as test_macaroon_paper_fig6 with the discharge
macaroon binding being done on a tampered signature.
'''
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
ts = common.new_bakery('ts-loc', locator)
# ts creates a macaroon.
- ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION,
common.ages, None,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
# ts somehow sends the macaroon to fs which adds a third party caveat
# to be discharged by as.
ts_macaroon.add_caveat(checkers.Caveat(condition='user==bob',
@@ -173,16 +176,18 @@ class TestDischarge(unittest.TestCase):
ts.oven.key, ts.oven.locator)
# client asks for a discharge macaroon for each third party caveat
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
self.assertEqual(cav.location, 'bs-loc')
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- bs.oven.key,
- common.ThirdPartyStrcmpChecker(
- 'user==bob'),
- bs.oven.locator)
-
- d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
- get_discharge)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker('user==bob'),
+ bs.oven.locator,
+ )
+
+ d = bakery.discharge_all(ts_macaroon, get_discharge)
# client has all the discharge macaroons. For each discharge macaroon
# bind it to our ts_macaroon and add it to our request.
tampered_macaroon = Macaroon()
@@ -190,35 +195,39 @@ class TestDischarge(unittest.TestCase):
d[i + 1] = tampered_macaroon.prepare_for_request(dm)
# client makes request to ts.
- with self.assertRaises(MacaroonInvalidSignatureException) as exc:
+ with self.assertRaises(bakery.VerificationError) as exc:
ts.checker.auth([d]).allow(common.test_context,
- macaroonbakery.LOGIN_OP)
- self.assertEqual('Signatures do not match', exc.exception.args[0])
+ bakery.LOGIN_OP)
+ self.assertEqual('verification failed: Signatures do not match',
+ exc.exception.args[0])
def test_need_declared(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
first_party = common.new_bakery('first', locator)
third_party = common.new_bakery('third', locator)
# firstParty mints a macaroon with a third-party caveat addressed
# to thirdParty with a need-declared caveat.
m = first_party.oven.macaroon(
- macaroonbakery.LATEST_BAKERY_VERSION, common.ages, [
+ bakery.LATEST_VERSION, common.ages, [
checkers.need_declared_caveat(
checkers.Caveat(location='third', condition='something'),
['foo', 'bar']
)
- ], [macaroonbakery.LOGIN_OP])
+ ], [bakery.LOGIN_OP])
# The client asks for a discharge macaroon for each third party caveat.
- def get_discharge(ctx, cav, payload):
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- third_party.oven.key,
- common.ThirdPartyStrcmpChecker(
- 'something'),
- third_party.oven.locator)
+ def get_discharge(cav, payload):
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
+ common.ThirdPartyStrcmpChecker('something'),
+ third_party.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
# The required declared attributes should have been added
# to the discharge macaroons.
@@ -231,22 +240,26 @@ class TestDischarge(unittest.TestCase):
# Make sure the macaroons actually check out correctly
# when provided with the declared checker.
ctx = checkers.context_with_declared(common.test_context, declared)
- first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+ first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP])
# Try again when the third party does add a required declaration.
# The client asks for a discharge macaroon for each third party caveat.
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
checker = common.ThirdPartyCheckerWithCaveats([
checkers.declared_caveat('foo', 'a'),
checkers.declared_caveat('arble', 'b')
])
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- third_party.oven.key,
- checker,
- third_party.oven.locator)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
+ checker,
+ third_party.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
# One attribute should have been added, the other was already there.
declared = checkers.infer_declared(d, first_party.checker.namespace())
@@ -257,25 +270,28 @@ class TestDischarge(unittest.TestCase):
})
ctx = checkers.context_with_declared(common.test_context, declared)
- first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+ first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP])
# Try again, but this time pretend a client is sneakily trying
# to add another 'declared' attribute to alter the declarations.
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
checker = common.ThirdPartyCheckerWithCaveats([
checkers.declared_caveat('foo', 'a'),
checkers.declared_caveat('arble', 'b'),
])
# Sneaky client adds a first party caveat.
- m = macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- third_party.oven.key, checker,
- third_party.oven.locator)
+ m = bakery.discharge(
+ common.test_context, cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key, checker,
+ third_party.oven.locator,
+ )
m.add_caveat(checkers.declared_caveat('foo', 'c'), None, None)
return m
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
declared = checkers.infer_declared(d, first_party.checker.namespace())
self.assertEqual(declared, {
@@ -283,22 +299,22 @@ class TestDischarge(unittest.TestCase):
'arble': 'b',
})
- with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ with self.assertRaises(bakery.AuthInitError) as exc:
first_party.checker.auth([d]).allow(common.test_context,
- macaroonbakery.LOGIN_OP)
+ bakery.LOGIN_OP)
self.assertEqual('cannot authorize login macaroon: caveat '
'"declared foo a" not satisfied: got foo=null, '
'expected "a"', exc.exception.args[0])
def test_discharge_two_need_declared(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
first_party = common.new_bakery('first', locator)
third_party = common.new_bakery('third', locator)
# first_party mints a macaroon with two third party caveats
# with overlapping attributes.
m = first_party.oven.macaroon(
- macaroonbakery.LATEST_BAKERY_VERSION,
+ bakery.LATEST_VERSION,
common.ages, [
checkers.need_declared_caveat(
checkers.Caveat(location='third', condition='x'),
@@ -306,18 +322,22 @@ class TestDischarge(unittest.TestCase):
checkers.need_declared_caveat(
checkers.Caveat(location='third', condition='y'),
['bar', 'baz']),
- ], [macaroonbakery.LOGIN_OP])
+ ], [bakery.LOGIN_OP])
# The client asks for a discharge macaroon for each third party caveat.
# Since no declarations are added by the discharger,
- def get_discharge(ctx, cav, payload):
- return macaroonbakery.discharge(
- ctx, cav.caveat_id_bytes, payload, third_party.oven.key,
+ def get_discharge(cav, payload):
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
common.ThirdPartyCaveatCheckerEmpty(),
- third_party.oven.locator)
+ third_party.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
declared = checkers.infer_declared(d, first_party.checker.namespace())
self.assertEqual(declared, {
'foo': '',
@@ -325,12 +345,12 @@ class TestDischarge(unittest.TestCase):
'baz': '',
})
ctx = checkers.context_with_declared(common.test_context, declared)
- first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+ first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP])
# If they return conflicting values, the discharge fails.
# The client asks for a discharge macaroon for each third party caveat.
# Since no declarations are added by the discharger,
- class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
def check_third_party_caveat(self, ctx, cav_info):
if cav_info.condition == b'x':
return [checkers.declared_caveat('foo', 'fooval1')]
@@ -341,62 +361,70 @@ class TestDischarge(unittest.TestCase):
]
raise common.ThirdPartyCaveatCheckFailed('not matched')
- def get_discharge(ctx, cav, payload):
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- third_party.oven.key,
- ThirdPartyCaveatCheckerF(),
- third_party.oven.locator)
+ def get_discharge(cav, payload):
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
+ ThirdPartyCaveatCheckerF(),
+ third_party.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
declared = checkers.infer_declared(d, first_party.checker.namespace())
self.assertEqual(declared, {
'bar': '',
'baz': 'bazval',
})
- with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ with self.assertRaises(bakery.AuthInitError) as exc:
first_party.checker.auth([d]).allow(common.test_context,
- macaroonbakery.LOGIN_OP)
+ bakery.LOGIN_OP)
self.assertEqual('cannot authorize login macaroon: caveat "declared '
'foo fooval1" not satisfied: got foo=null, expected '
'"fooval1"', exc.exception.args[0])
def test_discharge_macaroon_cannot_be_used_as_normal_macaroon(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
first_party = common.new_bakery('first', locator)
third_party = common.new_bakery('third', locator)
# First party mints a macaroon with a 3rd party caveat.
- m = first_party.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ m = first_party.oven.macaroon(bakery.LATEST_VERSION,
common.ages, [
checkers.Caveat(location='third',
condition='true')],
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
# Acquire the discharge macaroon, but don't bind it to the original.
class M:
unbound = None
- def get_discharge(ctx, cav, payload):
- m = macaroonbakery.discharge(
- ctx, cav.caveat_id_bytes, payload, third_party.oven.key,
+ def get_discharge(cav, payload):
+ m = bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
common.ThirdPartyStrcmpChecker('true'),
- third_party.oven.locator)
+ third_party.oven.locator,
+ )
M.unbound = m.macaroon.copy()
return m
- macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ bakery.discharge_all(m, get_discharge)
self.assertIsNotNone(M.unbound)
# Make sure it cannot be used as a normal macaroon in the third party.
- with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ with self.assertRaises(bakery.VerificationError) as exc:
third_party.checker.auth([[M.unbound]]).allow(
- common.test_context, [macaroonbakery.LOGIN_OP])
+ common.test_context, [bakery.LOGIN_OP])
self.assertEqual('no operations found in macaroon',
exc.exception.args[0])
def test_third_party_discharge_macaroon_ids_are_small(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bakeries = {
'ts-loc': common.new_bakery('ts-loc', locator),
'as1-loc': common.new_bakery('as1-loc', locator),
@@ -404,14 +432,14 @@ class TestDischarge(unittest.TestCase):
}
ts = bakeries['ts-loc']
- ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION,
common.ages,
- None, [macaroonbakery.LOGIN_OP])
+ None, [bakery.LOGIN_OP])
ts_macaroon.add_caveat(checkers.Caveat(condition='something',
location='as1-loc'),
ts.oven.key, ts.oven.locator)
- class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
def __init__(self, loc):
self._loc = loc
@@ -424,18 +452,20 @@ class TestDischarge(unittest.TestCase):
raise common.ThirdPartyCaveatCheckFailed(
'unknown location {}'.format(self._loc))
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
oven = bakeries[cav.location].oven
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- oven.key,
- ThirdPartyCaveatCheckerF(
- cav.location),
- oven.locator)
-
- d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
- get_discharge)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ oven.key,
+ ThirdPartyCaveatCheckerF(cav.location),
+ oven.locator,
+ )
+
+ d = bakery.discharge_all(ts_macaroon, get_discharge)
ts.checker.auth([d]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
for i, m in enumerate(d):
for j, cav in enumerate(m.caveats):