summaryrefslogtreecommitdiff
path: root/macaroonbakery/tests/test_checker.py
diff options
context:
space:
mode:
Diffstat (limited to 'macaroonbakery/tests/test_checker.py')
-rw-r--r--macaroonbakery/tests/test_checker.py963
1 files changed, 963 insertions, 0 deletions
diff --git a/macaroonbakery/tests/test_checker.py b/macaroonbakery/tests/test_checker.py
new file mode 100644
index 0000000..06bf008
--- /dev/null
+++ b/macaroonbakery/tests/test_checker.py
@@ -0,0 +1,963 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+import base64
+from collections import namedtuple
+import json
+from unittest import TestCase
+from datetime import timedelta
+
+from pymacaroons.verifier import Verifier, FirstPartyCaveatVerifierDelegate
+import pymacaroons
+
+import macaroonbakery
+import macaroonbakery.checkers as checkers
+from macaroonbakery.tests.common import test_context, epoch, test_checker
+
+
+class TestChecker(TestCase):
+ def setUp(self):
+ self._discharges = []
+
+ def test_authorize_with_open_access_and_no_macaroons(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {macaroonbakery.Op(entity='something', action='read'):
+ {macaroonbakery.EVERYONE}})
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+ auth_info = client.do(test_context, ts,
+ [macaroonbakery.Op(entity='something',
+ action='read')])
+ self.assertEqual(len(self._discharges), 0)
+ self.assertIsNotNone(auth_info)
+ self.assertIsNone(auth_info.identity)
+ self.assertEqual(len(auth_info.macaroons), 0)
+
+ def test_authorization_denied(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = macaroonbakery.ClosedAuthorizer()
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ with self.assertRaises(macaroonbakery.PermissionDenied):
+ client.do(ctx, ts, [macaroonbakery.Op(entity='something',
+ action='read')])
+
+ def test_authorize_with_authentication_required(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {macaroonbakery.Op(entity='something', action='read'): {'bob'}})
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ auth_info = client.do(ctx, ts, [macaroonbakery.Op(entity='something',
+ action='read')])
+ self.assertEqual(self._discharges,
+ [_DischargeRecord(location='ids', user='bob')])
+ self.assertIsNotNone(auth_info)
+ self.assertEqual(auth_info.identity.id(), 'bob')
+ self.assertEqual(len(auth_info.macaroons), 1)
+
+ def test_authorize_multiple_ops(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='something', action='read'): {'bob'},
+ macaroonbakery.Op(entity='otherthing', action='read'): {'bob'}
+ }
+ )
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ client.do(ctx, ts, [
+ macaroonbakery.Op(entity='something', action='read'),
+ macaroonbakery.Op(entity='otherthing', action='read')
+ ])
+ self.assertEqual(self._discharges,
+ [_DischargeRecord(location='ids', user='bob')])
+
+ def test_capability(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {macaroonbakery.Op(entity='something', action='read'): {'bob'}})
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m = client.discharged_capability(
+ ctx, ts, [macaroonbakery.Op(entity='something', action='read')])
+ # Check that we can exercise the capability directly on the service
+ # with no discharging required.
+ auth_info = ts.do(test_context, [m],
+ [macaroonbakery.Op(entity='something',
+ action='read')])
+ self.assertIsNotNone(auth_info)
+ self.assertIsNone(auth_info.identity)
+ self.assertEqual(len(auth_info.macaroons), 1)
+ self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
+ m[0].identifier_bytes)
+
+ def test_capability_multiple_entities(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e3', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m = client.discharged_capability(ctx, ts, [
+ macaroonbakery.Op(entity='e1',
+ action='read'),
+ macaroonbakery.Op(entity='e2',
+ action='read'),
+ macaroonbakery.Op(entity='e3',
+ action='read')])
+ self.assertEqual(self._discharges,
+ [_DischargeRecord(location='ids', user='bob')])
+
+ # Check that we can exercise the capability directly on the service
+ # with no discharging required.
+ ts.do(test_context, [m], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')])
+
+ # Check that we can exercise the capability to act on a subset of
+ # the operations.
+ ts.do(test_context, [m], [
+ macaroonbakery.Op(entity='e2', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')]
+ )
+ ts.do(test_context, [m],
+ [macaroonbakery.Op(entity='e3', action='read')])
+
+ def test_multiple_capabilities(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire two capabilities as different users and check
+ # that we can combine them together to do both operations
+ # at once.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m1 = _Client(locator).discharged_capability(ctx, ts,
+ [macaroonbakery.Op(
+ entity='e1',
+ action='read')])
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m2 = _Client(locator).discharged_capability(ctx, ts,
+ [macaroonbakery.Op(
+ entity='e2',
+ action='read')])
+ self.assertEqual(self._discharges,
+ [
+ _DischargeRecord(location='ids', user='alice'),
+ _DischargeRecord(location='ids', user='bob'),
+ ])
+ auth_info = ts.do(test_context, [m1, m2], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+ self.assertIsNotNone(auth_info)
+ self.assertIsNone(auth_info.identity)
+ self.assertEqual(len(auth_info.macaroons), 2)
+ self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
+ m1[0].identifier_bytes)
+ self.assertEqual(auth_info.macaroons[1][0].identifier_bytes,
+ m2[0].identifier_bytes)
+
+ def test_combine_capabilities(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e3', action='read'): {'bob',
+ 'alice'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire two capabilities as different users and check
+ # that we can combine them together into a single capability
+ # capable of both operations.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m1 = _Client(locator).discharged_capability(
+ ctx, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')])
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m2 = _Client(locator).discharged_capability(
+ ctx, ts, [macaroonbakery.Op(entity='e2', action='read')])
+
+ m = ts.capability(test_context, [m1, m2], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')])
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')])
+
+ def test_partially_authorized_request(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire a capability for e1 but rely on authentication to
+ # authorize e2.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m = _Client(locator).discharged_capability(ctx, ts,
+ [macaroonbakery.Op(
+ entity='e1',
+ action='read')])
+ client = _Client(locator)
+ client.add_macaroon(ts, 'authz', m)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ client.discharged_capability(
+ ctx, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ def test_auth_with_third_party_caveats(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+
+ # We make an authorizer that requires a third party discharge
+ # when authorizing.
+ def authorize_with_tp_discharge(ctx, id, op):
+ if (id is not None and id.id() == 'bob' and
+ op == macaroonbakery.Op(entity='something',
+ action='read')):
+ return True, [checkers.Caveat(condition='question',
+ location='other third party')]
+ return False, None
+
+ auth = macaroonbakery.AuthorizerFunc(authorize_with_tp_discharge)
+ ts = _Service('myservice', auth, ids, locator)
+
+ class _LocalDischargeChecker(macaroonbakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(_, ctx, info):
+ if info.condition != 'question':
+ raise ValueError('third party condition not recognized')
+ self._discharges.append(_DischargeRecord(
+ location='other third party',
+ user=ctx.get(_DISCHARGE_USER_KEY)
+ ))
+ return []
+
+ locator['other third party'] = _Discharger(
+ key=macaroonbakery.generate_key(),
+ checker=_LocalDischargeChecker(),
+ locator=locator,
+ )
+ client = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ client.do(ctx, ts, [macaroonbakery.Op(entity='something',
+ action='read')])
+ self.assertEqual(self._discharges, [
+ _DischargeRecord(location='ids', user='bob'),
+ _DischargeRecord(location='other third party',
+ user='bob')
+ ])
+
+ def test_capability_combines_first_party_caveats(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'}
+ }
+ )
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire two capabilities as different users, add some first party
+ # caveats that we can combine them together into a single capability
+ # capable of both operations.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m1 = _Client(locator).capability(
+ ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ m1.macaroon.add_first_party_caveat('true 1')
+ m1.macaroon.add_first_party_caveat('true 2')
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m2 = _Client(locator).capability(
+ ctx, ts, [macaroonbakery.Op(entity='e2', action='read')])
+ m2.macaroon.add_first_party_caveat('true 3')
+ m2.macaroon.add_first_party_caveat('true 4')
+
+ client = _Client(locator)
+ client.add_macaroon(ts, 'authz1', [m1.macaroon])
+ client.add_macaroon(ts, 'authz2', [m2.macaroon])
+
+ m = client.capability(test_context, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+ self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [
+ 'true 1',
+ 'true 2',
+ 'true 3',
+ 'true 4',
+ ])
+
+ def test_first_party_caveat_squashing(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'alice'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+ tests = [
+ ('duplicates removed', [
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.Caveat(condition='true 2', namespace='testns'),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.Caveat(condition='true 3', namespace='testns'),
+ ], [
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.Caveat(condition='true 2', namespace='testns'),
+ checkers.Caveat(condition='true 3', namespace='testns'),
+ ]), ('earliest time before', [
+ checkers.time_before_caveat(epoch + timedelta(days=1)),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.time_before_caveat(
+ epoch + timedelta(days=0, hours=1)),
+ checkers.time_before_caveat(epoch + timedelta(
+ days=0, hours=0, minutes=5)),
+ ], [
+ checkers.time_before_caveat(epoch + timedelta(
+ days=0, hours=0, minutes=5)),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ ]), ('operations and declared caveats removed', [
+ checkers.deny_caveat(['foo']),
+ checkers.allow_caveat(['read', 'write']),
+ checkers.declared_caveat('username', 'bob'),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ ], [
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ ])
+ ]
+ for test in tests:
+ print(test[0])
+
+ # Make a first macaroon with all the required first party caveats.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m1 = _Client(locator).capability(
+ ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ m1.add_caveats(test[1], None, None)
+
+ # Make a second macaroon that's not used to check that it's
+ # caveats are not added.
+ m2 = _Client(locator).capability(
+ ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ m2.add_caveat(checkers.Caveat(
+ condition='true notused', namespace='testns'), None, None)
+ client = _Client(locator)
+ client.add_macaroon(ts, 'authz1', [m1.macaroon])
+ client.add_macaroon(ts, 'authz2', [m2.macaroon])
+
+ m3 = client.capability(
+ test_context, ts, [macaroonbakery.Op(entity='e1',
+ action='read')])
+ self.assertEqual(
+ _macaroon_conditions(m3.macaroon.caveats, False),
+ _resolve_caveats(m3.namespace, test[2]))
+
+ def test_login_only(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = macaroonbakery.ClosedAuthorizer()
+ ts = _Service('myservice', auth, ids, locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ self.assertIsNotNone(auth_info)
+ self.assertEqual(auth_info.identity.id(), 'bob')
+
+ def test_allow_any(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire a capability for e1 but rely on authentication to
+ # authorize e2.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m = _Client(locator).discharged_capability(ctx, ts,
+ [macaroonbakery.Op(
+ entity='e1',
+ action='read')])
+
+ client = _Client(locator)
+ client.add_macaroon(ts, 'authz', m)
+
+ self._discharges = []
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ with self.assertRaises(_DischargeRequiredError):
+ client.do_any(
+ ctx, ts, [
+ macaroonbakery.LOGIN_OP,
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e1', action='read')
+ ]
+ )
+ self.assertEqual(len(self._discharges), 0)
+
+ # Log in as bob.
+ _, err = client.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+
+ # All the previous actions should now be allowed.
+ auth_info, allowed = client.do_any(
+ ctx, ts, [
+ macaroonbakery.LOGIN_OP,
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e1', action='read')
+ ]
+ )
+ self.assertEqual(auth_info.identity.id(), 'bob')
+ self.assertEqual(len(auth_info.macaroons), 2)
+ self.assertEqual(allowed, [True, True, True])
+
+ def test_auth_with_identity_from_context(self):
+ locator = _DischargerLocator()
+ ids = _BasicAuthIdService()
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'sherlock'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Check that we can perform the ops with basic auth in the
+ # context.
+ ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
+ auth_info = _Client(locator).do(
+ ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ self.assertEqual(auth_info.identity.id(), 'sherlock')
+ self.assertEqual(len(auth_info.macaroons), 0)
+
+ def test_auth_login_op_with_identity_from_context(self):
+ locator = _DischargerLocator()
+ ids = _BasicAuthIdService()
+ ts = _Service('myservice', macaroonbakery.ClosedAuthorizer(),
+ ids, locator)
+
+ # Check that we can use LoginOp
+ # when auth isn't granted through macaroons.
+ ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
+ auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'sherlock')
+ self.assertEqual(len(auth_info.macaroons), 0)
+
+ def test_operation_allow_caveat(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e1', action='write'): {'bob'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m = client.capability(
+ ctx, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e1', action='write'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ # Sanity check that we can do a write.
+ ts.do(test_context, [[m.macaroon]],
+ [macaroonbakery.Op(entity='e1', action='write')])
+
+ m.add_caveat(checkers.allow_caveat(['read']), None, None)
+
+ # A read operation should work.
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ # A write operation should fail
+ # even though the original macaroon allowed it.
+ with self.assertRaises(_DischargeRequiredError):
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='write')])
+
+ def test_operation_deny_caveat(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e1', action='write'): {'bob'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m = client.capability(
+ ctx, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e1', action='write'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ # Sanity check that we can do a write.
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='write')])
+
+ m.add_caveat(checkers.deny_caveat(['write']), None, None)
+
+ # A read operation should work.
+ ts.do(
+ test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ # A write operation should fail
+ # even though the original macaroon allowed it.
+ with self.assertRaises(_DischargeRequiredError):
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='write')])
+
+ def test_duplicate_login_macaroons(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = macaroonbakery.ClosedAuthorizer()
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire a login macaroon for bob.
+ client1 = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ auth_info = client1.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'bob')
+
+ # Acquire a login macaroon for alice.
+ client2 = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ auth_info = client2.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'alice')
+
+ # Combine the two login macaroons into one client.
+ client3 = _Client(locator)
+ client3.add_macaroon(ts, '1.bob',
+ client1._macaroons[ts.name()]['authn'])
+ client3.add_macaroon(ts, '2.alice',
+ client2._macaroons[ts.name()]['authn'])
+
+ # We should authenticate as bob (because macaroons are presented
+ # ordered by "cookie" name)
+ auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'bob')
+ self.assertEqual(len(auth_info.macaroons), 1)
+
+ # Try them the other way around and we should authenticate as alice.
+ client3 = _Client(locator)
+ client3.add_macaroon(ts, '1.alice',
+ client2._macaroons[ts.name()]['authn'])
+ client3.add_macaroon(ts, '2.bob',
+ client1._macaroons[ts.name()]['authn'])
+
+ auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'alice')
+ self.assertEqual(len(auth_info.macaroons), 1)
+
+ def test_macaroon_ops_fatal_error(self):
+ # When we get a non-VerificationError error from the
+ # opstore, we don't do any more verification.
+ checker = macaroonbakery.Checker(
+ macaroon_opstore=_MacaroonStoreWithError())
+ m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2)
+ with self.assertRaises(ValueError):
+ checker.auth([m]).allow(test_context, [macaroonbakery.LOGIN_OP])
+
+
+class _DischargerLocator(object):
+ def __init__(self, dischargers=None):
+ if dischargers is None:
+ dischargers = {}
+ self._dischargers = dischargers
+
+ def third_party_info(self, loc):
+ d = self._dischargers.get(loc)
+ if d is None:
+ return None
+ return macaroonbakery.ThirdPartyInfo(
+ public_key=d._key.public_key,
+ version=macaroonbakery.LATEST_BAKERY_VERSION,
+ )
+
+ def __setitem__(self, key, item):
+ self._dischargers[key] = item
+
+ def __getitem__(self, key):
+ return self._dischargers[key]
+
+ def get(self, key):
+ return self._dischargers.get(key)
+
+
+class _IdService(macaroonbakery.IdentityClient,
+ macaroonbakery.ThirdPartyCaveatChecker):
+ def __init__(self, location, locator, test_class):
+ self._location = location
+ self._test = test_class
+ key = macaroonbakery.generate_key()
+ self._discharger = _Discharger(key=key, checker=self, locator=locator)
+ locator[location] = self._discharger
+
+ def check_third_party_caveat(self, ctx, info):
+ if info.condition != 'is-authenticated-user':
+ raise macaroonbakery.CaveatNotRecognizedError(
+ 'third party condition not '
+ 'recognized')
+
+ username = ctx.get(_DISCHARGE_USER_KEY, '')
+ if username == '':
+ return macaroonbakery.ThirdPartyCaveatCheckFailed(
+ 'no current user')
+ self._test._discharges.append(
+ _DischargeRecord(location=self._location, user=username))
+ return [checkers.declared_caveat('username', username)]
+
+ def identity_from_context(self, ctx):
+ return None, [checkers.Caveat(location=self._location,
+ condition='is-authenticated-user')]
+
+ def declared_identity(self, ctx, declared):
+ user = declared.get('username')
+ if user is None:
+ raise macaroonbakery.IdentityError('no username declared')
+ return macaroonbakery.SimpleIdentity(user)
+
+
+_DISCHARGE_USER_KEY = checkers.ContextKey('user-key')
+
+_DischargeRecord = namedtuple('_DISCHARGE_RECORD', ['location', 'user'])
+
+
+class _Discharger(object):
+ ''' utility class that has a discharge function with the same signature of
+ get_discharge for discharge_all.
+ '''
+
+ def __init__(self, key, locator, checker):
+ self._key = key
+ self._locator = locator
+ self._checker = checker
+
+ def discharge(self, ctx, cav, payload):
+ return macaroonbakery.discharge(ctx, key=self._key, id=cav.caveat_id,
+ caveat=payload,
+ checker=self._checker,
+ locator=self._locator)
+
+
+class _OpAuthorizer(macaroonbakery.Authorizer):
+ '''Implements bakery.Authorizer by looking the operation
+ up in the given map. If the username is in the associated list
+ or the list contains "everyone", authorization is granted.
+ '''
+
+ def __init__(self, auth=None):
+ if auth is None:
+ auth = {}
+ self._auth = auth
+
+ def authorize(self, ctx, id, ops):
+ return macaroonbakery.ACLAuthorizer(
+ allow_public=True,
+ get_acl=lambda ctx, op: self._auth.get(op, [])).authorize(
+ ctx, id, ops)
+
+
+class _MacaroonStore(object):
+ ''' Stores root keys in memory and puts all operations in the macaroon id.
+ '''
+
+ def __init__(self, key, locator):
+ self._root_key_store = macaroonbakery.MemoryKeyStore()
+ self._key = key
+ self._locator = locator
+
+ def new_macaroon(self, caveats, namespace, ops):
+ root_key, id = self._root_key_store.root_key()
+ m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops}
+ data = json.dumps(m_id)
+ m = macaroonbakery.Macaroon(
+ root_key=root_key, id=data, location='',
+ version=macaroonbakery.LATEST_BAKERY_VERSION,
+ namespace=namespace)
+ m.add_caveats(caveats, self._key, self._locator)
+ return m
+
+ def macaroon_ops(self, ms):
+ if len(ms) == 0:
+ raise ValueError('no macaroons provided')
+
+ m_id = json.loads(ms[0].identifier_bytes.decode('utf-8'))
+ root_key = self._root_key_store.get(
+ base64.urlsafe_b64decode(m_id['id'].encode('utf-8')))
+
+ v = Verifier()
+
+ class NoValidationOnFirstPartyCaveat(FirstPartyCaveatVerifierDelegate):
+ def verify_first_party_caveat(self, verifier, caveat, signature):
+ return True
+
+ v.first_party_caveat_verifier_delegate = \
+ NoValidationOnFirstPartyCaveat()
+ ok = v.verify(macaroon=ms[0], key=root_key,
+ discharge_macaroons=ms[1:])
+ if not ok:
+ raise macaroonbakery.VerificationError('invalid signature')
+ conditions = []
+ for m in ms:
+ cavs = m.first_party_caveats()
+ for cav in cavs:
+ conditions.append(cav.caveat_id_bytes.decode('utf-8'))
+ ops = []
+ for op in m_id['ops']:
+ ops.append(macaroonbakery.Op(entity=op[0], action=op[1]))
+ return ops, conditions
+
+
+class _Service(object):
+ '''Represents a service that requires authorization.
+
+ Clients can make requests to the service to perform operations
+ and may receive a macaroon to discharge if the authorization
+ process requires it.
+ '''
+
+ def __init__(self, name, auth, idm, locator):
+ self._name = name
+ self._store = _MacaroonStore(macaroonbakery.generate_key(), locator)
+ self._checker = macaroonbakery.Checker(
+ checker=test_checker(),
+ authorizer=auth,
+ identity_client=idm,
+ macaroon_opstore=self._store)
+
+ def name(self):
+ return self._name
+
+ def do(self, ctx, ms, ops):
+ try:
+ authInfo = self._checker.auth(ms).allow(ctx, ops)
+ except macaroonbakery.DischargeRequiredError as exc:
+ self._discharge_required_error(exc)
+ return authInfo
+
+ def do_any(self, ctx, ms, ops):
+ # makes a request to the service to perform any of the given
+ # operations. It reports which operations have succeeded.
+ try:
+ authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops)
+ return authInfo, allowed
+ except macaroonbakery.DischargeRequiredError as exc:
+ self._discharge_required_error(exc)
+
+ def capability(self, ctx, ms, ops):
+ try:
+ conds = self._checker.auth(ms).allow_capability(ctx, ops)
+ except macaroonbakery.DischargeRequiredError as exc:
+ self._discharge_required_error(exc)
+
+ m = self._store.new_macaroon(None, self._checker.namespace(), ops)
+ for cond in conds:
+ m.macaroon.add_first_party_caveat(cond)
+ return m
+
+ def _discharge_required_error(self, err):
+ m = self._store.new_macaroon(err.cavs(), self._checker.namespace(),
+ err.ops())
+ name = 'authz'
+ if len(err.ops()) == 1 and err.ops()[0] == macaroonbakery.LOGIN_OP:
+ name = 'authn'
+ raise _DischargeRequiredError(name=name, m=m)
+
+
+class _DischargeRequiredError(Exception):
+ def __init__(self, name, m):
+ Exception.__init__(self, 'discharge required')
+ self._name = name
+ self._m = m
+
+ def m(self):
+ return self._m
+
+ def name(self):
+ return self._name
+
+
+class _Client(object):
+ max_retries = 3
+
+ def __init__(self, dischargers):
+ self._key = macaroonbakery.generate_key()
+ self._macaroons = {}
+ self._dischargers = dischargers
+
+ def do(self, ctx, svc, ops):
+ class _AuthInfo:
+ authInfo = None
+
+ def svc_do(ms):
+ _AuthInfo.authInfo = svc.do(ctx, ms, ops)
+
+ self._do_func(ctx, svc, svc_do)
+ return _AuthInfo.authInfo
+
+ def do_any(self, ctx, svc, ops):
+ return svc.do_any(ctx, self._request_macaroons(svc), ops)
+
+ def capability(self, ctx, svc, ops):
+ # capability returns a capability macaroon for the given operations.
+
+ class _M:
+ m = None
+
+ def svc_capability(ms):
+ _M.m = svc.capability(ctx, ms, ops)
+ return
+
+ self._do_func(ctx, svc, svc_capability)
+ return _M.m
+
+ def discharged_capability(self, ctx, svc, ops):
+ m = self.capability(ctx, svc, ops)
+ return self._discharge_all(ctx, m)
+
+ def _do_func(self, ctx, svc, f):
+ for i in range(0, self.max_retries):
+ try:
+ f(self._request_macaroons(svc))
+ return
+ except _DischargeRequiredError as exc:
+ ms = self._discharge_all(ctx, exc.m())
+ self.add_macaroon(svc, exc.name(), ms)
+ raise ValueError('discharge failed too many times')
+
+ def _clear_macaroons(self, svc):
+ if svc is None:
+ self._macaroons = {}
+ return
+ if svc.name() in self._macaroons:
+ del self._macaroons[svc.name()]
+
+ def add_macaroon(self, svc, name, m):
+ if svc.name() not in self._macaroons:
+ self._macaroons[svc.name()] = {}
+ self._macaroons[svc.name()][name] = m
+
+ def _request_macaroons(self, svc):
+ mmap = self._macaroons.get(svc.name(), [])
+ # Put all the macaroons in the slice ordered by key
+ # so that we have deterministic behaviour in the tests.
+ names = []
+ for name in mmap:
+ names.append(name)
+ names = sorted(names)
+ ms = [None] * len(names)
+ for i, name in enumerate(names):
+ ms[i] = mmap[name]
+ return ms
+
+ def _discharge_all(self, ctx, m):
+ def get_discharge(ctx, cav, pay_load):
+ d = self._dischargers.get(cav.location)
+ if d is None:
+ raise ValueError('third party discharger '
+ '{} not found'.format(cav.location))
+ return d.discharge(ctx, cav, pay_load)
+
+ return macaroonbakery.discharge_all(ctx, m, get_discharge)
+
+
+class _BasicAuthIdService(macaroonbakery.IdentityClient):
+ def identity_from_context(self, ctx):
+ user, pwd = _basic_auth_from_context(ctx)
+ if user != 'sherlock' or pwd != 'holmes':
+ return None, None
+ return macaroonbakery.SimpleIdentity(user), None
+
+ def declared_identity(self, ctx, declared):
+ raise macaroonbakery.IdentityError('no identity declarations in basic '
+ 'auth id service')
+
+
+_BASIC_AUTH_KEY = checkers.ContextKey('user-key')
+
+
+class _BasicAuth(object):
+ def __init__(self, user, password):
+ self.user = user
+ self.password = password
+
+
+def _context_with_basic_auth(ctx, user, password):
+ return ctx.with_value(_BASIC_AUTH_KEY, _BasicAuth(user, password))
+
+
+def _basic_auth_from_context(ctx):
+ auth = ctx.get(_BASIC_AUTH_KEY, _BasicAuth('', ''))
+ return auth.user, auth.password
+
+
+def _macaroon_conditions(caveats, allow_third):
+ conds = [''] * len(caveats)
+ for i, cav in enumerate(caveats):
+ if cav.location is not None and cav.location != '':
+ if not allow_third:
+ raise ValueError('found unexpected third party caveat:'
+ ' {}'.format(cav.location))
+ continue
+ conds[i] = cav.caveat_id.decode('utf-8')
+ return conds
+
+
+def _resolve_caveats(ns, caveats):
+ conds = [''] * len(caveats)
+ for i, cav in enumerate(caveats):
+ if cav.location is not None and cav.location != '':
+ raise ValueError('found unexpected third party caveat')
+ conds[i] = ns.resolve_caveat(cav).condition
+ return conds
+
+
+class _MacaroonStoreWithError(object):
+ def new_macaroon(self, caveats, ns, ops):
+ raise ValueError('some error')
+
+ def macaroon_ops(self, ms):
+ raise ValueError('some error')