summaryrefslogtreecommitdiff
path: root/macaroonbakery/tests
diff options
context:
space:
mode:
authorColin Watson <cjwatson@debian.org>2017-11-03 12:13:13 +0000
committerColin Watson <cjwatson@debian.org>2017-11-03 12:13:13 +0000
commit3d9eaeb5dacee168a93da090e2c0d46eedbe51a2 (patch)
tree779d797fb3cf6cc9552cb08c40662b5d3d8397fd /macaroonbakery/tests
parent79ff2842fa477ee0693ea167c0a74cd7cf080d27 (diff)
Import py-macaroon-bakery_0.0.4.orig.tar.gz
Diffstat (limited to 'macaroonbakery/tests')
-rw-r--r--macaroonbakery/tests/__init__.py2
-rw-r--r--macaroonbakery/tests/common.py120
-rw-r--r--macaroonbakery/tests/test_agent.py13
-rw-r--r--macaroonbakery/tests/test_authorizer.py132
-rw-r--r--macaroonbakery/tests/test_checker.py963
-rw-r--r--macaroonbakery/tests/test_checkers.py356
-rw-r--r--macaroonbakery/tests/test_codec.py164
-rw-r--r--macaroonbakery/tests/test_discharge.py445
-rw-r--r--macaroonbakery/tests/test_discharge_all.py170
-rw-r--r--macaroonbakery/tests/test_keyring.py111
-rw-r--r--macaroonbakery/tests/test_macaroon.py230
-rw-r--r--macaroonbakery/tests/test_namespace.py15
-rw-r--r--macaroonbakery/tests/test_oven.py125
-rw-r--r--macaroonbakery/tests/test_store.py21
14 files changed, 2731 insertions, 136 deletions
diff --git a/macaroonbakery/tests/__init__.py b/macaroonbakery/tests/__init__.py
index e69de29..46812ee 100644
--- a/macaroonbakery/tests/__init__.py
+++ b/macaroonbakery/tests/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
diff --git a/macaroonbakery/tests/common.py b/macaroonbakery/tests/common.py
new file mode 100644
index 0000000..2619127
--- /dev/null
+++ b/macaroonbakery/tests/common.py
@@ -0,0 +1,120 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+from datetime import datetime, timedelta
+
+import pytz
+
+import macaroonbakery
+import macaroonbakery.checkers as checkers
+
+
+class _StoppedClock(object):
+ def __init__(self, t):
+ self.t = t
+
+ def utcnow(self):
+ return self.t
+
+
+epoch = pytz.utc.localize(
+ datetime(year=1900, month=11, day=17, hour=19, minute=00, second=13))
+ages = epoch + timedelta(days=1)
+
+test_context = checkers.context_with_clock(checkers.AuthContext(),
+ _StoppedClock(epoch))
+
+
+def test_checker():
+ c = checkers.Checker()
+ c.namespace().register('testns', '')
+ c.register('str', 'testns', str_check)
+ c.register('true', 'testns', true_check)
+ return c
+
+
+_str_key = checkers.ContextKey('str_check')
+
+
+def str_context(s):
+ return test_context.with_value(_str_key, s)
+
+
+def str_check(ctx, cond, args):
+ expect = ctx[_str_key]
+ if args != expect:
+ return '{} doesn\'t match {}'.format(cond, expect)
+ return None
+
+
+def true_check(ctx, cond, args):
+ # Always succeeds.
+ return None
+
+
+class OneIdentity(macaroonbakery.IdentityClient):
+ '''An IdentityClient implementation that always returns a single identity
+ from declared_identity, allowing allow(LOGIN_OP) to work even when there
+ are no declaration caveats (this is mostly to support the legacy tests
+ which do their own checking of declaration caveats).
+ '''
+
+ def identity_from_context(self, ctx):
+ return None, None
+
+ def declared_identity(self, ctx, declared):
+ return _NoOne()
+
+
+class _NoOne(object):
+ def id(self):
+ return 'noone'
+
+ def domain(self):
+ return ''
+
+
+class ThirdPartyStrcmpChecker(macaroonbakery.ThirdPartyCaveatChecker):
+ def __init__(self, str):
+ self.str = str
+
+ def check_third_party_caveat(self, ctx, cav_info):
+ condition = cav_info.condition
+ if isinstance(cav_info.condition, bytes):
+ condition = cav_info.condition.decode('utf-8')
+ if condition != self.str:
+ raise macaroonbakery.ThirdPartyCaveatCheckFailed(
+ '{} doesn\'t match {}'.format(condition, self.str))
+ return []
+
+
+class ThirdPartyCheckerWithCaveats(macaroonbakery.ThirdPartyCaveatChecker):
+ def __init__(self, cavs=None):
+ if cavs is None:
+ cavs = []
+ self.cavs = cavs
+
+ def check_third_party_caveat(self, ctx, cav_info):
+ return self.cavs
+
+
+class ThirdPartyCaveatCheckerEmpty(macaroonbakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(self, ctx, cav_info):
+ return []
+
+
+def new_bakery(location, locator=None):
+ # Returns a new Bakery instance using a new
+ # key pair, and registers the key with the given locator if provided.
+ #
+ # It uses test_checker to check first party caveats.
+ key = macaroonbakery.generate_key()
+ if locator is not None:
+ locator.add_info(location,
+ macaroonbakery.ThirdPartyInfo(
+ public_key=key.public_key,
+ version=macaroonbakery.LATEST_BAKERY_VERSION))
+ return macaroonbakery.Bakery(key=key,
+ checker=test_checker(),
+ location=location,
+ identity_client=OneIdentity(),
+ locator=locator)
diff --git a/macaroonbakery/tests/test_agent.py b/macaroonbakery/tests/test_agent.py
index 86133fe..49134f5 100644
--- a/macaroonbakery/tests/test_agent.py
+++ b/macaroonbakery/tests/test_agent.py
@@ -1,6 +1,5 @@
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
-
import base64
import json
import os
@@ -99,7 +98,7 @@ class TestAgents(TestCase):
agent.load_agent_file(self.no_username_agent_filename)
-agent_file = """
+agent_file = '''
{
"key": {
"public": "YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=",
@@ -113,10 +112,10 @@ agent_file = """
"username": "user-2"
}]
}
-"""
+'''
-bad_key_agent_file = """
+bad_key_agent_file = '''
{
"key": {
"public": "YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=",
@@ -130,10 +129,10 @@ bad_key_agent_file = """
"username": "user-2"
}]
}
-"""
+'''
-no_username_agent_file = """
+no_username_agent_file = '''
{
"key": {
"public": "YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=",
@@ -146,4 +145,4 @@ no_username_agent_file = """
"username": "user-2"
}]
}
-"""
+'''
diff --git a/macaroonbakery/tests/test_authorizer.py b/macaroonbakery/tests/test_authorizer.py
new file mode 100644
index 0000000..da01974
--- /dev/null
+++ b/macaroonbakery/tests/test_authorizer.py
@@ -0,0 +1,132 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+from unittest import TestCase
+
+import macaroonbakery
+import macaroonbakery.checkers as checkers
+
+
+class TestAuthorizer(TestCase):
+ def test_authorize_func(self):
+ def f(ctx, identity, op):
+ self.assertEqual(identity.id(), 'bob')
+ if op.entity == 'a':
+ return False, None
+ elif op.entity == 'b':
+ return True, None
+ elif op.entity == 'c':
+ return True, [checkers.Caveat(location='somewhere',
+ condition='c')]
+ elif op.entity == 'd':
+ return True, [checkers.Caveat(location='somewhere',
+ condition='d')]
+ else:
+ self.fail('unexpected entity: ' + op.Entity)
+
+ ops = [macaroonbakery.Op('a', 'x'), macaroonbakery.Op('b', 'x'),
+ macaroonbakery.Op('c', 'x'), macaroonbakery.Op('d', 'x')]
+ allowed, caveats = macaroonbakery.AuthorizerFunc(f).authorize(
+ checkers.AuthContext(),
+ macaroonbakery.SimpleIdentity('bob'),
+ ops
+ )
+ self.assertEqual(allowed, [False, True, True, True])
+ self.assertEqual(caveats, [
+ checkers.Caveat(location='somewhere', condition='c'),
+ checkers.Caveat(location='somewhere', condition='d')
+ ])
+
+ def test_acl_authorizer(self):
+ ctx = checkers.AuthContext()
+ tests = [
+ ('no ops, no problem',
+ macaroonbakery.ACLAuthorizer(allow_public=True,
+ get_acl=lambda x, y: []), None, [],
+ []),
+ ('identity that does not implement ACLIdentity; '
+ 'user should be denied except for everyone group',
+ macaroonbakery.ACLAuthorizer(allow_public=True,
+ get_acl=lambda ctx, op: [
+ macaroonbakery.EVERYONE]
+ if op.entity == 'a' else ['alice']),
+ SimplestIdentity('bob'),
+ [macaroonbakery.Op(entity='a', action='a'),
+ macaroonbakery.Op(entity='b', action='b')],
+ [True, False]),
+ ('identity that does not implement ACLIdentity with user == Id; '
+ 'user should be denied except for everyone group',
+ macaroonbakery.ACLAuthorizer(allow_public=True,
+ get_acl=lambda ctx, op: [
+ macaroonbakery.EVERYONE] if
+ op.entity == 'a' else ['bob']),
+ SimplestIdentity('bob'),
+ [macaroonbakery.Op(entity='a', action='a'),
+ macaroonbakery.Op(entity='b', action='b')],
+ [True, False]),
+ ('permission denied for everyone without AllowPublic',
+ macaroonbakery.ACLAuthorizer(allow_public=False,
+ get_acl=lambda x, y: [
+ macaroonbakery.EVERYONE]),
+ SimplestIdentity('bob'),
+ [macaroonbakery.Op(entity='a', action='a')],
+ [False]),
+ ('permission granted to anyone with no identity with AllowPublic',
+ macaroonbakery.ACLAuthorizer(allow_public=True,
+ get_acl=lambda x, y: [
+ macaroonbakery.EVERYONE]),
+ None,
+ [macaroonbakery.Op(entity='a', action='a')],
+ [True])
+ ]
+ for test in tests:
+ allowed, caveats = test[1].authorize(ctx, test[2], test[3])
+ self.assertEqual(len(caveats), 0)
+ self.assertEqual(allowed, test[4])
+
+ def test_context_wired_properly(self):
+ ctx = checkers.AuthContext({'a': 'aval'})
+
+ class Visited:
+ in_f = False
+ in_allow = False
+ in_get_acl = False
+
+ def f(ctx, identity, op):
+ self.assertEqual(ctx.get('a'), 'aval')
+ Visited.in_f = True
+ return False, None
+
+ macaroonbakery.AuthorizerFunc(f).authorize(
+ ctx, macaroonbakery.SimpleIdentity('bob'), ['op1']
+ )
+ self.assertTrue(Visited.in_f)
+
+ class TestIdentity(SimplestIdentity, macaroonbakery.ACLIdentity):
+ def allow(other, ctx, acls):
+ self.assertEqual(ctx.get('a'), 'aval')
+ Visited.in_allow = True
+ return False
+
+ def get_acl(ctx, acl):
+ self.assertEqual(ctx.get('a'), 'aval')
+ Visited.in_get_acl = True
+ return []
+
+ macaroonbakery.ACLAuthorizer(allow_public=False,
+ get_acl=get_acl).authorize(
+ ctx, TestIdentity('bob'), ['op1'])
+ self.assertTrue(Visited.in_get_acl)
+ self.assertTrue(Visited.in_allow)
+
+
+class SimplestIdentity(macaroonbakery.Identity):
+ # SimplestIdentity implements Identity for a string. Unlike
+ # SimpleIdentity, it does not implement ACLIdentity.
+ def __init__(self, user):
+ self._identity = user
+
+ def domain(self):
+ return ''
+
+ def id(self):
+ return self._identity
diff --git a/macaroonbakery/tests/test_checker.py b/macaroonbakery/tests/test_checker.py
new file mode 100644
index 0000000..06bf008
--- /dev/null
+++ b/macaroonbakery/tests/test_checker.py
@@ -0,0 +1,963 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+import base64
+from collections import namedtuple
+import json
+from unittest import TestCase
+from datetime import timedelta
+
+from pymacaroons.verifier import Verifier, FirstPartyCaveatVerifierDelegate
+import pymacaroons
+
+import macaroonbakery
+import macaroonbakery.checkers as checkers
+from macaroonbakery.tests.common import test_context, epoch, test_checker
+
+
+class TestChecker(TestCase):
+ def setUp(self):
+ self._discharges = []
+
+ def test_authorize_with_open_access_and_no_macaroons(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {macaroonbakery.Op(entity='something', action='read'):
+ {macaroonbakery.EVERYONE}})
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+ auth_info = client.do(test_context, ts,
+ [macaroonbakery.Op(entity='something',
+ action='read')])
+ self.assertEqual(len(self._discharges), 0)
+ self.assertIsNotNone(auth_info)
+ self.assertIsNone(auth_info.identity)
+ self.assertEqual(len(auth_info.macaroons), 0)
+
+ def test_authorization_denied(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = macaroonbakery.ClosedAuthorizer()
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ with self.assertRaises(macaroonbakery.PermissionDenied):
+ client.do(ctx, ts, [macaroonbakery.Op(entity='something',
+ action='read')])
+
+ def test_authorize_with_authentication_required(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {macaroonbakery.Op(entity='something', action='read'): {'bob'}})
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ auth_info = client.do(ctx, ts, [macaroonbakery.Op(entity='something',
+ action='read')])
+ self.assertEqual(self._discharges,
+ [_DischargeRecord(location='ids', user='bob')])
+ self.assertIsNotNone(auth_info)
+ self.assertEqual(auth_info.identity.id(), 'bob')
+ self.assertEqual(len(auth_info.macaroons), 1)
+
+ def test_authorize_multiple_ops(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='something', action='read'): {'bob'},
+ macaroonbakery.Op(entity='otherthing', action='read'): {'bob'}
+ }
+ )
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ client.do(ctx, ts, [
+ macaroonbakery.Op(entity='something', action='read'),
+ macaroonbakery.Op(entity='otherthing', action='read')
+ ])
+ self.assertEqual(self._discharges,
+ [_DischargeRecord(location='ids', user='bob')])
+
+ def test_capability(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {macaroonbakery.Op(entity='something', action='read'): {'bob'}})
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m = client.discharged_capability(
+ ctx, ts, [macaroonbakery.Op(entity='something', action='read')])
+ # Check that we can exercise the capability directly on the service
+ # with no discharging required.
+ auth_info = ts.do(test_context, [m],
+ [macaroonbakery.Op(entity='something',
+ action='read')])
+ self.assertIsNotNone(auth_info)
+ self.assertIsNone(auth_info.identity)
+ self.assertEqual(len(auth_info.macaroons), 1)
+ self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
+ m[0].identifier_bytes)
+
+ def test_capability_multiple_entities(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e3', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m = client.discharged_capability(ctx, ts, [
+ macaroonbakery.Op(entity='e1',
+ action='read'),
+ macaroonbakery.Op(entity='e2',
+ action='read'),
+ macaroonbakery.Op(entity='e3',
+ action='read')])
+ self.assertEqual(self._discharges,
+ [_DischargeRecord(location='ids', user='bob')])
+
+ # Check that we can exercise the capability directly on the service
+ # with no discharging required.
+ ts.do(test_context, [m], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')])
+
+ # Check that we can exercise the capability to act on a subset of
+ # the operations.
+ ts.do(test_context, [m], [
+ macaroonbakery.Op(entity='e2', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')]
+ )
+ ts.do(test_context, [m],
+ [macaroonbakery.Op(entity='e3', action='read')])
+
+ def test_multiple_capabilities(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire two capabilities as different users and check
+ # that we can combine them together to do both operations
+ # at once.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m1 = _Client(locator).discharged_capability(ctx, ts,
+ [macaroonbakery.Op(
+ entity='e1',
+ action='read')])
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m2 = _Client(locator).discharged_capability(ctx, ts,
+ [macaroonbakery.Op(
+ entity='e2',
+ action='read')])
+ self.assertEqual(self._discharges,
+ [
+ _DischargeRecord(location='ids', user='alice'),
+ _DischargeRecord(location='ids', user='bob'),
+ ])
+ auth_info = ts.do(test_context, [m1, m2], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+ self.assertIsNotNone(auth_info)
+ self.assertIsNone(auth_info.identity)
+ self.assertEqual(len(auth_info.macaroons), 2)
+ self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
+ m1[0].identifier_bytes)
+ self.assertEqual(auth_info.macaroons[1][0].identifier_bytes,
+ m2[0].identifier_bytes)
+
+ def test_combine_capabilities(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e3', action='read'): {'bob',
+ 'alice'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire two capabilities as different users and check
+ # that we can combine them together into a single capability
+ # capable of both operations.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m1 = _Client(locator).discharged_capability(
+ ctx, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')])
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m2 = _Client(locator).discharged_capability(
+ ctx, ts, [macaroonbakery.Op(entity='e2', action='read')])
+
+ m = ts.capability(test_context, [m1, m2], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')])
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read'),
+ macaroonbakery.Op(entity='e3', action='read')])
+
+ def test_partially_authorized_request(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire a capability for e1 but rely on authentication to
+ # authorize e2.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m = _Client(locator).discharged_capability(ctx, ts,
+ [macaroonbakery.Op(
+ entity='e1',
+ action='read')])
+ client = _Client(locator)
+ client.add_macaroon(ts, 'authz', m)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ client.discharged_capability(
+ ctx, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ def test_auth_with_third_party_caveats(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+
+ # We make an authorizer that requires a third party discharge
+ # when authorizing.
+ def authorize_with_tp_discharge(ctx, id, op):
+ if (id is not None and id.id() == 'bob' and
+ op == macaroonbakery.Op(entity='something',
+ action='read')):
+ return True, [checkers.Caveat(condition='question',
+ location='other third party')]
+ return False, None
+
+ auth = macaroonbakery.AuthorizerFunc(authorize_with_tp_discharge)
+ ts = _Service('myservice', auth, ids, locator)
+
+ class _LocalDischargeChecker(macaroonbakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(_, ctx, info):
+ if info.condition != 'question':
+ raise ValueError('third party condition not recognized')
+ self._discharges.append(_DischargeRecord(
+ location='other third party',
+ user=ctx.get(_DISCHARGE_USER_KEY)
+ ))
+ return []
+
+ locator['other third party'] = _Discharger(
+ key=macaroonbakery.generate_key(),
+ checker=_LocalDischargeChecker(),
+ locator=locator,
+ )
+ client = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ client.do(ctx, ts, [macaroonbakery.Op(entity='something',
+ action='read')])
+ self.assertEqual(self._discharges, [
+ _DischargeRecord(location='ids', user='bob'),
+ _DischargeRecord(location='other third party',
+ user='bob')
+ ])
+
+ def test_capability_combines_first_party_caveats(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'}
+ }
+ )
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire two capabilities as different users, add some first party
+ # caveats that we can combine them together into a single capability
+ # capable of both operations.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m1 = _Client(locator).capability(
+ ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ m1.macaroon.add_first_party_caveat('true 1')
+ m1.macaroon.add_first_party_caveat('true 2')
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m2 = _Client(locator).capability(
+ ctx, ts, [macaroonbakery.Op(entity='e2', action='read')])
+ m2.macaroon.add_first_party_caveat('true 3')
+ m2.macaroon.add_first_party_caveat('true 4')
+
+ client = _Client(locator)
+ client.add_macaroon(ts, 'authz1', [m1.macaroon])
+ client.add_macaroon(ts, 'authz2', [m2.macaroon])
+
+ m = client.capability(test_context, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+ self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [
+ 'true 1',
+ 'true 2',
+ 'true 3',
+ 'true 4',
+ ])
+
+ def test_first_party_caveat_squashing(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'alice'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+ tests = [
+ ('duplicates removed', [
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.Caveat(condition='true 2', namespace='testns'),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.Caveat(condition='true 3', namespace='testns'),
+ ], [
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.Caveat(condition='true 2', namespace='testns'),
+ checkers.Caveat(condition='true 3', namespace='testns'),
+ ]), ('earliest time before', [
+ checkers.time_before_caveat(epoch + timedelta(days=1)),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ checkers.time_before_caveat(
+ epoch + timedelta(days=0, hours=1)),
+ checkers.time_before_caveat(epoch + timedelta(
+ days=0, hours=0, minutes=5)),
+ ], [
+ checkers.time_before_caveat(epoch + timedelta(
+ days=0, hours=0, minutes=5)),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ ]), ('operations and declared caveats removed', [
+ checkers.deny_caveat(['foo']),
+ checkers.allow_caveat(['read', 'write']),
+ checkers.declared_caveat('username', 'bob'),
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ ], [
+ checkers.Caveat(condition='true 1', namespace='testns'),
+ ])
+ ]
+ for test in tests:
+ print(test[0])
+
+ # Make a first macaroon with all the required first party caveats.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m1 = _Client(locator).capability(
+ ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ m1.add_caveats(test[1], None, None)
+
+ # Make a second macaroon that's not used to check that it's
+ # caveats are not added.
+ m2 = _Client(locator).capability(
+ ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ m2.add_caveat(checkers.Caveat(
+ condition='true notused', namespace='testns'), None, None)
+ client = _Client(locator)
+ client.add_macaroon(ts, 'authz1', [m1.macaroon])
+ client.add_macaroon(ts, 'authz2', [m2.macaroon])
+
+ m3 = client.capability(
+ test_context, ts, [macaroonbakery.Op(entity='e1',
+ action='read')])
+ self.assertEqual(
+ _macaroon_conditions(m3.macaroon.caveats, False),
+ _resolve_caveats(m3.namespace, test[2]))
+
+ def test_login_only(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = macaroonbakery.ClosedAuthorizer()
+ ts = _Service('myservice', auth, ids, locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ self.assertIsNotNone(auth_info)
+ self.assertEqual(auth_info.identity.id(), 'bob')
+
+ def test_allow_any(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'alice'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire a capability for e1 but rely on authentication to
+ # authorize e2.
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ m = _Client(locator).discharged_capability(ctx, ts,
+ [macaroonbakery.Op(
+ entity='e1',
+ action='read')])
+
+ client = _Client(locator)
+ client.add_macaroon(ts, 'authz', m)
+
+ self._discharges = []
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ with self.assertRaises(_DischargeRequiredError):
+ client.do_any(
+ ctx, ts, [
+ macaroonbakery.LOGIN_OP,
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e1', action='read')
+ ]
+ )
+ self.assertEqual(len(self._discharges), 0)
+
+ # Log in as bob.
+ _, err = client.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+
+ # All the previous actions should now be allowed.
+ auth_info, allowed = client.do_any(
+ ctx, ts, [
+ macaroonbakery.LOGIN_OP,
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e1', action='read')
+ ]
+ )
+ self.assertEqual(auth_info.identity.id(), 'bob')
+ self.assertEqual(len(auth_info.macaroons), 2)
+ self.assertEqual(allowed, [True, True, True])
+
+ def test_auth_with_identity_from_context(self):
+ locator = _DischargerLocator()
+ ids = _BasicAuthIdService()
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'sherlock'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Check that we can perform the ops with basic auth in the
+ # context.
+ ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
+ auth_info = _Client(locator).do(
+ ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ self.assertEqual(auth_info.identity.id(), 'sherlock')
+ self.assertEqual(len(auth_info.macaroons), 0)
+
+ def test_auth_login_op_with_identity_from_context(self):
+ locator = _DischargerLocator()
+ ids = _BasicAuthIdService()
+ ts = _Service('myservice', macaroonbakery.ClosedAuthorizer(),
+ ids, locator)
+
+ # Check that we can use LoginOp
+ # when auth isn't granted through macaroons.
+ ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
+ auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'sherlock')
+ self.assertEqual(len(auth_info.macaroons), 0)
+
+ def test_operation_allow_caveat(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e1', action='write'): {'bob'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m = client.capability(
+ ctx, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e1', action='write'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ # Sanity check that we can do a write.
+ ts.do(test_context, [[m.macaroon]],
+ [macaroonbakery.Op(entity='e1', action='write')])
+
+ m.add_caveat(checkers.allow_caveat(['read']), None, None)
+
+ # A read operation should work.
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ # A write operation should fail
+ # even though the original macaroon allowed it.
+ with self.assertRaises(_DischargeRequiredError):
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='write')])
+
+ def test_operation_deny_caveat(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = _OpAuthorizer(
+ {
+ macaroonbakery.Op(entity='e1', action='read'): {'bob'},
+ macaroonbakery.Op(entity='e1', action='write'): {'bob'},
+ macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ })
+ ts = _Service('myservice', auth, ids, locator)
+ client = _Client(locator)
+
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ m = client.capability(
+ ctx, ts, [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e1', action='write'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ # Sanity check that we can do a write.
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='write')])
+
+ m.add_caveat(checkers.deny_caveat(['write']), None, None)
+
+ # A read operation should work.
+ ts.do(
+ test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='read'),
+ macaroonbakery.Op(entity='e2', action='read')])
+
+ # A write operation should fail
+ # even though the original macaroon allowed it.
+ with self.assertRaises(_DischargeRequiredError):
+ ts.do(test_context, [[m.macaroon]], [
+ macaroonbakery.Op(entity='e1', action='write')])
+
+ def test_duplicate_login_macaroons(self):
+ locator = _DischargerLocator()
+ ids = _IdService('ids', locator, self)
+ auth = macaroonbakery.ClosedAuthorizer()
+ ts = _Service('myservice', auth, ids, locator)
+
+ # Acquire a login macaroon for bob.
+ client1 = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
+ auth_info = client1.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'bob')
+
+ # Acquire a login macaroon for alice.
+ client2 = _Client(locator)
+ ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
+ auth_info = client2.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'alice')
+
+ # Combine the two login macaroons into one client.
+ client3 = _Client(locator)
+ client3.add_macaroon(ts, '1.bob',
+ client1._macaroons[ts.name()]['authn'])
+ client3.add_macaroon(ts, '2.alice',
+ client2._macaroons[ts.name()]['authn'])
+
+ # We should authenticate as bob (because macaroons are presented
+ # ordered by "cookie" name)
+ auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'bob')
+ self.assertEqual(len(auth_info.macaroons), 1)
+
+ # Try them the other way around and we should authenticate as alice.
+ client3 = _Client(locator)
+ client3.add_macaroon(ts, '1.alice',
+ client2._macaroons[ts.name()]['authn'])
+ client3.add_macaroon(ts, '2.bob',
+ client1._macaroons[ts.name()]['authn'])
+
+ auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP])
+ self.assertEqual(auth_info.identity.id(), 'alice')
+ self.assertEqual(len(auth_info.macaroons), 1)
+
+ def test_macaroon_ops_fatal_error(self):
+ # When we get a non-VerificationError error from the
+ # opstore, we don't do any more verification.
+ checker = macaroonbakery.Checker(
+ macaroon_opstore=_MacaroonStoreWithError())
+ m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2)
+ with self.assertRaises(ValueError):
+ checker.auth([m]).allow(test_context, [macaroonbakery.LOGIN_OP])
+
+
+class _DischargerLocator(object):
+ def __init__(self, dischargers=None):
+ if dischargers is None:
+ dischargers = {}
+ self._dischargers = dischargers
+
+ def third_party_info(self, loc):
+ d = self._dischargers.get(loc)
+ if d is None:
+ return None
+ return macaroonbakery.ThirdPartyInfo(
+ public_key=d._key.public_key,
+ version=macaroonbakery.LATEST_BAKERY_VERSION,
+ )
+
+ def __setitem__(self, key, item):
+ self._dischargers[key] = item
+
+ def __getitem__(self, key):
+ return self._dischargers[key]
+
+ def get(self, key):
+ return self._dischargers.get(key)
+
+
+class _IdService(macaroonbakery.IdentityClient,
+ macaroonbakery.ThirdPartyCaveatChecker):
+ def __init__(self, location, locator, test_class):
+ self._location = location
+ self._test = test_class
+ key = macaroonbakery.generate_key()
+ self._discharger = _Discharger(key=key, checker=self, locator=locator)
+ locator[location] = self._discharger
+
+ def check_third_party_caveat(self, ctx, info):
+ if info.condition != 'is-authenticated-user':
+ raise macaroonbakery.CaveatNotRecognizedError(
+ 'third party condition not '
+ 'recognized')
+
+ username = ctx.get(_DISCHARGE_USER_KEY, '')
+ if username == '':
+ return macaroonbakery.ThirdPartyCaveatCheckFailed(
+ 'no current user')
+ self._test._discharges.append(
+ _DischargeRecord(location=self._location, user=username))
+ return [checkers.declared_caveat('username', username)]
+
+ def identity_from_context(self, ctx):
+ return None, [checkers.Caveat(location=self._location,
+ condition='is-authenticated-user')]
+
+ def declared_identity(self, ctx, declared):
+ user = declared.get('username')
+ if user is None:
+ raise macaroonbakery.IdentityError('no username declared')
+ return macaroonbakery.SimpleIdentity(user)
+
+
+_DISCHARGE_USER_KEY = checkers.ContextKey('user-key')
+
+_DischargeRecord = namedtuple('_DISCHARGE_RECORD', ['location', 'user'])
+
+
+class _Discharger(object):
+ ''' utility class that has a discharge function with the same signature of
+ get_discharge for discharge_all.
+ '''
+
+ def __init__(self, key, locator, checker):
+ self._key = key
+ self._locator = locator
+ self._checker = checker
+
+ def discharge(self, ctx, cav, payload):
+ return macaroonbakery.discharge(ctx, key=self._key, id=cav.caveat_id,
+ caveat=payload,
+ checker=self._checker,
+ locator=self._locator)
+
+
+class _OpAuthorizer(macaroonbakery.Authorizer):
+ '''Implements bakery.Authorizer by looking the operation
+ up in the given map. If the username is in the associated list
+ or the list contains "everyone", authorization is granted.
+ '''
+
+ def __init__(self, auth=None):
+ if auth is None:
+ auth = {}
+ self._auth = auth
+
+ def authorize(self, ctx, id, ops):
+ return macaroonbakery.ACLAuthorizer(
+ allow_public=True,
+ get_acl=lambda ctx, op: self._auth.get(op, [])).authorize(
+ ctx, id, ops)
+
+
+class _MacaroonStore(object):
+ ''' Stores root keys in memory and puts all operations in the macaroon id.
+ '''
+
+ def __init__(self, key, locator):
+ self._root_key_store = macaroonbakery.MemoryKeyStore()
+ self._key = key
+ self._locator = locator
+
+ def new_macaroon(self, caveats, namespace, ops):
+ root_key, id = self._root_key_store.root_key()
+ m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops}
+ data = json.dumps(m_id)
+ m = macaroonbakery.Macaroon(
+ root_key=root_key, id=data, location='',
+ version=macaroonbakery.LATEST_BAKERY_VERSION,
+ namespace=namespace)
+ m.add_caveats(caveats, self._key, self._locator)
+ return m
+
+ def macaroon_ops(self, ms):
+ if len(ms) == 0:
+ raise ValueError('no macaroons provided')
+
+ m_id = json.loads(ms[0].identifier_bytes.decode('utf-8'))
+ root_key = self._root_key_store.get(
+ base64.urlsafe_b64decode(m_id['id'].encode('utf-8')))
+
+ v = Verifier()
+
+ class NoValidationOnFirstPartyCaveat(FirstPartyCaveatVerifierDelegate):
+ def verify_first_party_caveat(self, verifier, caveat, signature):
+ return True
+
+ v.first_party_caveat_verifier_delegate = \
+ NoValidationOnFirstPartyCaveat()
+ ok = v.verify(macaroon=ms[0], key=root_key,
+ discharge_macaroons=ms[1:])
+ if not ok:
+ raise macaroonbakery.VerificationError('invalid signature')
+ conditions = []
+ for m in ms:
+ cavs = m.first_party_caveats()
+ for cav in cavs:
+ conditions.append(cav.caveat_id_bytes.decode('utf-8'))
+ ops = []
+ for op in m_id['ops']:
+ ops.append(macaroonbakery.Op(entity=op[0], action=op[1]))
+ return ops, conditions
+
+
+class _Service(object):
+ '''Represents a service that requires authorization.
+
+ Clients can make requests to the service to perform operations
+ and may receive a macaroon to discharge if the authorization
+ process requires it.
+ '''
+
+ def __init__(self, name, auth, idm, locator):
+ self._name = name
+ self._store = _MacaroonStore(macaroonbakery.generate_key(), locator)
+ self._checker = macaroonbakery.Checker(
+ checker=test_checker(),
+ authorizer=auth,
+ identity_client=idm,
+ macaroon_opstore=self._store)
+
+ def name(self):
+ return self._name
+
+ def do(self, ctx, ms, ops):
+ try:
+ authInfo = self._checker.auth(ms).allow(ctx, ops)
+ except macaroonbakery.DischargeRequiredError as exc:
+ self._discharge_required_error(exc)
+ return authInfo
+
+ def do_any(self, ctx, ms, ops):
+ # makes a request to the service to perform any of the given
+ # operations. It reports which operations have succeeded.
+ try:
+ authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops)
+ return authInfo, allowed
+ except macaroonbakery.DischargeRequiredError as exc:
+ self._discharge_required_error(exc)
+
+ def capability(self, ctx, ms, ops):
+ try:
+ conds = self._checker.auth(ms).allow_capability(ctx, ops)
+ except macaroonbakery.DischargeRequiredError as exc:
+ self._discharge_required_error(exc)
+
+ m = self._store.new_macaroon(None, self._checker.namespace(), ops)
+ for cond in conds:
+ m.macaroon.add_first_party_caveat(cond)
+ return m
+
+ def _discharge_required_error(self, err):
+ m = self._store.new_macaroon(err.cavs(), self._checker.namespace(),
+ err.ops())
+ name = 'authz'
+ if len(err.ops()) == 1 and err.ops()[0] == macaroonbakery.LOGIN_OP:
+ name = 'authn'
+ raise _DischargeRequiredError(name=name, m=m)
+
+
+class _DischargeRequiredError(Exception):
+ def __init__(self, name, m):
+ Exception.__init__(self, 'discharge required')
+ self._name = name
+ self._m = m
+
+ def m(self):
+ return self._m
+
+ def name(self):
+ return self._name
+
+
+class _Client(object):
+ max_retries = 3
+
+ def __init__(self, dischargers):
+ self._key = macaroonbakery.generate_key()
+ self._macaroons = {}
+ self._dischargers = dischargers
+
+ def do(self, ctx, svc, ops):
+ class _AuthInfo:
+ authInfo = None
+
+ def svc_do(ms):
+ _AuthInfo.authInfo = svc.do(ctx, ms, ops)
+
+ self._do_func(ctx, svc, svc_do)
+ return _AuthInfo.authInfo
+
+ def do_any(self, ctx, svc, ops):
+ return svc.do_any(ctx, self._request_macaroons(svc), ops)
+
+ def capability(self, ctx, svc, ops):
+ # capability returns a capability macaroon for the given operations.
+
+ class _M:
+ m = None
+
+ def svc_capability(ms):
+ _M.m = svc.capability(ctx, ms, ops)
+ return
+
+ self._do_func(ctx, svc, svc_capability)
+ return _M.m
+
+ def discharged_capability(self, ctx, svc, ops):
+ m = self.capability(ctx, svc, ops)
+ return self._discharge_all(ctx, m)
+
+ def _do_func(self, ctx, svc, f):
+ for i in range(0, self.max_retries):
+ try:
+ f(self._request_macaroons(svc))
+ return
+ except _DischargeRequiredError as exc:
+ ms = self._discharge_all(ctx, exc.m())
+ self.add_macaroon(svc, exc.name(), ms)
+ raise ValueError('discharge failed too many times')
+
+ def _clear_macaroons(self, svc):
+ if svc is None:
+ self._macaroons = {}
+ return
+ if svc.name() in self._macaroons:
+ del self._macaroons[svc.name()]
+
+ def add_macaroon(self, svc, name, m):
+ if svc.name() not in self._macaroons:
+ self._macaroons[svc.name()] = {}
+ self._macaroons[svc.name()][name] = m
+
+ def _request_macaroons(self, svc):
+ mmap = self._macaroons.get(svc.name(), [])
+ # Put all the macaroons in the slice ordered by key
+ # so that we have deterministic behaviour in the tests.
+ names = []
+ for name in mmap:
+ names.append(name)
+ names = sorted(names)
+ ms = [None] * len(names)
+ for i, name in enumerate(names):
+ ms[i] = mmap[name]
+ return ms
+
+ def _discharge_all(self, ctx, m):
+ def get_discharge(ctx, cav, pay_load):
+ d = self._dischargers.get(cav.location)
+ if d is None:
+ raise ValueError('third party discharger '
+ '{} not found'.format(cav.location))
+ return d.discharge(ctx, cav, pay_load)
+
+ return macaroonbakery.discharge_all(ctx, m, get_discharge)
+
+
+class _BasicAuthIdService(macaroonbakery.IdentityClient):
+ def identity_from_context(self, ctx):
+ user, pwd = _basic_auth_from_context(ctx)
+ if user != 'sherlock' or pwd != 'holmes':
+ return None, None
+ return macaroonbakery.SimpleIdentity(user), None
+
+ def declared_identity(self, ctx, declared):
+ raise macaroonbakery.IdentityError('no identity declarations in basic '
+ 'auth id service')
+
+
+_BASIC_AUTH_KEY = checkers.ContextKey('user-key')
+
+
+class _BasicAuth(object):
+ def __init__(self, user, password):
+ self.user = user
+ self.password = password
+
+
+def _context_with_basic_auth(ctx, user, password):
+ return ctx.with_value(_BASIC_AUTH_KEY, _BasicAuth(user, password))
+
+
+def _basic_auth_from_context(ctx):
+ auth = ctx.get(_BASIC_AUTH_KEY, _BasicAuth('', ''))
+ return auth.user, auth.password
+
+
+def _macaroon_conditions(caveats, allow_third):
+ conds = [''] * len(caveats)
+ for i, cav in enumerate(caveats):
+ if cav.location is not None and cav.location != '':
+ if not allow_third:
+ raise ValueError('found unexpected third party caveat:'
+ ' {}'.format(cav.location))
+ continue
+ conds[i] = cav.caveat_id.decode('utf-8')
+ return conds
+
+
+def _resolve_caveats(ns, caveats):
+ conds = [''] * len(caveats)
+ for i, cav in enumerate(caveats):
+ if cav.location is not None and cav.location != '':
+ raise ValueError('found unexpected third party caveat')
+ conds[i] = ns.resolve_caveat(cav).condition
+ return conds
+
+
+class _MacaroonStoreWithError(object):
+ def new_macaroon(self, caveats, ns, ops):
+ raise ValueError('some error')
+
+ def macaroon_ops(self, ms):
+ raise ValueError('some error')
diff --git a/macaroonbakery/tests/test_checkers.py b/macaroonbakery/tests/test_checkers.py
new file mode 100644
index 0000000..f552fa4
--- /dev/null
+++ b/macaroonbakery/tests/test_checkers.py
@@ -0,0 +1,356 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+from datetime import datetime, timedelta
+from unittest import TestCase
+
+import six
+import pytz
+from pymacaroons import Macaroon, MACAROON_V2
+
+import macaroonbakery.checkers as checkers
+
+# A frozen time for the tests.
+NOW = datetime(
+ year=2006, month=1, day=2, hour=15, minute=4, second=5, microsecond=123)
+
+
+class TestClock():
+ def utcnow(self):
+ return pytz.UTC.localize(NOW)
+
+
+class TestCheckers(TestCase):
+ def test_checkers(self):
+
+ tests = [
+ ('nothing in context, no extra checkers', [
+ ('something',
+ 'caveat "something" not satisfied: caveat not recognized'),
+ ('', 'cannot parse caveat "": empty caveat'),
+ (' hello', 'cannot parse caveat " hello": caveat starts with'
+ ' space character'),
+ ], None),
+ ('one failed caveat', [
+ ('t:a aval', None),
+ ('t:b bval', None),
+ ('t:a wrong', 'caveat "t:a wrong" not satisfied: wrong arg'),
+ ], None),
+ ('time from clock', [
+ (checkers.time_before_caveat(
+ datetime.utcnow() +
+ timedelta(0, 1)).condition,
+ None),
+ (checkers.time_before_caveat(NOW).condition,
+ 'caveat "time-before 2006-01-02T15:04:05.000123Z" '
+ 'not satisfied: macaroon has expired'),
+ (checkers.time_before_caveat(NOW - timedelta(0, 1)).condition,
+ 'caveat "time-before 2006-01-02T15:04:04.000123Z" '
+ 'not satisfied: macaroon has expired'),
+ ('time-before bad-date',
+ 'caveat "time-before bad-date" not satisfied: '
+ 'cannot parse "bad-date" as RFC 3339'),
+ (checkers.time_before_caveat(NOW).condition + " ",
+ 'caveat "time-before 2006-01-02T15:04:05.000123Z " '
+ 'not satisfied: '
+ 'cannot parse "2006-01-02T15:04:05.000123Z " as RFC 3339'),
+ ], lambda x: checkers.context_with_clock(ctx, TestClock())),
+ ('real time', [
+ (checkers.time_before_caveat(datetime(
+ year=2010, month=1, day=1)).condition,
+ 'caveat "time-before 2010-01-01T00:00:00.000000Z" not '
+ 'satisfied: macaroon has expired'),
+ (checkers.time_before_caveat(datetime(
+ year=3000, month=1, day=1)).condition, None),
+ ], None),
+ ('declared, no entries', [
+ (checkers.declared_caveat('a', 'aval').condition,
+ 'caveat "declared a aval" not satisfied: got a=null, '
+ 'expected "aval"'),
+ (checkers.COND_DECLARED, 'caveat "declared" not satisfied: '
+ 'declared caveat has no value'),
+ ], None),
+ ('declared, some entries', [
+ (checkers.declared_caveat('a', 'aval').condition, None),
+ (checkers.declared_caveat('b', 'bval').condition, None),
+ (checkers.declared_caveat('spc', ' a b').condition, None),
+ (checkers.declared_caveat('a', 'bval').condition,
+ 'caveat "declared a bval" not satisfied: '
+ 'got a="aval", expected "bval"'),
+ (checkers.declared_caveat('a', ' aval').condition,
+ 'caveat "declared a aval" not satisfied: '
+ 'got a="aval", expected " aval"'),
+ (checkers.declared_caveat('spc', 'a b').condition,
+ 'caveat "declared spc a b" not satisfied: '
+ 'got spc=" a b", expected "a b"'),
+ (checkers.declared_caveat('', 'a b').condition,
+ 'caveat "error invalid caveat \'declared\' key """ '
+ 'not satisfied: bad caveat'),
+ (checkers.declared_caveat('a b', 'a b').condition,
+ 'caveat "error invalid caveat \'declared\' key "a b"" '
+ 'not satisfied: bad caveat'),
+ ], lambda x: checkers.context_with_declared(x, {
+ 'a': 'aval',
+ 'b': 'bval',
+ 'spc': ' a b'})),
+ ]
+ checker = checkers.Checker()
+ checker.namespace().register('testns', 't')
+ checker.register('a', 'testns', arg_checker(self, 't:a', 'aval'))
+ checker.register('b', 'testns', arg_checker(self, 't:b', 'bval'))
+ ctx = checkers.AuthContext()
+ for test in tests:
+ print(test[0])
+ if test[2] is not None:
+ ctx1 = test[2](ctx)
+ else:
+ ctx1 = ctx
+ for check in test[1]:
+ err = checker.check_first_party_caveat(ctx1, check[0])
+ if check[1] is not None:
+ self.assertEqual(err, check[1])
+ else:
+ self.assertIsNone(err)
+
+ def test_infer_declared(self):
+ tests = [
+ ('no macaroons', [], {}, None),
+ ('single macaroon with one declaration', [
+ [checkers.Caveat(condition='declared foo bar')]
+ ], {'foo': 'bar'}, None),
+ ('only one argument to declared', [
+ [checkers.Caveat(condition='declared foo')]
+ ], {}, None),
+ ('spaces in value', [
+ [checkers.Caveat(condition='declared foo bar bloggs')]
+ ], {'foo': 'bar bloggs'}, None),
+ ('attribute with declared prefix', [
+ [checkers.Caveat(condition='declaredccf foo')]
+ ], {}, None),
+ ('several macaroons with different declares', [
+ [
+ checkers.declared_caveat('a', 'aval'),
+ checkers.declared_caveat('b', 'bval')
+ ], [
+ checkers.declared_caveat('c', 'cval'),
+ checkers.declared_caveat('d', 'dval')
+ ]
+ ], {'a': 'aval', 'b': 'bval', 'c': 'cval', 'd': 'dval'}, None),
+ ('duplicate values', [
+ [
+ checkers.declared_caveat('a', 'aval'),
+ checkers.declared_caveat('a', 'aval'),
+ checkers.declared_caveat('b', 'bval')
+ ], [
+ checkers.declared_caveat('a', 'aval'),
+ checkers.declared_caveat('b', 'bval'),
+ checkers.declared_caveat('c', 'cval'),
+ checkers.declared_caveat('d', 'dval')
+ ]
+ ], {'a': 'aval', 'b': 'bval', 'c': 'cval', 'd': 'dval'}, None),
+ ('conflicting values', [
+ [
+ checkers.declared_caveat('a', 'aval'),
+ checkers.declared_caveat('a', 'conflict'),
+ checkers.declared_caveat('b', 'bval')
+ ], [
+ checkers.declared_caveat('a', 'conflict'),
+ checkers.declared_caveat('b', 'another conflict'),
+ checkers.declared_caveat('c', 'cval'),
+ checkers.declared_caveat('d', 'dval')
+ ]
+ ], {'c': 'cval', 'd': 'dval'}, None),
+ ('third party caveats ignored', [
+ [checkers.Caveat(condition='declared a no conflict',
+ location='location')],
+ [checkers.declared_caveat('a', 'aval')]
+ ], {'a': 'aval'}, None),
+ ('unparseable caveats ignored', [
+ [checkers.Caveat(condition=' bad')],
+ [checkers.declared_caveat('a', 'aval')]
+ ], {'a': 'aval'}, None),
+ ('infer with namespace', [
+ [
+ checkers.declared_caveat('a', 'aval'),
+ caveat_with_ns(checkers.declared_caveat('a', 'aval'),
+ 'testns'),
+ ]
+ ], {'a': 'aval'}, None),
+ ]
+ for test in tests:
+ uri_to_prefix = test[3]
+ if uri_to_prefix is None:
+ uri_to_prefix = {checkers.STD_NAMESPACE: ''}
+ ns = checkers.Namespace(uri_to_prefix)
+ print(test[0])
+ ms = []
+ for i, caveats in enumerate(test[1]):
+ m = Macaroon(key=None, identifier=six.int2byte(i), location='',
+ version=MACAROON_V2)
+ for cav in caveats:
+ cav = ns.resolve_caveat(cav)
+ if cav.location == '':
+ m.add_first_party_caveat(cav.condition)
+ else:
+ m.add_third_party_caveat(cav.location, None,
+ cav.condition)
+ ms.append(m)
+ self.assertEqual(checkers.infer_declared(ms), test[2])
+
+ def test_operations_checker(self):
+ tests = [
+ ('all allowed', checkers.allow_caveat(
+ ['op1', 'op2', 'op4', 'op3']),
+ ['op1', 'op3', 'op2'], None),
+ ('none denied', checkers.deny_caveat(['op1', 'op2']),
+ ['op3', 'op4'], None),
+ ('one not allowed', checkers.allow_caveat(['op1', 'op2']),
+ ['op1', 'op3'],
+ 'caveat "allow op1 op2" not satisfied: op3 not allowed'),
+ ('one not denied', checkers.deny_caveat(['op1', 'op2']),
+ ['op4', 'op5', 'op2'],
+ 'caveat "deny op1 op2" not satisfied: op2 not allowed'),
+ ('no operations, allow caveat', checkers.allow_caveat(['op1']),
+ [],
+ 'caveat "allow op1" not satisfied: op1 not allowed'),
+ ('no operations, deny caveat', checkers.deny_caveat(['op1']),
+ [], None),
+ ('no operations, empty allow caveat', checkers.Caveat(
+ condition=checkers.COND_ALLOW),
+ [], 'caveat "allow" not satisfied: no operations allowed'),
+ ]
+ checker = checkers.Checker()
+ for test in tests:
+ print(test[0])
+ ctx = checkers.context_with_operations(checkers.AuthContext(),
+ test[2])
+ err = checker.check_first_party_caveat(ctx, test[1].condition)
+ if test[3] is None:
+ self.assertIsNone(err)
+ continue
+ self.assertEqual(err, test[3])
+
+ def test_operation_error_caveat(self):
+ tests = [
+ ('empty allow', checkers.allow_caveat(None),
+ 'error no operations allowed'),
+ ('allow: invalid operation name',
+ checkers.allow_caveat(['op1', 'operation number 2']),
+ 'error invalid operation name "operation number 2"'),
+ ('deny: invalid operation name',
+ checkers.deny_caveat(['op1', 'operation number 2']),
+ 'error invalid operation name "operation number 2"')
+ ]
+ for test in tests:
+ print(test[0])
+ self.assertEqual(test[1].condition, test[2])
+
+ def test_register_none_func_raise_exception(self):
+ checker = checkers.Checker()
+ with self.assertRaises(checkers.RegisterError) as ctx:
+ checker.register('x', checkers.STD_NAMESPACE, None)
+ self.assertEqual(ctx.exception.args[0],
+ 'no check function registered for namespace std when '
+ 'registering condition x')
+
+ def test_register_no_registered_ns_exception(self):
+ checker = checkers.Checker()
+ with self.assertRaises(checkers.RegisterError) as ctx:
+ checker.register('x', 'testns', lambda x: None)
+ self.assertEqual(ctx.exception.args[0],
+ 'no prefix registered for namespace testns when '
+ 'registering condition x')
+
+ def test_register_empty_prefix_condition_with_colon(self):
+ checker = checkers.Checker()
+ checker.namespace().register('testns', '')
+ with self.assertRaises(checkers.RegisterError) as ctx:
+ checker.register('x:y', 'testns', lambda x: None)
+ self.assertEqual(ctx.exception.args[0],
+ 'caveat condition x:y in namespace testns contains a '
+ 'colon but its prefix is empty')
+
+ def test_register_twice_same_namespace(self):
+ checker = checkers.Checker()
+ checker.namespace().register('testns', '')
+ checker.register('x', 'testns', lambda x: None)
+ with self.assertRaises(checkers.RegisterError) as ctx:
+ checker.register('x', 'testns', lambda x: None)
+ self.assertEqual(ctx.exception.args[0],
+ 'checker for x (namespace testns) already registered'
+ ' in namespace testns')
+
+ def test_register_twice_different_namespace(self):
+ checker = checkers.Checker()
+ checker.namespace().register('testns', '')
+ checker.namespace().register('otherns', '')
+ checker.register('x', 'testns', lambda x: None)
+ with self.assertRaises(checkers.RegisterError) as ctx:
+ checker.register('x', 'otherns', lambda x: None)
+ self.assertEqual(ctx.exception.args[0],
+ 'checker for x (namespace otherns) already registered'
+ ' in namespace testns')
+
+ def test_checker_info(self):
+ checker = checkers.Checker(include_std_checkers=False)
+ checker.namespace().register('one', 't')
+ checker.namespace().register('two', 't')
+ checker.namespace().register('three', '')
+ checker.namespace().register('four', 's')
+
+ class Called(object):
+ val = ''
+
+ def register(name, ns):
+ def func(ctx, cond, arg):
+ Called.val = name + ' ' + ns
+ return None
+
+ checker.register(name, ns, func)
+
+ register('x', 'one')
+ register('y', 'one')
+ register('z', 'two')
+ register('a', 'two')
+ register('something', 'three')
+ register('other', 'three')
+ register('xxx', 'four')
+
+ expect = [
+ checkers.CheckerInfo(ns='four', name='xxx', prefix='s'),
+ checkers.CheckerInfo(ns='one', name='x', prefix='t'),
+ checkers.CheckerInfo(ns='one', name='y', prefix='t'),
+ checkers.CheckerInfo(ns='three', name='other', prefix=''),
+ checkers.CheckerInfo(ns='three', name='something', prefix=''),
+ checkers.CheckerInfo(ns='two', name='a', prefix='t'),
+ checkers.CheckerInfo(ns='two', name='z', prefix='t'),
+ ]
+ infos = checker.info()
+ self.assertEqual(len(infos), len(expect))
+ new_infos = []
+ for i, info in enumerate(infos):
+ Called.val = ''
+ info.check(None, '', '')
+ self.assertEqual(Called.val, expect[i].name + ' '
+ + expect[i].ns)
+ new_infos.append(checkers.CheckerInfo(ns=info.ns, name=info.name,
+ prefix=info.prefix))
+ self.assertEqual(new_infos, expect)
+
+
+def caveat_with_ns(cav, ns):
+ return checkers.Caveat(location=cav.location, condition=cav.condition,
+ namespace=ns)
+
+
+def arg_checker(test, expect_cond, check_arg):
+ ''' Returns a checker function that checks that the caveat condition is
+ check_arg.
+ '''
+
+ def func(ctx, cond, arg):
+ test.assertEqual(cond, expect_cond)
+ if arg != check_arg:
+ return 'wrong arg'
+ return None
+
+ return func
diff --git a/macaroonbakery/tests/test_codec.py b/macaroonbakery/tests/test_codec.py
index de1631c..6573266 100644
--- a/macaroonbakery/tests/test_codec.py
+++ b/macaroonbakery/tests/test_codec.py
@@ -1,95 +1,103 @@
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
+import base64
from unittest import TestCase
-import base64
+import nacl.public
import six
-import nacl.utils
-from nacl.public import PrivateKey
-from nacl.encoding import Base64Encoder
-
-from macaroonbakery import bakery, codec, macaroon, namespace, utils
+import macaroonbakery
+from macaroonbakery import utils
+from macaroonbakery import codec
+import macaroonbakery.checkers as checkers
class TestCodec(TestCase):
def setUp(self):
- self.fp_key = nacl.public.PrivateKey.generate()
- self.tp_key = nacl.public.PrivateKey.generate()
+ self.fp_key = macaroonbakery.generate_key()
+ self.tp_key = macaroonbakery.generate_key()
def test_v1_round_trip(self):
- tp_info = bakery.ThirdPartyInfo(bakery.BAKERY_V1,
- self.tp_key.public_key)
- cid = codec.encode_caveat('is-authenticated-user',
- b'a random string',
- tp_info,
- self.fp_key,
- None)
-
- res = codec.decode_caveat(self.tp_key, cid)
- self.assertEquals(res, macaroon.ThirdPartyCaveatInfo(
+ tp_info = macaroonbakery.ThirdPartyInfo(
+ version=macaroonbakery.BAKERY_V1,
+ public_key=self.tp_key.public_key)
+ cid = macaroonbakery.encode_caveat(
+ 'is-authenticated-user',
+ b'a random string',
+ tp_info,
+ self.fp_key,
+ None)
+ res = macaroonbakery.decode_caveat(self.tp_key, cid)
+ self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo(
first_party_public_key=self.fp_key.public_key,
root_key=b'a random string',
condition='is-authenticated-user',
caveat=cid,
third_party_key_pair=self.tp_key,
- version=bakery.BAKERY_V1,
- ns=macaroon.legacy_namespace()
+ version=macaroonbakery.BAKERY_V1,
+ namespace=macaroonbakery.legacy_namespace()
))
def test_v2_round_trip(self):
- tp_info = bakery.ThirdPartyInfo(bakery.BAKERY_V2,
- self.tp_key.public_key)
- cid = codec.encode_caveat('is-authenticated-user',
- b'a random string',
- tp_info,
- self.fp_key,
- None)
- res = codec.decode_caveat(self.tp_key, cid)
- self.assertEquals(res, macaroon.ThirdPartyCaveatInfo(
+ tp_info = macaroonbakery.ThirdPartyInfo(
+ version=macaroonbakery.BAKERY_V2,
+ public_key=self.tp_key.public_key)
+ cid = macaroonbakery.encode_caveat(
+ 'is-authenticated-user',
+ b'a random string',
+ tp_info,
+ self.fp_key,
+ None)
+ res = macaroonbakery.decode_caveat(self.tp_key, cid)
+ self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo(
first_party_public_key=self.fp_key.public_key,
root_key=b'a random string',
condition='is-authenticated-user',
caveat=cid,
third_party_key_pair=self.tp_key,
- version=bakery.BAKERY_V2,
- ns=macaroon.legacy_namespace()
+ version=macaroonbakery.BAKERY_V2,
+ namespace=macaroonbakery.legacy_namespace()
))
def test_v3_round_trip(self):
- tp_info = bakery.ThirdPartyInfo(bakery.BAKERY_V3,
- self.tp_key.public_key)
- ns = namespace.Namespace()
+ tp_info = macaroonbakery.ThirdPartyInfo(
+ version=macaroonbakery.BAKERY_V3,
+ public_key=self.tp_key.public_key)
+ ns = checkers.Namespace()
ns.register('testns', 'x')
- cid = codec.encode_caveat('is-authenticated-user',
- b'a random string',
- tp_info,
- self.fp_key,
- ns)
- res = codec.decode_caveat(self.tp_key, cid)
- self.assertEquals(res, macaroon.ThirdPartyCaveatInfo(
+ cid = macaroonbakery.encode_caveat(
+ 'is-authenticated-user',
+ b'a random string',
+ tp_info,
+ self.fp_key,
+ ns)
+ res = macaroonbakery.decode_caveat(self.tp_key, cid)
+ self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo(
first_party_public_key=self.fp_key.public_key,
root_key=b'a random string',
condition='is-authenticated-user',
caveat=cid,
third_party_key_pair=self.tp_key,
- version=bakery.BAKERY_V3,
- ns=ns
+ version=macaroonbakery.BAKERY_V3,
+ namespace=ns
))
def test_empty_caveat_id(self):
- with self.assertRaises(ValueError) as context:
- codec.decode_caveat(self.tp_key, b'')
+ with self.assertRaises(macaroonbakery.VerificationError) as context:
+ macaroonbakery.decode_caveat(self.tp_key, b'')
self.assertTrue('empty third party caveat' in str(context.exception))
def test_decode_caveat_v1_from_go(self):
- tp_key = PrivateKey(base64.b64decode(
- 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))
- fp_key = PrivateKey(base64.b64decode(
- 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))
- fp_key.encode(Base64Encoder)
+ tp_key = macaroonbakery.PrivateKey(
+ nacl.public.PrivateKey(base64.b64decode(
+ 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')))
+ fp_key = macaroonbakery.PrivateKey(
+ nacl.public.PrivateKey(base64.b64decode(
+ 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')))
+ root_key = base64.b64decode('vDxEmWZEkgiNEFlJ+8ruXe3qDSLf1H+o')
# This caveat has been generated from the go code
# to check the compatibilty
+
encrypted_cav = six.b(
'eyJUaGlyZFBhcnR5UHVibGljS2V5IjoiOFA3R1ZZc3BlWlN4c'
'3hFdmJsSVFFSTFqdTBTSWl0WlIrRFdhWE40cmxocz0iLCJGaX'
@@ -100,22 +108,25 @@ class TestCodec(TestCase):
'BORldUUExGdjVla1dWUjA4Uk1sbGJhc3c4VGdFbkhzM0laeVo'
'0V2lEOHhRUWdjU3ljOHY4eUt4dEhxejVEczJOYmh1ZDJhUFdt'
'UTVMcVlNWitmZ2FNaTAxdE9DIn0=')
- cav = codec.decode_caveat(tp_key, encrypted_cav)
- self.assertEquals(cav, macaroon.ThirdPartyCaveatInfo(
+ cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav)
+ self.assertEquals(cav, macaroonbakery.ThirdPartyCaveatInfo(
condition='caveat condition',
first_party_public_key=fp_key.public_key,
third_party_key_pair=tp_key,
- root_key=b'random',
+ root_key=root_key,
caveat=encrypted_cav,
- version=bakery.BAKERY_V1,
- ns=macaroon.legacy_namespace()
+ version=macaroonbakery.BAKERY_V1,
+ namespace=macaroonbakery.legacy_namespace()
))
def test_decode_caveat_v2_from_go(self):
- tp_key = PrivateKey(base64.b64decode(
- 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))
- fp_key = PrivateKey(base64.b64decode(
- 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))
+ tp_key = macaroonbakery.PrivateKey(nacl.public.PrivateKey(
+ base64.b64decode(
+ 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')))
+ fp_key = macaroonbakery.PrivateKey(
+ nacl.public.PrivateKey(base64.b64decode(
+ 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')))
+ root_key = base64.b64decode('wh0HSM65wWHOIxoGjgJJOFvQKn2jJFhC')
# This caveat has been generated from the go code
# to check the compatibilty
encrypted_cav = base64.urlsafe_b64decode(
@@ -123,22 +134,25 @@ class TestCodec(TestCase):
'AvD-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGIHq9xGcHS9IZ'
'Lh0cL6D9qpeKI0mXmCPfnwRQDuVYC8y5gVWd-oCGZaj5TGtk3byp2Vnw6ojmt'
'sULDhY59YA_J_Y0ATkERO5T9ajoRWBxU2OXBoX6bImXA')))
- cav = codec.decode_caveat(tp_key, encrypted_cav)
- self.assertEquals(cav, macaroon.ThirdPartyCaveatInfo(
+ cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav)
+ self.assertEqual(cav, macaroonbakery.ThirdPartyCaveatInfo(
condition='third party condition',
first_party_public_key=fp_key.public_key,
third_party_key_pair=tp_key,
- root_key=b'random',
+ root_key=root_key,
caveat=encrypted_cav,
- version=bakery.BAKERY_V2,
- ns=macaroon.legacy_namespace()
+ version=macaroonbakery.BAKERY_V2,
+ namespace=macaroonbakery.legacy_namespace()
))
def test_decode_caveat_v3_from_go(self):
- tp_key = PrivateKey(base64.b64decode(
- 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))
- fp_key = PrivateKey(base64.b64decode(
- 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))
+ tp_key = macaroonbakery.PrivateKey(
+ nacl.public.PrivateKey(base64.b64decode(
+ 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')))
+ fp_key = macaroonbakery.PrivateKey(nacl.public.PrivateKey(
+ base64.b64decode(
+ 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')))
+ root_key = base64.b64decode(b'oqOXI3/Mz/pKjCuFOt2eYxb7ndLq66GY')
# This caveat has been generated from the go code
# to check the compatibilty
encrypted_cav = base64.urlsafe_b64decode(
@@ -146,15 +160,15 @@ class TestCodec(TestCase):
'A_D-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGNLeFSkD2M-8A'
'EYvmgVH95GWu7T7caKxKhhOQFcEKgnXKJvYXxz1zin4cZc4Q6C7gVqA-J4_j3'
'1LX4VKxymqG62UGPo78wOv0_fKjr3OI6PPJOYOQgBMclemlRF2')))
- cav = codec.decode_caveat(tp_key, encrypted_cav)
- self.assertEquals(cav, macaroon.ThirdPartyCaveatInfo(
+ cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav)
+ self.assertEquals(cav, macaroonbakery.ThirdPartyCaveatInfo(
condition='third party condition',
first_party_public_key=fp_key.public_key,
third_party_key_pair=tp_key,
- root_key=b'random',
+ root_key=root_key,
caveat=encrypted_cav,
- version=bakery.BAKERY_V3,
- ns=macaroon.legacy_namespace()
+ version=macaroonbakery.BAKERY_V3,
+ namespace=macaroonbakery.legacy_namespace()
))
def test_encode_decode_varint(self):
@@ -169,10 +183,10 @@ class TestCodec(TestCase):
for test in tests:
data = bytearray()
expected = bytearray()
- codec._encode_uvarint(test[0], data)
+ macaroonbakery.encode_uvarint(test[0], data)
for v in test[1]:
expected.append(v)
self.assertEquals(data, expected)
- val = codec._decode_uvarint(bytes(data))
+ val = codec.decode_uvarint(bytes(data))
self.assertEquals(test[0], val[0])
self.assertEquals(len(test[1]), val[1])
diff --git a/macaroonbakery/tests/test_discharge.py b/macaroonbakery/tests/test_discharge.py
new file mode 100644
index 0000000..6e2df6a
--- /dev/null
+++ b/macaroonbakery/tests/test_discharge.py
@@ -0,0 +1,445 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+import unittest
+
+from pymacaroons import MACAROON_V1, Macaroon
+from pymacaroons.exceptions import (
+ MacaroonInvalidSignatureException, MacaroonUnmetCaveatException
+)
+
+import macaroonbakery
+import macaroonbakery.checkers as checkers
+from macaroonbakery.tests import common
+
+
+class TestDischarge(unittest.TestCase):
+ def test_single_service_first_party(self):
+ ''' Creates a single service with a macaroon with one first party
+ caveat.
+ It creates a request with this macaroon and checks that the service
+ can verify this macaroon as valid.
+ '''
+ oc = common.new_bakery('bakerytest')
+ primary = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, None,
+ [macaroonbakery.LOGIN_OP])
+ self.assertEqual(primary.macaroon.location, 'bakerytest')
+ primary.add_caveat(checkers.Caveat(condition='str something',
+ namespace='testns'),
+ oc.oven.key, oc.oven.locator)
+ oc.checker.auth([[primary.macaroon]]).allow(
+ common.str_context('something'), [macaroonbakery.LOGIN_OP])
+
+ def test_macaroon_paper_fig6(self):
+ ''' Implements an example flow as described in the macaroons paper:
+ http://theory.stanford.edu/~ataly/Papers/macaroons.pdf
+ There are three services, ts, fs, bs:
+ ts is a store service which has deligated authority to a forum
+ service fs.
+ The forum service wants to require its users to be logged into to an
+ authentication service bs.
+
+ The client obtains a macaroon from fs (minted by ts, with a third party
+ caveat addressed to bs).
+ The client obtains a discharge macaroon from bs to satisfy this caveat.
+ The target service verifies the original macaroon it delegated to fs
+ No direct contact between bs and ts is required
+ '''
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+ ts = common.new_bakery('ts-loc', locator)
+ fs = common.new_bakery('fs-loc', locator)
+
+ # ts creates a macaroon.
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages,
+ None, [macaroonbakery.LOGIN_OP])
+
+ # ts somehow sends the macaroon to fs which adds a third party caveat
+ # to be discharged by bs.
+ ts_macaroon.add_caveat(checkers.Caveat(location='bs-loc',
+ condition='user==bob'),
+ fs.oven.key, fs.oven.locator)
+
+ # client asks for a discharge macaroon for each third party caveat
+ def get_discharge(ctx, cav, payload):
+ self.assertEqual(cav.location, 'bs-loc')
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker(
+ 'user==bob'),
+ bs.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
+ get_discharge)
+
+ ts.checker.auth([d]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ def test_discharge_with_version1_macaroon(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+ ts = common.new_bakery('ts-loc', locator)
+
+ # ts creates a old-version macaroon.
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages,
+ None, [macaroonbakery.LOGIN_OP])
+ ts_macaroon.add_caveat(checkers.Caveat(condition='something',
+ location='bs-loc'),
+ ts.oven.key, ts.oven.locator)
+
+ # client asks for a discharge macaroon for each third party caveat
+
+ def get_discharge(ctx, cav, payload):
+ # Make sure that the caveat id really is old-style.
+ try:
+ cav.caveat_id_bytes.decode('utf-8')
+ except UnicodeDecodeError:
+ self.fail('caveat id is not utf-8')
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker(
+ 'something'),
+ bs.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
+ get_discharge)
+
+ ts.checker.auth([d]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ for m in d:
+ self.assertEqual(m.version, MACAROON_V1)
+
+ def test_version1_macaroon_id(self):
+ # In the version 1 bakery, macaroon ids were hex-encoded with a
+ # hyphenated UUID suffix.
+ root_key_store = macaroonbakery.MemoryKeyStore()
+ b = macaroonbakery.Bakery(root_key_store=root_key_store,
+ identity_client=common.OneIdentity())
+ key, id = root_key_store.root_key()
+ root_key_store.get(id)
+ m = Macaroon(key=key, version=MACAROON_V1, location='',
+ identifier=id + b'-deadl00f')
+ b.checker.auth([[m]]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ def test_macaroon_paper_fig6_fails_without_discharges(self):
+ ''' Runs a similar test as test_macaroon_paper_fig6 without the client
+ discharging the third party caveats.
+ '''
+ locator = macaroonbakery.ThirdPartyStore()
+ ts = common.new_bakery('ts-loc', locator)
+ fs = common.new_bakery('fs-loc', locator)
+ common.new_bakery('as-loc', locator)
+
+ # ts creates a macaroon.
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, None,
+ [macaroonbakery.LOGIN_OP])
+
+ # ts somehow sends the macaroon to fs which adds a third party
+ # caveat to be discharged by as.
+ ts_macaroon.add_caveat(checkers.Caveat(location='as-loc',
+ condition='user==bob'),
+ fs.oven.key, fs.oven.locator)
+
+ # client makes request to ts
+ try:
+ ts.checker.auth([[ts_macaroon.macaroon]]).allow(
+ common.test_context,
+ macaroonbakery.LOGIN_OP
+ )
+ self.fail('macaroon unmet should be raised')
+ except MacaroonUnmetCaveatException:
+ pass
+
+ def test_macaroon_paper_fig6_fails_with_binding_on_tampered_sig(self):
+ ''' Runs a similar test as test_macaroon_paper_fig6 with the discharge
+ macaroon binding being done on a tampered signature.
+ '''
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+ ts = common.new_bakery('ts-loc', locator)
+
+ # ts creates a macaroon.
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, None,
+ [macaroonbakery.LOGIN_OP])
+ # ts somehow sends the macaroon to fs which adds a third party caveat
+ # to be discharged by as.
+ ts_macaroon.add_caveat(checkers.Caveat(condition='user==bob',
+ location='bs-loc'),
+ ts.oven.key, ts.oven.locator)
+
+ # client asks for a discharge macaroon for each third party caveat
+ def get_discharge(ctx, cav, payload):
+ self.assertEqual(cav.location, 'bs-loc')
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker(
+ 'user==bob'),
+ bs.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
+ get_discharge)
+ # client has all the discharge macaroons. For each discharge macaroon
+ # bind it to our ts_macaroon and add it to our request.
+ tampered_macaroon = Macaroon()
+ for i, dm in enumerate(d[1:]):
+ d[i + 1] = tampered_macaroon.prepare_for_request(dm)
+
+ # client makes request to ts.
+ with self.assertRaises(MacaroonInvalidSignatureException) as exc:
+ ts.checker.auth([d]).allow(common.test_context,
+ macaroonbakery.LOGIN_OP)
+ self.assertEqual('Signatures do not match', exc.exception.args[0])
+
+ def test_need_declared(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ first_party = common.new_bakery('first', locator)
+ third_party = common.new_bakery('third', locator)
+
+ # firstParty mints a macaroon with a third-party caveat addressed
+ # to thirdParty with a need-declared caveat.
+ m = first_party.oven.macaroon(
+ macaroonbakery.LATEST_BAKERY_VERSION, common.ages, [
+ checkers.need_declared_caveat(
+ checkers.Caveat(location='third', condition='something'),
+ ['foo', 'bar']
+ )
+ ], [macaroonbakery.LOGIN_OP])
+
+ # The client asks for a discharge macaroon for each third party caveat.
+ def get_discharge(ctx, cav, payload):
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ third_party.oven.key,
+ common.ThirdPartyStrcmpChecker(
+ 'something'),
+ third_party.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+
+ # The required declared attributes should have been added
+ # to the discharge macaroons.
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'foo': '',
+ 'bar': '',
+ })
+
+ # Make sure the macaroons actually check out correctly
+ # when provided with the declared checker.
+ ctx = checkers.context_with_declared(common.test_context, declared)
+ first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+
+ # Try again when the third party does add a required declaration.
+
+ # The client asks for a discharge macaroon for each third party caveat.
+ def get_discharge(ctx, cav, payload):
+ checker = common.ThirdPartyCheckerWithCaveats([
+ checkers.declared_caveat('foo', 'a'),
+ checkers.declared_caveat('arble', 'b')
+ ])
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ third_party.oven.key,
+ checker,
+ third_party.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+
+ # One attribute should have been added, the other was already there.
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'foo': 'a',
+ 'bar': '',
+ 'arble': 'b',
+ })
+
+ ctx = checkers.context_with_declared(common.test_context, declared)
+ first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+
+ # Try again, but this time pretend a client is sneakily trying
+ # to add another 'declared' attribute to alter the declarations.
+
+ def get_discharge(ctx, cav, payload):
+ checker = common.ThirdPartyCheckerWithCaveats([
+ checkers.declared_caveat('foo', 'a'),
+ checkers.declared_caveat('arble', 'b'),
+ ])
+
+ # Sneaky client adds a first party caveat.
+ m = macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ third_party.oven.key, checker,
+ third_party.oven.locator)
+ m.add_caveat(checkers.declared_caveat('foo', 'c'), None, None)
+ return m
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'bar': '',
+ 'arble': 'b',
+ })
+
+ with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ first_party.checker.auth([d]).allow(common.test_context,
+ macaroonbakery.LOGIN_OP)
+ self.assertEqual('cannot authorize login macaroon: caveat '
+ '"declared foo a" not satisfied: got foo=null, '
+ 'expected "a"', exc.exception.args[0])
+
+ def test_discharge_two_need_declared(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ first_party = common.new_bakery('first', locator)
+ third_party = common.new_bakery('third', locator)
+
+ # first_party mints a macaroon with two third party caveats
+ # with overlapping attributes.
+ m = first_party.oven.macaroon(
+ macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, [
+ checkers.need_declared_caveat(
+ checkers.Caveat(location='third', condition='x'),
+ ['foo', 'bar']),
+ checkers.need_declared_caveat(
+ checkers.Caveat(location='third', condition='y'),
+ ['bar', 'baz']),
+ ], [macaroonbakery.LOGIN_OP])
+
+ # The client asks for a discharge macaroon for each third party caveat.
+ # Since no declarations are added by the discharger,
+
+ def get_discharge(ctx, cav, payload):
+ return macaroonbakery.discharge(
+ ctx, cav.caveat_id_bytes, payload, third_party.oven.key,
+ common.ThirdPartyCaveatCheckerEmpty(),
+ third_party.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'foo': '',
+ 'bar': '',
+ 'baz': '',
+ })
+ ctx = checkers.context_with_declared(common.test_context, declared)
+ first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+
+ # If they return conflicting values, the discharge fails.
+ # The client asks for a discharge macaroon for each third party caveat.
+ # Since no declarations are added by the discharger,
+ class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(self, ctx, cav_info):
+ if cav_info.condition == b'x':
+ return [checkers.declared_caveat('foo', 'fooval1')]
+ if cav_info.condition == b'y':
+ return [
+ checkers.declared_caveat('foo', 'fooval2'),
+ checkers.declared_caveat('baz', 'bazval')
+ ]
+ raise common.ThirdPartyCaveatCheckFailed('not matched')
+
+ def get_discharge(ctx, cav, payload):
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ third_party.oven.key,
+ ThirdPartyCaveatCheckerF(),
+ third_party.oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+
+ declared = checkers.infer_declared(d, first_party.checker.namespace())
+ self.assertEqual(declared, {
+ 'bar': '',
+ 'baz': 'bazval',
+ })
+ with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ first_party.checker.auth([d]).allow(common.test_context,
+ macaroonbakery.LOGIN_OP)
+ self.assertEqual('cannot authorize login macaroon: caveat "declared '
+ 'foo fooval1" not satisfied: got foo=null, expected '
+ '"fooval1"', exc.exception.args[0])
+
+ def test_discharge_macaroon_cannot_be_used_as_normal_macaroon(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ first_party = common.new_bakery('first', locator)
+ third_party = common.new_bakery('third', locator)
+
+ # First party mints a macaroon with a 3rd party caveat.
+ m = first_party.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages, [
+ checkers.Caveat(location='third',
+ condition='true')],
+ [macaroonbakery.LOGIN_OP])
+
+ # Acquire the discharge macaroon, but don't bind it to the original.
+ class M:
+ unbound = None
+
+ def get_discharge(ctx, cav, payload):
+ m = macaroonbakery.discharge(
+ ctx, cav.caveat_id_bytes, payload, third_party.oven.key,
+ common.ThirdPartyStrcmpChecker('true'),
+ third_party.oven.locator)
+ M.unbound = m.macaroon.copy()
+ return m
+
+ macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ self.assertIsNotNone(M.unbound)
+
+ # Make sure it cannot be used as a normal macaroon in the third party.
+ with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ third_party.checker.auth([[M.unbound]]).allow(
+ common.test_context, [macaroonbakery.LOGIN_OP])
+ self.assertEqual('no operations found in macaroon',
+ exc.exception.args[0])
+
+ def test_third_party_discharge_macaroon_ids_are_small(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ bakeries = {
+ 'ts-loc': common.new_bakery('ts-loc', locator),
+ 'as1-loc': common.new_bakery('as1-loc', locator),
+ 'as2-loc': common.new_bakery('as2-loc', locator),
+ }
+ ts = bakeries['ts-loc']
+
+ ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ common.ages,
+ None, [macaroonbakery.LOGIN_OP])
+ ts_macaroon.add_caveat(checkers.Caveat(condition='something',
+ location='as1-loc'),
+ ts.oven.key, ts.oven.locator)
+
+ class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ def __init__(self, loc):
+ self._loc = loc
+
+ def check_third_party_caveat(self, ctx, info):
+ if self._loc == 'as1-loc':
+ return [checkers.Caveat(condition='something',
+ location='as2-loc')]
+ if self._loc == 'as2-loc':
+ return []
+ raise common.ThirdPartyCaveatCheckFailed(
+ 'unknown location {}'.format(self._loc))
+
+ def get_discharge(ctx, cav, payload):
+ oven = bakeries[cav.location].oven
+ return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
+ oven.key,
+ ThirdPartyCaveatCheckerF(
+ cav.location),
+ oven.locator)
+
+ d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
+ get_discharge)
+ ts.checker.auth([d]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ for i, m in enumerate(d):
+ for j, cav in enumerate(m.caveats):
+ if (cav.verification_key_id is not None and
+ len(cav.caveat_id) > 3):
+ self.fail('caveat id on caveat {} of macaroon {} '
+ 'is too big ({})'.format(j, i, cav.id))
diff --git a/macaroonbakery/tests/test_discharge_all.py b/macaroonbakery/tests/test_discharge_all.py
new file mode 100644
index 0000000..8da8823
--- /dev/null
+++ b/macaroonbakery/tests/test_discharge_all.py
@@ -0,0 +1,170 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+import unittest
+
+from pymacaroons.verifier import Verifier
+
+import macaroonbakery
+import macaroonbakery.checkers as checkers
+from macaroonbakery.tests import common
+
+
+def always_ok(predicate):
+ return True
+
+
+class TestDischargeAll(unittest.TestCase):
+ def test_discharge_all_no_discharges(self):
+ root_key = b'root key'
+ m = macaroonbakery.Macaroon(
+ root_key=root_key, id=b'id0', location='loc0',
+ version=macaroonbakery.LATEST_BAKERY_VERSION,
+ namespace=common.test_checker().namespace())
+ ms = macaroonbakery.discharge_all(
+ common.test_context, m, no_discharge(self))
+ self.assertEqual(len(ms), 1)
+ self.assertEqual(ms[0], m.macaroon)
+ v = Verifier()
+ v.satisfy_general(always_ok)
+ v.verify(m.macaroon, root_key, None)
+
+ def test_discharge_all_many_discharges(self):
+ root_key = b'root key'
+ m0 = macaroonbakery.Macaroon(
+ root_key=root_key, id=b'id0', location='loc0',
+ version=macaroonbakery.LATEST_BAKERY_VERSION)
+
+ class State(object):
+ total_required = 40
+ id = 1
+
+ def add_caveats(m):
+ for i in range(0, 1):
+ if State.total_required == 0:
+ break
+ cid = 'id{}'.format(State.id)
+ m.macaroon.add_third_party_caveat(
+ location='somewhere',
+ key='root key {}'.format(cid).encode('utf-8'),
+ key_id=cid.encode('utf-8'))
+ State.id += 1
+ State.total_required -= 1
+
+ add_caveats(m0)
+
+ def get_discharge(_, cav, payload):
+ self.assertEqual(payload, None)
+ m = macaroonbakery.Macaroon(
+ root_key='root key {}'.format(
+ cav.caveat_id.decode('utf-8')).encode('utf-8'),
+ id=cav.caveat_id, location='',
+ version=macaroonbakery.LATEST_BAKERY_VERSION)
+
+ add_caveats(m)
+ return m
+
+ ms = macaroonbakery.discharge_all(
+ common.test_context, m0, get_discharge)
+
+ self.assertEqual(len(ms), 41)
+
+ v = Verifier()
+ v.satisfy_general(always_ok)
+ v.verify(ms[0], root_key, ms[1:])
+
+ def test_discharge_all_many_discharges_with_real_third_party_caveats(self):
+ # This is the same flow as TestDischargeAllManyDischarges except that
+ # we're using actual third party caveats as added by
+ # Macaroon.add_caveat and we use a larger number of caveats
+ # so that caveat ids will need to get larger.
+ locator = macaroonbakery.ThirdPartyStore()
+ bakeries = {}
+ total_discharges_required = 40
+
+ class M:
+ bakery_id = 0
+ still_required = total_discharges_required
+
+ def add_bakery():
+ M.bakery_id += 1
+ loc = 'loc{}'.format(M.bakery_id)
+ bakeries[loc] = common.new_bakery(loc, locator)
+ return loc
+
+ ts = common.new_bakery('ts-loc', locator)
+
+ def checker(_, ci):
+ caveats = []
+ if ci.condition != 'something':
+ self.fail('unexpected condition')
+ for i in range(0, 2):
+ if M.still_required <= 0:
+ break
+ caveats.append(checkers.Caveat(location=add_bakery(),
+ condition='something'))
+ M.still_required -= 1
+ return caveats
+
+ root_key = b'root key'
+ m0 = macaroonbakery.Macaroon(
+ root_key=root_key, id=b'id0', location='ts-loc',
+ version=macaroonbakery.LATEST_BAKERY_VERSION)
+
+ m0.add_caveat(checkers. Caveat(location=add_bakery(),
+ condition='something'),
+ ts.oven.key, locator)
+
+ # We've added a caveat (the first) so one less caveat is required.
+ M.still_required -= 1
+
+ class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(self, ctx, info):
+ return checker(ctx, info)
+
+ def get_discharge(ctx, cav, payload):
+ return macaroonbakery.discharge(
+ ctx, cav.caveat_id, payload,
+ bakeries[cav.location].oven.key,
+ ThirdPartyCaveatCheckerF(), locator)
+
+ ms = macaroonbakery.discharge_all(common.test_context, m0,
+ get_discharge)
+
+ self.assertEqual(len(ms), total_discharges_required + 1)
+
+ v = Verifier()
+ v.satisfy_general(always_ok)
+ v.verify(ms[0], root_key, ms[1:])
+
+ def test_discharge_all_local_discharge(self):
+ oc = common.new_bakery('ts', None)
+ client_key = macaroonbakery.generate_key()
+ m = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, common.ages,
+ [
+ macaroonbakery.local_third_party_caveat(
+ client_key.public_key,
+ macaroonbakery.LATEST_BAKERY_VERSION)
+ ], [macaroonbakery.LOGIN_OP])
+ ms = macaroonbakery.discharge_all(
+ common.test_context, m, no_discharge(self), client_key)
+ oc.checker.auth([ms]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+ def test_discharge_all_local_discharge_version1(self):
+ oc = common.new_bakery('ts', None)
+ client_key = macaroonbakery.generate_key()
+ m = oc.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages, [
+ macaroonbakery.local_third_party_caveat(
+ client_key.public_key, macaroonbakery.BAKERY_V1)
+ ], [macaroonbakery.LOGIN_OP])
+ ms = macaroonbakery.discharge_all(
+ common.test_context, m, no_discharge(self), client_key)
+ oc.checker.auth([ms]).allow(common.test_context,
+ [macaroonbakery.LOGIN_OP])
+
+
+def no_discharge(test):
+ def get_discharge(ctx, cav, payload):
+ test.fail("get_discharge called unexpectedly")
+
+ return get_discharge
diff --git a/macaroonbakery/tests/test_keyring.py b/macaroonbakery/tests/test_keyring.py
new file mode 100644
index 0000000..351b144
--- /dev/null
+++ b/macaroonbakery/tests/test_keyring.py
@@ -0,0 +1,111 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+import unittest
+
+from httmock import urlmatch, HTTMock
+
+import macaroonbakery
+from macaroonbakery import httpbakery
+
+
+class TestKeyRing(unittest.TestCase):
+
+ def test_cache_fetch(self):
+ key = macaroonbakery.generate_key()
+
+ @urlmatch(path='.*/discharge/info')
+ def discharge_info(url, request):
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Version': macaroonbakery.LATEST_BAKERY_VERSION,
+ 'PublicKey': key.public_key.encode().decode('utf-8')
+ }
+ }
+
+ expectInfo = macaroonbakery.ThirdPartyInfo(
+ public_key=key.public_key,
+ version=macaroonbakery.LATEST_BAKERY_VERSION
+ )
+ kr = httpbakery.ThirdPartyLocator(allow_insecure=True)
+ with HTTMock(discharge_info):
+ info = kr.third_party_info('http://0.1.2.3/')
+ self.assertEqual(info, expectInfo)
+
+ def test_cache_norefetch(self):
+ key = macaroonbakery.generate_key()
+
+ @urlmatch(path='.*/discharge/info')
+ def discharge_info(url, request):
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Version': macaroonbakery.LATEST_BAKERY_VERSION,
+ 'PublicKey': key.public_key.encode().decode('utf-8')
+ }
+ }
+
+ expectInfo = macaroonbakery.ThirdPartyInfo(
+ public_key=key.public_key,
+ version=macaroonbakery.LATEST_BAKERY_VERSION
+ )
+ kr = httpbakery.ThirdPartyLocator(allow_insecure=True)
+ with HTTMock(discharge_info):
+ info = kr.third_party_info('http://0.1.2.3/')
+ self.assertEqual(info, expectInfo)
+ info = kr.third_party_info('http://0.1.2.3/')
+ self.assertEqual(info, expectInfo)
+
+ def test_cache_fetch_no_version(self):
+ key = macaroonbakery.generate_key()
+
+ @urlmatch(path='.*/discharge/info')
+ def discharge_info(url, request):
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'PublicKey': key.public_key.encode().decode('utf-8')
+ }
+ }
+
+ expectInfo = macaroonbakery.ThirdPartyInfo(
+ public_key=key.public_key,
+ version=macaroonbakery.BAKERY_V1
+ )
+ kr = httpbakery.ThirdPartyLocator(allow_insecure=True)
+ with HTTMock(discharge_info):
+ info = kr.third_party_info('http://0.1.2.3/')
+ self.assertEqual(info, expectInfo)
+
+ def test_allow_insecure(self):
+ kr = httpbakery.ThirdPartyLocator()
+ with self.assertRaises(macaroonbakery.error.ThirdPartyInfoNotFound):
+ kr.third_party_info('http://0.1.2.3/')
+
+ def test_fallback(self):
+ key = macaroonbakery.generate_key()
+
+ @urlmatch(path='.*/discharge/info')
+ def discharge_info(url, request):
+ return {
+ 'status_code': 404,
+ }
+
+ @urlmatch(path='.*/publickey')
+ def public_key(url, request):
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'PublicKey': key.public_key.encode().decode('utf-8')
+ }
+ }
+
+ expectInfo = macaroonbakery.ThirdPartyInfo(
+ public_key=key.public_key,
+ version=macaroonbakery.BAKERY_V1
+ )
+ kr = httpbakery.ThirdPartyLocator(allow_insecure=True)
+ with HTTMock(discharge_info):
+ with HTTMock(public_key):
+ info = kr.third_party_info('http://0.1.2.3/')
+ self.assertEqual(info, expectInfo)
diff --git a/macaroonbakery/tests/test_macaroon.py b/macaroonbakery/tests/test_macaroon.py
index afc7d52..7e77e2b 100644
--- a/macaroonbakery/tests/test_macaroon.py
+++ b/macaroonbakery/tests/test_macaroon.py
@@ -1,64 +1,202 @@
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
-
+import json
from unittest import TestCase
import six
+import pymacaroons
+from pymacaroons import serializers
-import nacl.utils
-
-from macaroonbakery import bakery, macaroon, checkers, codec
+import macaroonbakery
+import macaroonbakery.checkers as checkers
+from macaroonbakery.tests import common
class TestMacaroon(TestCase):
def test_new_macaroon(self):
- m = macaroon.Macaroon(b'rootkey',
- b'some id',
- 'here',
- bakery.LATEST_BAKERY_VERSION)
+ m = macaroonbakery.Macaroon(
+ b'rootkey',
+ b'some id',
+ 'here',
+ macaroonbakery.LATEST_BAKERY_VERSION)
self.assertIsNotNone(m)
- self.assertEquals(m._macaroon.identifier, 'some id')
+ self.assertEquals(m._macaroon.identifier, b'some id')
self.assertEquals(m._macaroon.location, 'here')
- self.assertEquals(m.version, macaroon.macaroon_version(
- bakery.LATEST_BAKERY_VERSION))
+ self.assertEquals(m.version, macaroonbakery.LATEST_BAKERY_VERSION)
def test_add_first_party_caveat(self):
- m = macaroon.Macaroon('rootkey',
- 'some id',
- 'here',
- bakery.LATEST_BAKERY_VERSION)
- m = m.add_caveat(checkers.Caveat('test_condition'))
+ m = macaroonbakery.Macaroon('rootkey', 'some id', 'here',
+ macaroonbakery.LATEST_BAKERY_VERSION)
+ m.add_caveat(checkers.Caveat('test_condition'))
caveats = m.first_party_caveats()
self.assertEquals(len(caveats), 1)
- self.assertEquals(caveats[0].caveat_id, 'test_condition')
+ self.assertEquals(caveats[0].caveat_id, b'test_condition')
def test_add_third_party_caveat(self):
- m = macaroon.Macaroon('rootkey',
- 'some id',
- 'here',
- bakery.LATEST_BAKERY_VERSION)
- loc = macaroon.ThirdPartyLocator()
- fp_key = nacl.public.PrivateKey.generate()
- tp_key = nacl.public.PrivateKey.generate()
-
- loc.add_info('test_location',
- bakery.ThirdPartyInfo(
- bakery.BAKERY_V1,
- tp_key.public_key))
- m = m.add_caveat(checkers.Caveat(condition='test_condition',
- location='test_location'),
- fp_key, loc)
-
- tp_cav = m.third_party_caveats()
- self.assertEquals(len(tp_cav), 1)
- self.assertEquals(tp_cav[0].location, 'test_location')
- cav = codec.decode_caveat(tp_key, six.b(tp_cav[0].caveat_id))
- self.assertEquals(cav, macaroon.ThirdPartyCaveatInfo(
- condition='test_condition',
- first_party_public_key=fp_key.public_key,
- third_party_key_pair=tp_key,
- root_key='random',
- caveat=six.b(tp_cav[0].caveat_id),
- version=bakery.BAKERY_V1,
- ns=macaroon.legacy_namespace()
- ))
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+
+ lbv = six.int2byte(macaroonbakery.LATEST_BAKERY_VERSION)
+ tests = [
+ ('no existing id', b'', [], lbv + six.int2byte(0)),
+ ('several existing ids', b'', [
+ lbv + six.int2byte(0),
+ lbv + six.int2byte(1),
+ lbv + six.int2byte(2)
+ ], lbv + six.int2byte(3)),
+ ('with base id', lbv + six.int2byte(0), [lbv + six.int2byte(0)],
+ lbv + six.int2byte(0) + six.int2byte(0)),
+ ('with base id and existing id', lbv + six.int2byte(0), [
+ lbv + six.int2byte(0) + six.int2byte(0)
+ ], lbv + six.int2byte(0) + six.int2byte(1))
+ ]
+
+ for test in tests:
+ print('test ', test[0])
+ m = macaroonbakery.Macaroon(
+ root_key=b'root key', id=b'id',
+ location='location',
+ version=macaroonbakery.LATEST_BAKERY_VERSION)
+ for id in test[2]:
+ m.macaroon.add_third_party_caveat(key=None, key_id=id,
+ location='')
+ m._caveat_id_prefix = test[1]
+ m.add_caveat(checkers.Caveat(location='bs-loc',
+ condition='something'),
+ bs.oven.key, locator)
+ self.assertEqual(m.macaroon.caveats[len(test[2])].caveat_id,
+ test[3])
+
+ def test_marshal_json_latest_version(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+ ns = checkers.Namespace({
+ 'testns': 'x',
+ 'otherns': 'y',
+ })
+ m = macaroonbakery.Macaroon(
+ root_key=b'root key', id=b'id',
+ location='location',
+ version=macaroonbakery.LATEST_BAKERY_VERSION,
+ namespace=ns)
+ m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'),
+ bs.oven.key, locator)
+ data = m.serialize_json()
+ m1 = macaroonbakery.Macaroon.deserialize_json(data)
+ # Just check the signature and version - we're not interested in fully
+ # checking the macaroon marshaling here.
+ self.assertEqual(m1.macaroon.signature, m.macaroon.signature)
+ self.assertEqual(m1.macaroon.version, m.macaroon.version)
+ self.assertEqual(len(m1.macaroon.caveats), 1)
+ self.assertEqual(m1.namespace, m.namespace)
+ self.assertEqual(m1._caveat_data, m._caveat_data)
+
+ # test with the encoder, decoder
+ data = json.dumps(m, cls=macaroonbakery.MacaroonJSONEncoder)
+ m1 = json.loads(data, cls=macaroonbakery.MacaroonJSONDecoder)
+ self.assertEqual(m1.macaroon.signature, m.macaroon.signature)
+ self.assertEqual(m1.macaroon.version, m.macaroon.version)
+ self.assertEqual(len(m1.macaroon.caveats), 1)
+ self.assertEqual(m1.namespace, m.namespace)
+ self.assertEqual(m1._caveat_data, m._caveat_data)
+
+ def test_json_version1(self):
+ self._test_json_with_version(macaroonbakery.BAKERY_V1)
+
+ def test_json_version2(self):
+ self._test_json_with_version(macaroonbakery.BAKERY_V2)
+
+ def _test_json_with_version(self, version):
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery('bs-loc', locator)
+
+ ns = checkers.Namespace({
+ 'testns': 'x',
+ })
+
+ m = macaroonbakery.Macaroon(
+ root_key=b'root key', id=b'id',
+ location='location', version=version,
+ namespace=ns)
+ m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'),
+ bs.oven.key, locator)
+
+ # Sanity check that no external caveat data has been added.
+ self.assertEqual(len(m._caveat_data), 0)
+
+ data = json.dumps(m, cls=macaroonbakery.MacaroonJSONEncoder)
+ m1 = json.loads(data, cls=macaroonbakery.MacaroonJSONDecoder)
+
+ # Just check the signature and version - we're not interested in fully
+ # checking the macaroon marshaling here.
+ self.assertEqual(m1.macaroon.signature, m.macaroon.signature)
+ self.assertEqual(m1.macaroon.version,
+ macaroonbakery.macaroon_version(version))
+ self.assertEqual(len(m1.macaroon.caveats), 1)
+
+ # Namespace information has been thrown away.
+ self.assertEqual(m1.namespace, macaroonbakery.legacy_namespace())
+
+ self.assertEqual(len(m1._caveat_data), 0)
+
+ def test_json_unknown_version(self):
+ m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2)
+ with self.assertRaises(ValueError) as exc:
+ json.loads(json.dumps({
+ 'm': m.serialize(serializer=serializers.JsonSerializer()),
+ 'v': macaroonbakery.LATEST_BAKERY_VERSION + 1
+ }), cls=macaroonbakery.MacaroonJSONDecoder)
+ self.assertEqual('unknow bakery version 4', exc.exception.args[0])
+
+ def test_json_inconsistent_version(self):
+ m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V1)
+ with self.assertRaises(ValueError) as exc:
+ json.loads(json.dumps({
+ 'm': json.loads(m.serialize(
+ serializer=serializers.JsonSerializer())),
+ 'v': macaroonbakery.LATEST_BAKERY_VERSION
+ }), cls=macaroonbakery.MacaroonJSONDecoder)
+ self.assertEqual('underlying macaroon has inconsistent version; '
+ 'got 1 want 2', exc.exception.args[0])
+
+ def test_clone(self):
+ locator = macaroonbakery.ThirdPartyStore()
+ bs = common.new_bakery("bs-loc", locator)
+ ns = checkers.Namespace({
+ "testns": "x",
+ })
+ m = macaroonbakery.Macaroon(
+ root_key=b'root key', id=b'id',
+ location='location',
+ version=macaroonbakery.LATEST_BAKERY_VERSION,
+ namespace=ns)
+ m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'),
+ bs.oven.key, locator)
+ m1 = m.copy()
+ self.assertEqual(len(m.macaroon.caveats), 1)
+ self.assertEqual(len(m1.macaroon.caveats), 1)
+ self.assertEqual(m._caveat_data, m1._caveat_data)
+ m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'),
+ bs.oven.key, locator)
+ self.assertEqual(len(m.macaroon.caveats), 2)
+ self.assertEqual(len(m1.macaroon.caveats), 1)
+ self.assertNotEqual(m._caveat_data, m1._caveat_data)
+
+ def test_json_deserialize_from_go(self):
+ ns = checkers.Namespace()
+ ns.register("someuri", "x")
+ m = macaroonbakery.Macaroon(
+ root_key=b'rootkey', id=b'some id', location='here',
+ version=macaroonbakery.LATEST_BAKERY_VERSION, namespace=ns)
+ m.add_caveat(checkers.Caveat(condition='something',
+ namespace='someuri'))
+ data = '{"m":{"c":[{"i":"x:something"}],"l":"here","i":"some id",' \
+ '"s64":"c8edRIupArSrY-WZfa62pgZFD8VjDgqho9U2PlADe-E"},"v":3,' \
+ '"ns":"someuri:x"}'
+ m_go = macaroonbakery.Macaroon.deserialize_json(data)
+
+ self.assertEqual(m.macaroon.signature_bytes,
+ m_go.macaroon.signature_bytes)
+ self.assertEqual(m.macaroon.version, m_go.macaroon.version)
+ self.assertEqual(len(m_go.macaroon.caveats), 1)
+ self.assertEqual(m.namespace, m_go.namespace)
diff --git a/macaroonbakery/tests/test_namespace.py b/macaroonbakery/tests/test_namespace.py
index 24eda29..2f04bb3 100644
--- a/macaroonbakery/tests/test_namespace.py
+++ b/macaroonbakery/tests/test_namespace.py
@@ -1,9 +1,8 @@
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
-
from unittest import TestCase
-from macaroonbakery import namespace
+import macaroonbakery.checkers as checkers
class TestNamespace(TestCase):
@@ -23,17 +22,17 @@ class TestNamespace(TestCase):
}, b'a:one a1:two')
]
for test in tests:
- ns = namespace.Namespace(test[1])
- data = ns.serialize()
+ ns = checkers.Namespace(test[1])
+ data = ns.serialize_text()
self.assertEquals(data, test[2])
self.assertEquals(str(ns), test[2].decode('utf-8'))
# Check that it can be deserialize to the same thing:
- ns1 = namespace.deserialize_namespace(data)
+ ns1 = checkers.deserialize_namespace(data)
self.assertEquals(ns1, ns)
def test_register(self):
- ns = namespace.Namespace(None)
+ ns = checkers.Namespace(None)
ns.register('testns', 't')
prefix = ns.resolve('testns')
self.assertEquals(prefix, 't')
@@ -48,11 +47,11 @@ class TestNamespace(TestCase):
self.assertEquals(prefix, 'o')
def test_register_bad_uri(self):
- ns = namespace.Namespace(None)
+ ns = checkers.Namespace(None)
with self.assertRaises(KeyError):
ns.register('', 'x')
def test_register_bad_prefix(self):
- ns = namespace.Namespace(None)
+ ns = checkers.Namespace(None)
with self.assertRaises(ValueError):
ns.register('std', 'x:1')
diff --git a/macaroonbakery/tests/test_oven.py b/macaroonbakery/tests/test_oven.py
new file mode 100644
index 0000000..2976e94
--- /dev/null
+++ b/macaroonbakery/tests/test_oven.py
@@ -0,0 +1,125 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+from unittest import TestCase
+
+import copy
+from datetime import datetime, timedelta
+
+import macaroonbakery
+
+EPOCH = datetime(1900, 11, 17, 19, 00, 13, 0, None)
+AGES = EPOCH + timedelta(days=10)
+
+
+class TestOven(TestCase):
+ def test_canonical_ops(self):
+ canonical_ops_tests = (
+ ('empty array', [], []),
+ ('one element', [macaroonbakery.Op('a', 'a')],
+ [macaroonbakery.Op('a', 'a')]),
+ ('all in order',
+ [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'),
+ macaroonbakery.Op('c', 'c')],
+ [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'),
+ macaroonbakery.Op('c', 'c')]),
+ ('out of order',
+ [macaroonbakery.Op('c', 'c'), macaroonbakery.Op('a', 'b'),
+ macaroonbakery.Op('a', 'a')],
+ [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'),
+ macaroonbakery.Op('c', 'c')]),
+ ('with duplicates',
+ [macaroonbakery.Op('c', 'c'), macaroonbakery.Op('a', 'b'),
+ macaroonbakery.Op('a', 'a'), macaroonbakery.Op('c', 'a'),
+ macaroonbakery.Op('c', 'b'), macaroonbakery.Op('c', 'c'),
+ macaroonbakery.Op('a', 'a')],
+ [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'),
+ macaroonbakery.Op('c', 'a'), macaroonbakery.Op('c', 'b'),
+ macaroonbakery.Op('c', 'c')]),
+ ('make sure we\'ve got the fields right',
+ [macaroonbakery.Op(entity='read', action='two'),
+ macaroonbakery.Op(entity='read', action='one'),
+ macaroonbakery.Op(entity='write', action='one')],
+ [macaroonbakery.Op(entity='read', action='one'),
+ macaroonbakery.Op(entity='read', action='two'),
+ macaroonbakery.Op(entity='write', action='one')])
+ )
+ for about, ops, expected in canonical_ops_tests:
+ new_ops = copy.copy(ops)
+ canonical_ops = macaroonbakery.canonical_ops(new_ops)
+ self.assertEquals(canonical_ops, expected)
+ # Verify that the original array isn't changed.
+ self.assertEquals(new_ops, ops)
+
+ def test_multiple_ops(self):
+ test_oven = macaroonbakery.Oven(
+ ops_store=macaroonbakery.MemoryOpsStore())
+ ops = [macaroonbakery.Op('one', 'read'),
+ macaroonbakery.Op('one', 'write'),
+ macaroonbakery.Op('two', 'read')]
+ m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES,
+ None, ops)
+ got_ops, conds = test_oven.macaroon_ops([m.macaroon])
+ self.assertEquals(len(conds), 1) # time-before caveat.
+ self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops)
+
+ def test_multiple_ops_in_id(self):
+ test_oven = macaroonbakery.Oven()
+ ops = [macaroonbakery.Op('one', 'read'),
+ macaroonbakery.Op('one', 'write'),
+ macaroonbakery.Op('two', 'read')]
+ m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES,
+ None, ops)
+ got_ops, conds = test_oven.macaroon_ops([m.macaroon])
+ self.assertEquals(len(conds), 1) # time-before caveat.
+ self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops)
+
+ def test_multiple_ops_in_id_with_version1(self):
+ test_oven = macaroonbakery.Oven()
+ ops = [macaroonbakery.Op('one', 'read'),
+ macaroonbakery.Op('one', 'write'),
+ macaroonbakery.Op('two', 'read')]
+ m = test_oven.macaroon(macaroonbakery.BAKERY_V1, AGES, None, ops)
+ got_ops, conds = test_oven.macaroon_ops([m.macaroon])
+ self.assertEquals(len(conds), 1) # time-before caveat.
+ self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops)
+
+ def test_huge_number_of_ops_gives_small_macaroon(self):
+ test_oven = macaroonbakery.Oven(
+ ops_store=macaroonbakery.MemoryOpsStore())
+ ops = []
+ for i in range(30000):
+ ops.append(macaroonbakery.Op(entity='entity{}'.format(i),
+ action='action{}'.format(i)))
+
+ m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES,
+ None, ops)
+ got_ops, conds = test_oven.macaroon_ops([m.macaroon])
+ self.assertEquals(len(conds), 1) # time-before caveat.
+ self.assertEquals(macaroonbakery.canonical_ops(got_ops),
+ macaroonbakery.canonical_ops(ops))
+
+ data = m.serialize_json()
+ self.assertLess(len(data), 300)
+
+ def test_ops_stored_only_once(self):
+ st = macaroonbakery.MemoryOpsStore()
+ test_oven = macaroonbakery.Oven(ops_store=st)
+
+ ops = [macaroonbakery.Op('one', 'read'),
+ macaroonbakery.Op('one', 'write'),
+ macaroonbakery.Op('two', 'read')]
+
+ m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES,
+ None, ops)
+ got_ops, conds = test_oven.macaroon_ops([m.macaroon])
+ self.assertEquals(macaroonbakery.canonical_ops(got_ops),
+ macaroonbakery.canonical_ops(ops))
+
+ # Make another macaroon containing the same ops in a different order.
+ ops = [macaroonbakery.Op('one', 'write'),
+ macaroonbakery.Op('one', 'read'),
+ macaroonbakery.Op('one', 'read'),
+ macaroonbakery.Op('two', 'read')]
+ test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, None,
+ ops)
+ self.assertEquals(len(st._store), 1)
diff --git a/macaroonbakery/tests/test_store.py b/macaroonbakery/tests/test_store.py
new file mode 100644
index 0000000..7bcc4c2
--- /dev/null
+++ b/macaroonbakery/tests/test_store.py
@@ -0,0 +1,21 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+from unittest import TestCase
+
+import macaroonbakery
+
+
+class TestOven(TestCase):
+ def test_mem_store(self):
+ st = macaroonbakery.MemoryKeyStore()
+
+ key, id = st.root_key()
+ self.assertEqual(len(key), 24)
+ self.assertEqual(id.decode('utf-8'), '0')
+
+ key1, id1 = st.root_key()
+ self.assertEqual(key1, key)
+ self.assertEqual(id1, id)
+
+ key2 = st.get(id)
+ self.assertEqual(key2, key)