summaryrefslogtreecommitdiff
path: root/macaroonbakery/tests/test_checker.py
diff options
context:
space:
mode:
Diffstat (limited to 'macaroonbakery/tests/test_checker.py')
-rw-r--r--macaroonbakery/tests/test_checker.py443
1 files changed, 210 insertions, 233 deletions
diff --git a/macaroonbakery/tests/test_checker.py b/macaroonbakery/tests/test_checker.py
index 06bf008..643c756 100644
--- a/macaroonbakery/tests/test_checker.py
+++ b/macaroonbakery/tests/test_checker.py
@@ -9,7 +9,7 @@ from datetime import timedelta
from pymacaroons.verifier import Verifier, FirstPartyCaveatVerifierDelegate
import pymacaroons
-import macaroonbakery
+import macaroonbakery as bakery
import macaroonbakery.checkers as checkers
from macaroonbakery.tests.common import test_context, epoch, test_checker
@@ -22,13 +22,13 @@ class TestChecker(TestCase):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
- {macaroonbakery.Op(entity='something', action='read'):
- {macaroonbakery.EVERYONE}})
+ {bakery.Op(entity='something', action='read'):
+ {bakery.EVERYONE}})
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
- auth_info = client.do(test_context, ts,
- [macaroonbakery.Op(entity='something',
- action='read')])
+ auth_info = client.do(test_context, ts, [
+ bakery.Op(entity='something', action='read'),
+ ])
self.assertEqual(len(self._discharges), 0)
self.assertIsNotNone(auth_info)
self.assertIsNone(auth_info.identity)
@@ -37,25 +37,23 @@ class TestChecker(TestCase):
def test_authorization_denied(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = macaroonbakery.ClosedAuthorizer()
+ auth = bakery.ClosedAuthorizer()
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- with self.assertRaises(macaroonbakery.PermissionDenied):
- client.do(ctx, ts, [macaroonbakery.Op(entity='something',
- action='read')])
+ with self.assertRaises(bakery.PermissionDenied):
+ client.do(ctx, ts, [bakery.Op(entity='something', action='read')])
def test_authorize_with_authentication_required(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
- {macaroonbakery.Op(entity='something', action='read'): {'bob'}})
+ {bakery.Op(entity='something', action='read'): {'bob'}})
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- auth_info = client.do(ctx, ts, [macaroonbakery.Op(entity='something',
- action='read')])
+ auth_info = client.do(ctx, ts, [bakery.Op(entity='something', action='read')])
self.assertEqual(self._discharges,
[_DischargeRecord(location='ids', user='bob')])
self.assertIsNotNone(auth_info)
@@ -67,16 +65,16 @@ class TestChecker(TestCase):
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
{
- macaroonbakery.Op(entity='something', action='read'): {'bob'},
- macaroonbakery.Op(entity='otherthing', action='read'): {'bob'}
+ bakery.Op(entity='something', action='read'): {'bob'},
+ bakery.Op(entity='otherthing', action='read'): {'bob'}
}
)
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
client.do(ctx, ts, [
- macaroonbakery.Op(entity='something', action='read'),
- macaroonbakery.Op(entity='otherthing', action='read')
+ bakery.Op(entity='something', action='read'),
+ bakery.Op(entity='otherthing', action='read')
])
self.assertEqual(self._discharges,
[_DischargeRecord(location='ids', user='bob')])
@@ -85,159 +83,150 @@ class TestChecker(TestCase):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
- {macaroonbakery.Op(entity='something', action='read'): {'bob'}})
+ {bakery.Op(entity='something', action='read'): {'bob'}})
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m = client.discharged_capability(
- ctx, ts, [macaroonbakery.Op(entity='something', action='read')])
+ ctx, ts, [bakery.Op(entity='something', action='read')])
# Check that we can exercise the capability directly on the service
# with no discharging required.
- auth_info = ts.do(test_context, [m],
- [macaroonbakery.Op(entity='something',
- action='read')])
+ auth_info = ts.do(test_context, [m], [
+ bakery.Op(entity='something', action='read'),
+ ])
self.assertIsNotNone(auth_info)
self.assertIsNone(auth_info.identity)
self.assertEqual(len(auth_info.macaroons), 1)
- self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
- m[0].identifier_bytes)
+ self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, m[0].identifier_bytes)
def test_capability_multiple_entities(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'bob'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- macaroonbakery.Op(entity='e3', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'bob'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ bakery.Op(entity='e3', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m = client.discharged_capability(ctx, ts, [
- macaroonbakery.Op(entity='e1',
- action='read'),
- macaroonbakery.Op(entity='e2',
- action='read'),
- macaroonbakery.Op(entity='e3',
- action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
self.assertEqual(self._discharges,
[_DischargeRecord(location='ids', user='bob')])
# Check that we can exercise the capability directly on the service
# with no discharging required.
ts.do(test_context, [m], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read'),
- macaroonbakery.Op(entity='e3', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
# Check that we can exercise the capability to act on a subset of
# the operations.
ts.do(test_context, [m], [
- macaroonbakery.Op(entity='e2', action='read'),
- macaroonbakery.Op(entity='e3', action='read')]
- )
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
ts.do(test_context, [m],
- [macaroonbakery.Op(entity='e3', action='read')])
+ [bakery.Op(entity='e3', action='read')])
def test_multiple_capabilities(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Acquire two capabilities as different users and check
# that we can combine them together to do both operations
# at once.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- m1 = _Client(locator).discharged_capability(ctx, ts,
- [macaroonbakery.Op(
- entity='e1',
- action='read')])
+ m1 = _Client(locator).discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ ])
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m2 = _Client(locator).discharged_capability(ctx, ts,
- [macaroonbakery.Op(
+ [bakery.Op(
entity='e2',
action='read')])
- self.assertEqual(self._discharges,
- [
- _DischargeRecord(location='ids', user='alice'),
- _DischargeRecord(location='ids', user='bob'),
- ])
+ self.assertEqual(self._discharges, [
+ _DischargeRecord(location='ids', user='alice'),
+ _DischargeRecord(location='ids', user='bob'),
+ ])
auth_info = ts.do(test_context, [m1, m2], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
self.assertIsNotNone(auth_info)
self.assertIsNone(auth_info.identity)
self.assertEqual(len(auth_info.macaroons), 2)
- self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
- m1[0].identifier_bytes)
- self.assertEqual(auth_info.macaroons[1][0].identifier_bytes,
- m2[0].identifier_bytes)
+ self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, m1[0].identifier_bytes)
+ self.assertEqual(auth_info.macaroons[1][0].identifier_bytes, m2[0].identifier_bytes)
def test_combine_capabilities(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- macaroonbakery.Op(entity='e3', action='read'): {'bob',
- 'alice'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ bakery.Op(entity='e3', action='read'): {'bob', 'alice'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Acquire two capabilities as different users and check
# that we can combine them together into a single capability
# capable of both operations.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- m1 = _Client(locator).discharged_capability(
- ctx, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e3', action='read')])
+ m1 = _Client(locator).discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m2 = _Client(locator).discharged_capability(
- ctx, ts, [macaroonbakery.Op(entity='e2', action='read')])
+ ctx, ts, [bakery.Op(entity='e2', action='read')])
m = ts.capability(test_context, [m1, m2], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read'),
- macaroonbakery.Op(entity='e3', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read'),
- macaroonbakery.Op(entity='e3', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
def test_partially_authorized_request(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Acquire a capability for e1 but rely on authentication to
# authorize e2.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- m = _Client(locator).discharged_capability(ctx, ts,
- [macaroonbakery.Op(
- entity='e1',
- action='read')])
+ m = _Client(locator).discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ ])
client = _Client(locator)
client.add_macaroon(ts, 'authz', m)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- client.discharged_capability(
- ctx, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ client.discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
def test_auth_with_third_party_caveats(self):
locator = _DischargerLocator()
@@ -247,16 +236,15 @@ class TestChecker(TestCase):
# when authorizing.
def authorize_with_tp_discharge(ctx, id, op):
if (id is not None and id.id() == 'bob' and
- op == macaroonbakery.Op(entity='something',
- action='read')):
+ op == bakery.Op(entity='something', action='read')):
return True, [checkers.Caveat(condition='question',
location='other third party')]
return False, None
- auth = macaroonbakery.AuthorizerFunc(authorize_with_tp_discharge)
+ auth = bakery.AuthorizerFunc(authorize_with_tp_discharge)
ts = _Service('myservice', auth, ids, locator)
- class _LocalDischargeChecker(macaroonbakery.ThirdPartyCaveatChecker):
+ class _LocalDischargeChecker(bakery.ThirdPartyCaveatChecker):
def check_third_party_caveat(_, ctx, info):
if info.condition != 'question':
raise ValueError('third party condition not recognized')
@@ -267,29 +255,25 @@ class TestChecker(TestCase):
return []
locator['other third party'] = _Discharger(
- key=macaroonbakery.generate_key(),
+ key=bakery.generate_key(),
checker=_LocalDischargeChecker(),
locator=locator,
)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- client.do(ctx, ts, [macaroonbakery.Op(entity='something',
- action='read')])
+ client.do(ctx, ts, [bakery.Op(entity='something', action='read')])
self.assertEqual(self._discharges, [
_DischargeRecord(location='ids', user='bob'),
- _DischargeRecord(location='other third party',
- user='bob')
+ _DischargeRecord(location='other third party', user='bob')
])
def test_capability_combines_first_party_caveats(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'}
- }
- )
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Acquire two capabilities as different users, add some first party
@@ -297,12 +281,12 @@ class TestChecker(TestCase):
# capable of both operations.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
m1 = _Client(locator).capability(
- ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ ctx, ts, [bakery.Op(entity='e1', action='read')])
m1.macaroon.add_first_party_caveat('true 1')
m1.macaroon.add_first_party_caveat('true 2')
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m2 = _Client(locator).capability(
- ctx, ts, [macaroonbakery.Op(entity='e2', action='read')])
+ ctx, ts, [bakery.Op(entity='e2', action='read')])
m2.macaroon.add_first_party_caveat('true 3')
m2.macaroon.add_first_party_caveat('true 4')
@@ -311,8 +295,9 @@ class TestChecker(TestCase):
client.add_macaroon(ts, 'authz2', [m2.macaroon])
m = client.capability(test_context, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [
'true 1',
'true 2',
@@ -323,11 +308,10 @@ class TestChecker(TestCase):
def test_first_party_caveat_squashing(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'alice'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'alice'},
+ })
ts = _Service('myservice', auth, ids, locator)
tests = [
('duplicates removed', [
@@ -366,13 +350,13 @@ class TestChecker(TestCase):
# Make a first macaroon with all the required first party caveats.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
m1 = _Client(locator).capability(
- ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ ctx, ts, [bakery.Op(entity='e1', action='read')])
m1.add_caveats(test[1], None, None)
# Make a second macaroon that's not used to check that it's
# caveats are not added.
m2 = _Client(locator).capability(
- ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ ctx, ts, [bakery.Op(entity='e1', action='read')])
m2.add_caveat(checkers.Caveat(
condition='true notused', namespace='testns'), None, None)
client = _Client(locator)
@@ -380,8 +364,7 @@ class TestChecker(TestCase):
client.add_macaroon(ts, 'authz2', [m2.macaroon])
m3 = client.capability(
- test_context, ts, [macaroonbakery.Op(entity='e1',
- action='read')])
+ test_context, ts, [bakery.Op(entity='e1', action='read')])
self.assertEqual(
_macaroon_conditions(m3.macaroon.caveats, False),
_resolve_caveats(m3.namespace, test[2]))
@@ -389,11 +372,11 @@ class TestChecker(TestCase):
def test_login_only(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = macaroonbakery.ClosedAuthorizer()
+ auth = bakery.ClosedAuthorizer()
ts = _Service('myservice', auth, ids, locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP])
self.assertIsNotNone(auth_info)
self.assertEqual(auth_info.identity.id(), 'bob')
@@ -402,18 +385,17 @@ class TestChecker(TestCase):
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
{
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
})
ts = _Service('myservice', auth, ids, locator)
# Acquire a capability for e1 but rely on authentication to
# authorize e2.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- m = _Client(locator).discharged_capability(ctx, ts,
- [macaroonbakery.Op(
- entity='e1',
- action='read')])
+ m = _Client(locator).discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ ])
client = _Client(locator)
client.add_macaroon(ts, 'authz', m)
@@ -423,24 +405,22 @@ class TestChecker(TestCase):
with self.assertRaises(_DischargeRequiredError):
client.do_any(
ctx, ts, [
- macaroonbakery.LOGIN_OP,
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e1', action='read')
+ bakery.LOGIN_OP,
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e1', action='read')
]
)
self.assertEqual(len(self._discharges), 0)
# Log in as bob.
- _, err = client.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ _, err = client.do(ctx, ts, [bakery.LOGIN_OP])
# All the previous actions should now be allowed.
- auth_info, allowed = client.do_any(
- ctx, ts, [
- macaroonbakery.LOGIN_OP,
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e1', action='read')
- ]
- )
+ auth_info, allowed = client.do_any(ctx, ts, [
+ bakery.LOGIN_OP,
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e1', action='read'),
+ ])
self.assertEqual(auth_info.identity.id(), 'bob')
self.assertEqual(len(auth_info.macaroons), 2)
self.assertEqual(allowed, [True, True, True])
@@ -448,123 +428,121 @@ class TestChecker(TestCase):
def test_auth_with_identity_from_context(self):
locator = _DischargerLocator()
ids = _BasicAuthIdService()
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'sherlock'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'sherlock'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Check that we can perform the ops with basic auth in the
# context.
ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
auth_info = _Client(locator).do(
- ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ ctx, ts, [bakery.Op(entity='e1', action='read')])
self.assertEqual(auth_info.identity.id(), 'sherlock')
self.assertEqual(len(auth_info.macaroons), 0)
def test_auth_login_op_with_identity_from_context(self):
locator = _DischargerLocator()
ids = _BasicAuthIdService()
- ts = _Service('myservice', macaroonbakery.ClosedAuthorizer(),
- ids, locator)
+ ts = _Service('myservice', bakery.ClosedAuthorizer(), ids, locator)
# Check that we can use LoginOp
# when auth isn't granted through macaroons.
ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
- auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'sherlock')
self.assertEqual(len(auth_info.macaroons), 0)
def test_operation_allow_caveat(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'bob'},
- macaroonbakery.Op(entity='e1', action='write'): {'bob'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'bob'},
+ bakery.Op(entity='e1', action='write'): {'bob'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- m = client.capability(
- ctx, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e1', action='write'),
- macaroonbakery.Op(entity='e2', action='read')])
+ m = client.capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e1', action='write'),
+ bakery.Op(entity='e2', action='read'),
+ ])
# Sanity check that we can do a write.
ts.do(test_context, [[m.macaroon]],
- [macaroonbakery.Op(entity='e1', action='write')])
+ [bakery.Op(entity='e1', action='write')])
m.add_caveat(checkers.allow_caveat(['read']), None, None)
# A read operation should work.
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
# A write operation should fail
# even though the original macaroon allowed it.
with self.assertRaises(_DischargeRequiredError):
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='write')])
+ bakery.Op(entity='e1', action='write'),
+ ])
def test_operation_deny_caveat(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'bob'},
- macaroonbakery.Op(entity='e1', action='write'): {'bob'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'bob'},
+ bakery.Op(entity='e1', action='write'): {'bob'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- m = client.capability(
- ctx, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e1', action='write'),
- macaroonbakery.Op(entity='e2', action='read')])
+ m = client.capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e1', action='write'),
+ bakery.Op(entity='e2', action='read'),
+ ])
# Sanity check that we can do a write.
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='write')])
+ bakery.Op(entity='e1', action='write')])
m.add_caveat(checkers.deny_caveat(['write']), None, None)
# A read operation should work.
- ts.do(
- test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ ts.do(test_context, [[m.macaroon]], [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
# A write operation should fail
# even though the original macaroon allowed it.
with self.assertRaises(_DischargeRequiredError):
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='write')])
+ bakery.Op(entity='e1', action='write')])
def test_duplicate_login_macaroons(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = macaroonbakery.ClosedAuthorizer()
+ auth = bakery.ClosedAuthorizer()
ts = _Service('myservice', auth, ids, locator)
# Acquire a login macaroon for bob.
client1 = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- auth_info = client1.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = client1.do(ctx, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'bob')
# Acquire a login macaroon for alice.
client2 = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- auth_info = client2.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = client2.do(ctx, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'alice')
# Combine the two login macaroons into one client.
@@ -576,32 +554,30 @@ class TestChecker(TestCase):
# We should authenticate as bob (because macaroons are presented
# ordered by "cookie" name)
- auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'bob')
self.assertEqual(len(auth_info.macaroons), 1)
# Try them the other way around and we should authenticate as alice.
client3 = _Client(locator)
- client3.add_macaroon(ts, '1.alice',
- client2._macaroons[ts.name()]['authn'])
- client3.add_macaroon(ts, '2.bob',
- client1._macaroons[ts.name()]['authn'])
+ client3.add_macaroon(ts, '1.alice', client2._macaroons[ts.name()]['authn'])
+ client3.add_macaroon(ts, '2.bob', client1._macaroons[ts.name()]['authn'])
- auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'alice')
self.assertEqual(len(auth_info.macaroons), 1)
def test_macaroon_ops_fatal_error(self):
# When we get a non-VerificationError error from the
# opstore, we don't do any more verification.
- checker = macaroonbakery.Checker(
+ checker = bakery.Checker(
macaroon_opstore=_MacaroonStoreWithError())
m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2)
- with self.assertRaises(ValueError):
- checker.auth([m]).allow(test_context, [macaroonbakery.LOGIN_OP])
+ with self.assertRaises(bakery.AuthInitError):
+ checker.auth([m]).allow(test_context, [bakery.LOGIN_OP])
-class _DischargerLocator(object):
+class _DischargerLocator(bakery.ThirdPartyLocator):
def __init__(self, dischargers=None):
if dischargers is None:
dischargers = {}
@@ -611,9 +587,9 @@ class _DischargerLocator(object):
d = self._dischargers.get(loc)
if d is None:
return None
- return macaroonbakery.ThirdPartyInfo(
+ return bakery.ThirdPartyInfo(
public_key=d._key.public_key,
- version=macaroonbakery.LATEST_BAKERY_VERSION,
+ version=bakery.LATEST_VERSION,
)
def __setitem__(self, key, item):
@@ -626,25 +602,23 @@ class _DischargerLocator(object):
return self._dischargers.get(key)
-class _IdService(macaroonbakery.IdentityClient,
- macaroonbakery.ThirdPartyCaveatChecker):
+class _IdService(bakery.IdentityClient,
+ bakery.ThirdPartyCaveatChecker):
def __init__(self, location, locator, test_class):
self._location = location
self._test = test_class
- key = macaroonbakery.generate_key()
+ key = bakery.generate_key()
self._discharger = _Discharger(key=key, checker=self, locator=locator)
locator[location] = self._discharger
def check_third_party_caveat(self, ctx, info):
if info.condition != 'is-authenticated-user':
- raise macaroonbakery.CaveatNotRecognizedError(
- 'third party condition not '
- 'recognized')
+ raise bakery.CaveatNotRecognizedError(
+ 'third party condition not recognized')
username = ctx.get(_DISCHARGE_USER_KEY, '')
if username == '':
- return macaroonbakery.ThirdPartyCaveatCheckFailed(
- 'no current user')
+ raise bakery.ThirdPartyCaveatCheckFailed('no current user')
self._test._discharges.append(
_DischargeRecord(location=self._location, user=username))
return [checkers.declared_caveat('username', username)]
@@ -656,8 +630,8 @@ class _IdService(macaroonbakery.IdentityClient,
def declared_identity(self, ctx, declared):
user = declared.get('username')
if user is None:
- raise macaroonbakery.IdentityError('no username declared')
- return macaroonbakery.SimpleIdentity(user)
+ raise bakery.IdentityError('no username declared')
+ return bakery.SimpleIdentity(user)
_DISCHARGE_USER_KEY = checkers.ContextKey('user-key')
@@ -676,13 +650,17 @@ class _Discharger(object):
self._checker = checker
def discharge(self, ctx, cav, payload):
- return macaroonbakery.discharge(ctx, key=self._key, id=cav.caveat_id,
- caveat=payload,
- checker=self._checker,
- locator=self._locator)
+ return bakery.discharge(
+ ctx,
+ key=self._key,
+ id=cav.caveat_id,
+ caveat=payload,
+ checker=self._checker,
+ locator=self._locator,
+ )
-class _OpAuthorizer(macaroonbakery.Authorizer):
+class _OpAuthorizer(bakery.Authorizer):
'''Implements bakery.Authorizer by looking the operation
up in the given map. If the username is in the associated list
or the list contains "everyone", authorization is granted.
@@ -694,7 +672,7 @@ class _OpAuthorizer(macaroonbakery.Authorizer):
self._auth = auth
def authorize(self, ctx, id, ops):
- return macaroonbakery.ACLAuthorizer(
+ return bakery.ACLAuthorizer(
allow_public=True,
get_acl=lambda ctx, op: self._auth.get(op, [])).authorize(
ctx, id, ops)
@@ -705,7 +683,7 @@ class _MacaroonStore(object):
'''
def __init__(self, key, locator):
- self._root_key_store = macaroonbakery.MemoryKeyStore()
+ self._root_key_store = bakery.MemoryKeyStore()
self._key = key
self._locator = locator
@@ -713,9 +691,9 @@ class _MacaroonStore(object):
root_key, id = self._root_key_store.root_key()
m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops}
data = json.dumps(m_id)
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key=root_key, id=data, location='',
- version=macaroonbakery.LATEST_BAKERY_VERSION,
+ version=bakery.LATEST_VERSION,
namespace=namespace)
m.add_caveats(caveats, self._key, self._locator)
return m
@@ -739,7 +717,7 @@ class _MacaroonStore(object):
ok = v.verify(macaroon=ms[0], key=root_key,
discharge_macaroons=ms[1:])
if not ok:
- raise macaroonbakery.VerificationError('invalid signature')
+ raise bakery.VerificationError('invalid signature')
conditions = []
for m in ms:
cavs = m.first_party_caveats()
@@ -747,7 +725,7 @@ class _MacaroonStore(object):
conditions.append(cav.caveat_id_bytes.decode('utf-8'))
ops = []
for op in m_id['ops']:
- ops.append(macaroonbakery.Op(entity=op[0], action=op[1]))
+ ops.append(bakery.Op(entity=op[0], action=op[1]))
return ops, conditions
@@ -761,8 +739,8 @@ class _Service(object):
def __init__(self, name, auth, idm, locator):
self._name = name
- self._store = _MacaroonStore(macaroonbakery.generate_key(), locator)
- self._checker = macaroonbakery.Checker(
+ self._store = _MacaroonStore(bakery.generate_key(), locator)
+ self._checker = bakery.Checker(
checker=test_checker(),
authorizer=auth,
identity_client=idm,
@@ -774,7 +752,7 @@ class _Service(object):
def do(self, ctx, ms, ops):
try:
authInfo = self._checker.auth(ms).allow(ctx, ops)
- except macaroonbakery.DischargeRequiredError as exc:
+ except bakery.DischargeRequiredError as exc:
self._discharge_required_error(exc)
return authInfo
@@ -784,13 +762,13 @@ class _Service(object):
try:
authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops)
return authInfo, allowed
- except macaroonbakery.DischargeRequiredError as exc:
+ except bakery.DischargeRequiredError as exc:
self._discharge_required_error(exc)
def capability(self, ctx, ms, ops):
try:
conds = self._checker.auth(ms).allow_capability(ctx, ops)
- except macaroonbakery.DischargeRequiredError as exc:
+ except bakery.DischargeRequiredError as exc:
self._discharge_required_error(exc)
m = self._store.new_macaroon(None, self._checker.namespace(), ops)
@@ -802,7 +780,7 @@ class _Service(object):
m = self._store.new_macaroon(err.cavs(), self._checker.namespace(),
err.ops())
name = 'authz'
- if len(err.ops()) == 1 and err.ops()[0] == macaroonbakery.LOGIN_OP:
+ if len(err.ops()) == 1 and err.ops()[0] == bakery.LOGIN_OP:
name = 'authn'
raise _DischargeRequiredError(name=name, m=m)
@@ -824,7 +802,7 @@ class _Client(object):
max_retries = 3
def __init__(self, dischargers):
- self._key = macaroonbakery.generate_key()
+ self._key = bakery.generate_key()
self._macaroons = {}
self._dischargers = dischargers
@@ -894,26 +872,25 @@ class _Client(object):
return ms
def _discharge_all(self, ctx, m):
- def get_discharge(ctx, cav, pay_load):
+ def get_discharge(cav, payload):
d = self._dischargers.get(cav.location)
if d is None:
raise ValueError('third party discharger '
'{} not found'.format(cav.location))
- return d.discharge(ctx, cav, pay_load)
+ return d.discharge(ctx, cav, payload)
- return macaroonbakery.discharge_all(ctx, m, get_discharge)
+ return bakery.discharge_all(m, get_discharge)
-class _BasicAuthIdService(macaroonbakery.IdentityClient):
+class _BasicAuthIdService(bakery.IdentityClient):
def identity_from_context(self, ctx):
user, pwd = _basic_auth_from_context(ctx)
if user != 'sherlock' or pwd != 'holmes':
return None, None
- return macaroonbakery.SimpleIdentity(user), None
+ return bakery.SimpleIdentity(user), None
def declared_identity(self, ctx, declared):
- raise macaroonbakery.IdentityError('no identity declarations in basic '
- 'auth id service')
+ raise bakery.IdentityError('no identity declarations in basic auth id service')
_BASIC_AUTH_KEY = checkers.ContextKey('user-key')