summaryrefslogtreecommitdiff
path: root/macaroonbakery/tests
diff options
context:
space:
mode:
Diffstat (limited to 'macaroonbakery/tests')
-rw-r--r--macaroonbakery/tests/common.py30
-rw-r--r--macaroonbakery/tests/test_agent.py331
-rw-r--r--macaroonbakery/tests/test_authorizer.py72
-rw-r--r--macaroonbakery/tests/test_bakery.py88
-rw-r--r--macaroonbakery/tests/test_checker.py443
-rw-r--r--macaroonbakery/tests/test_client.py388
-rw-r--r--macaroonbakery/tests/test_codec.py115
-rw-r--r--macaroonbakery/tests/test_discharge.py274
-rw-r--r--macaroonbakery/tests/test_discharge_all.py71
-rw-r--r--macaroonbakery/tests/test_keyring.py34
-rw-r--r--macaroonbakery/tests/test_macaroon.py69
-rw-r--r--macaroonbakery/tests/test_namespace.py2
-rw-r--r--macaroonbakery/tests/test_oven.py127
-rw-r--r--macaroonbakery/tests/test_store.py4
-rw-r--r--macaroonbakery/tests/test_time.py129
15 files changed, 1539 insertions, 638 deletions
diff --git a/macaroonbakery/tests/common.py b/macaroonbakery/tests/common.py
index 2619127..f238dfd 100644
--- a/macaroonbakery/tests/common.py
+++ b/macaroonbakery/tests/common.py
@@ -4,7 +4,7 @@ from datetime import datetime, timedelta
import pytz
-import macaroonbakery
+import macaroonbakery as bakery
import macaroonbakery.checkers as checkers
@@ -51,7 +51,7 @@ def true_check(ctx, cond, args):
return None
-class OneIdentity(macaroonbakery.IdentityClient):
+class OneIdentity(bakery.IdentityClient):
'''An IdentityClient implementation that always returns a single identity
from declared_identity, allowing allow(LOGIN_OP) to work even when there
are no declaration caveats (this is mostly to support the legacy tests
@@ -73,7 +73,7 @@ class _NoOne(object):
return ''
-class ThirdPartyStrcmpChecker(macaroonbakery.ThirdPartyCaveatChecker):
+class ThirdPartyStrcmpChecker(bakery.ThirdPartyCaveatChecker):
def __init__(self, str):
self.str = str
@@ -82,12 +82,12 @@ class ThirdPartyStrcmpChecker(macaroonbakery.ThirdPartyCaveatChecker):
if isinstance(cav_info.condition, bytes):
condition = cav_info.condition.decode('utf-8')
if condition != self.str:
- raise macaroonbakery.ThirdPartyCaveatCheckFailed(
+ raise bakery.ThirdPartyCaveatCheckFailed(
'{} doesn\'t match {}'.format(condition, self.str))
return []
-class ThirdPartyCheckerWithCaveats(macaroonbakery.ThirdPartyCaveatChecker):
+class ThirdPartyCheckerWithCaveats(bakery.ThirdPartyCaveatChecker):
def __init__(self, cavs=None):
if cavs is None:
cavs = []
@@ -97,7 +97,7 @@ class ThirdPartyCheckerWithCaveats(macaroonbakery.ThirdPartyCaveatChecker):
return self.cavs
-class ThirdPartyCaveatCheckerEmpty(macaroonbakery.ThirdPartyCaveatChecker):
+class ThirdPartyCaveatCheckerEmpty(bakery.ThirdPartyCaveatChecker):
def check_third_party_caveat(self, ctx, cav_info):
return []
@@ -107,14 +107,16 @@ def new_bakery(location, locator=None):
# key pair, and registers the key with the given locator if provided.
#
# It uses test_checker to check first party caveats.
- key = macaroonbakery.generate_key()
+ key = bakery.generate_key()
if locator is not None:
locator.add_info(location,
- macaroonbakery.ThirdPartyInfo(
+ bakery.ThirdPartyInfo(
public_key=key.public_key,
- version=macaroonbakery.LATEST_BAKERY_VERSION))
- return macaroonbakery.Bakery(key=key,
- checker=test_checker(),
- location=location,
- identity_client=OneIdentity(),
- locator=locator)
+ version=bakery.LATEST_VERSION))
+ return bakery.Bakery(
+ key=key,
+ checker=test_checker(),
+ location=location,
+ identity_client=OneIdentity(),
+ locator=locator,
+ )
diff --git a/macaroonbakery/tests/test_agent.py b/macaroonbakery/tests/test_agent.py
index 49134f5..67f5b84 100644
--- a/macaroonbakery/tests/test_agent.py
+++ b/macaroonbakery/tests/test_agent.py
@@ -1,6 +1,7 @@
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import base64
+from datetime import datetime, timedelta
import json
import os
import tempfile
@@ -9,7 +10,17 @@ from unittest import TestCase
import nacl.encoding
import requests.cookies
import six
+from six.moves.urllib.parse import parse_qs
+from six.moves.http_cookies import SimpleCookie
+from httmock import (
+ HTTMock,
+ urlmatch,
+ response
+)
+import macaroonbakery as bakery
+import macaroonbakery.httpbakery as httpbakery
+import macaroonbakery.checkers as checkers
import macaroonbakery.httpbakery.agent as agent
@@ -62,13 +73,19 @@ class TestAgents(TestCase):
def test_load_agents_into_cookies(self):
cookies = requests.cookies.RequestsCookieJar()
- c1, key = agent.load_agent_file(self.agent_filename, cookies=cookies)
+ c1, key = agent.load_agent_file(
+ self.agent_filename,
+ cookies=cookies,
+ )
self.assertEqual(c1, cookies)
- self.assertEqual(key.encode(nacl.encoding.Base64Encoder),
- b'CqoSgj06Zcgb4/S6RT4DpTjLAfKoznEY3JsShSjKJEU=')
+ self.assertEqual(
+ key.encode(nacl.encoding.Base64Encoder),
+ b'CqoSgj06Zcgb4/S6RT4DpTjLAfKoznEY3JsShSjKJEU=',
+ )
self.assertEqual(
key.public_key.encode(nacl.encoding.Base64Encoder),
- b'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
+ b'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=',
+ )
value = cookies.get('agent-login', domain='1.example.com')
jv = base64.b64decode(value)
@@ -76,8 +93,7 @@ class TestAgents(TestCase):
jv = jv.decode('utf-8')
data = json.loads(jv)
self.assertEqual(data['username'], 'user-1')
- self.assertEqual(data['public_key'],
- 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
+ self.assertEqual(data['public_key'], 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
value = cookies.get('agent-login', domain='2.example.com',
path='/discharger')
@@ -86,8 +102,7 @@ class TestAgents(TestCase):
jv = jv.decode('utf-8')
data = json.loads(jv)
self.assertEqual(data['username'], 'user-2')
- self.assertEqual(data['public_key'],
- 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
+ self.assertEqual(data['public_key'], 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
def test_load_agents_with_bad_key(self):
with self.assertRaises(agent.AgentFileFormatError):
@@ -97,6 +112,295 @@ class TestAgents(TestCase):
with self.assertRaises(agent.AgentFileFormatError):
agent.load_agent_file(self.no_username_agent_filename)
+ def test_agent_login(self):
+ discharge_key = bakery.generate_key()
+
+ class _DischargerLocator(bakery.ThirdPartyLocator):
+ def third_party_info(self, loc):
+ if loc == 'http://0.3.2.1':
+ return bakery.ThirdPartyInfo(
+ public_key=discharge_key.public_key,
+ version=bakery.LATEST_VERSION,
+ )
+ d = _DischargerLocator()
+ server_key = bakery.generate_key()
+ server_bakery = bakery.Bakery(key=server_key, locator=d)
+
+ @urlmatch(path='.*/here')
+ def server_get(url, request):
+ ctx = checkers.AuthContext()
+ test_ops = [bakery.Op(entity='test-op', action='read')]
+ auth_checker = server_bakery.checker.auth(
+ httpbakery.extract_macaroons(request.headers))
+ try:
+ auth_checker.allow(ctx, test_ops)
+ resp = response(status_code=200,
+ content='done')
+ except bakery.PermissionDenied:
+ caveats = [
+ checkers.Caveat(location='http://0.3.2.1', condition='is-ok')
+ ]
+ m = server_bakery.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=datetime.utcnow() + timedelta(days=1),
+ caveats=caveats, ops=test_ops)
+ content, headers = httpbakery.discharge_required_response(
+ m, '/',
+ 'test',
+ 'message')
+ resp = response(status_code=401,
+ content=content,
+ headers=headers)
+ return request.hooks['response'][0](resp)
+
+ @urlmatch(path='.*/discharge')
+ def discharge(url, request):
+ qs = parse_qs(request.body)
+ if qs.get('token64') is None:
+ return response(
+ status_code=401,
+ content={
+ 'Code': httpbakery.ERR_INTERACTION_REQUIRED,
+ 'Message': 'interaction required',
+ 'Info': {
+ 'InteractionMethods': {
+ 'agent': {'login-url': '/login'},
+ },
+ },
+ },
+ headers={'Content-Type': 'application/json'})
+ else:
+ qs = parse_qs(request.body)
+ content = {q: qs[q][0] for q in qs}
+ m = httpbakery.discharge(checkers.AuthContext(), content,
+ discharge_key, None, alwaysOK3rd)
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Macaroon': m.serialize_json()
+ }
+ }
+
+ key = bakery.generate_key()
+
+ @urlmatch(path='.*/login')
+ def login(url, request):
+ b = bakery.Bakery(key=discharge_key)
+ m = b.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=datetime.utcnow() + timedelta(days=1),
+ caveats=[bakery.local_third_party_caveat(
+ key.public_key,
+ version=httpbakery.request_version(request.headers))],
+ ops=[bakery.Op(entity='agent', action='login')])
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'macaroon': m.to_dict()
+ }
+ }
+
+ with HTTMock(server_get), \
+ HTTMock(discharge), \
+ HTTMock(login):
+ client = httpbakery.Client(interaction_methods=[
+ agent.AgentInteractor(
+ agent.AuthInfo(
+ key=key,
+ agents=[
+ agent.Agent(
+ username='test-user',
+ url=u'http://0.3.2.1'
+ )
+ ],
+ ),
+ ),
+ ])
+ resp = requests.get(
+ 'http://0.1.2.3/here',
+ cookies=client.cookies,
+ auth=client.auth())
+ self.assertEquals(resp.content, b'done')
+
+ def test_agent_legacy(self):
+ discharge_key = bakery.generate_key()
+
+ class _DischargerLocator(bakery.ThirdPartyLocator):
+ def third_party_info(self, loc):
+ if loc == 'http://0.3.2.1':
+ return bakery.ThirdPartyInfo(
+ public_key=discharge_key.public_key,
+ version=bakery.LATEST_VERSION,
+ )
+ d = _DischargerLocator()
+ server_key = bakery.generate_key()
+ server_bakery = bakery.Bakery(key=server_key, locator=d)
+
+ @urlmatch(path='.*/here')
+ def server_get(url, request):
+ ctx = checkers.AuthContext()
+ test_ops = [bakery.Op(entity='test-op', action='read')]
+ auth_checker = server_bakery.checker.auth(
+ httpbakery.extract_macaroons(request.headers))
+ try:
+ auth_checker.allow(ctx, test_ops)
+ resp = response(status_code=200,
+ content='done')
+ except bakery.PermissionDenied:
+ caveats = [
+ checkers.Caveat(location='http://0.3.2.1',
+ condition='is-ok')
+ ]
+ m = server_bakery.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=datetime.utcnow() + timedelta(days=1),
+ caveats=caveats, ops=test_ops)
+ content, headers = httpbakery.discharge_required_response(
+ m, '/',
+ 'test',
+ 'message')
+ resp = response(
+ status_code=401,
+ content=content,
+ headers=headers,
+ )
+ return request.hooks['response'][0](resp)
+
+ class InfoStorage:
+ info = None
+
+ @urlmatch(path='.*/discharge')
+ def discharge(url, request):
+ qs = parse_qs(request.body)
+ if qs.get('caveat64') is not None:
+ content = {q: qs[q][0] for q in qs}
+
+ class InteractionRequiredError(Exception):
+ def __init__(self, error):
+ self.error = error
+
+ class CheckerInError(bakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(self, ctx, info):
+ InfoStorage.info = info
+ raise InteractionRequiredError(
+ httpbakery.Error(
+ code=httpbakery.ERR_INTERACTION_REQUIRED,
+ version=httpbakery.request_version(
+ request.headers),
+ message='interaction required',
+ info=httpbakery.ErrorInfo(
+ wait_url='http://0.3.2.1/wait?'
+ 'dischargeid=1',
+ visit_url='http://0.3.2.1/visit?'
+ 'dischargeid=1'
+ ),
+ ),
+ )
+ try:
+ httpbakery.discharge(
+ checkers.AuthContext(), content,
+ discharge_key, None, CheckerInError())
+ except InteractionRequiredError as exc:
+ return response(
+ status_code=401,
+ content={
+ 'Code': exc.error.code,
+ 'Message': exc.error.message,
+ 'Info': {
+ 'WaitURL': exc.error.info.wait_url,
+ 'VisitURL': exc.error.info.visit_url,
+ },
+ },
+ headers={'Content-Type': 'application/json'})
+
+ key = bakery.generate_key()
+
+ @urlmatch(path='.*/visit?$')
+ def visit(url, request):
+ if request.headers.get('Accept') == 'application/json':
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'agent': request.url
+ }
+ }
+ cs = SimpleCookie()
+ cookies = request.headers.get('Cookie')
+ if cookies is not None:
+ cs.load(str(cookies))
+ public_key = None
+ for c in cs:
+ if c == 'agent-login':
+ json_cookie = json.loads(
+ base64.b64decode(cs[c].value).decode('utf-8'))
+ public_key = bakery.PublicKey.deserialize(json_cookie.get('public_key'))
+ ms = httpbakery.extract_macaroons(request.headers)
+ if len(ms) == 0:
+ b = bakery.Bakery(key=discharge_key)
+ m = b.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=datetime.utcnow() + timedelta(days=1),
+ caveats=[bakery.local_third_party_caveat(
+ public_key,
+ version=httpbakery.request_version(request.headers))],
+ ops=[bakery.Op(entity='agent', action='login')])
+ content, headers = httpbakery.discharge_required_response(
+ m, '/',
+ 'test',
+ 'message')
+ resp = response(status_code=401,
+ content=content,
+ headers=headers)
+ return request.hooks['response'][0](resp)
+
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'agent-login': True
+ }
+ }
+
+ @urlmatch(path='.*/wait?$')
+ def wait(url, request):
+ class EmptyChecker(bakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(self, ctx, info):
+ return []
+ if InfoStorage.info is None:
+ self.fail('visit url has not been visited')
+ m = bakery.discharge(
+ checkers.AuthContext(),
+ InfoStorage.info.id,
+ InfoStorage.info.caveat,
+ discharge_key,
+ EmptyChecker(),
+ _DischargerLocator(),
+ )
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Macaroon': m.to_dict()
+ }
+ }
+
+ with HTTMock(server_get), \
+ HTTMock(discharge), \
+ HTTMock(visit), \
+ HTTMock(wait):
+ client = httpbakery.Client(interaction_methods=[
+ agent.AgentInteractor(
+ agent.AuthInfo(
+ key=key,
+ agents=[agent.Agent(username='test-user', url=u'http://0.3.2.1')],
+ ),
+ ),
+ ])
+ resp = requests.get(
+ 'http://0.1.2.3/here',
+ cookies=client.cookies,
+ auth=client.auth(),
+ )
+ self.assertEquals(resp.content, b'done')
+
agent_file = '''
{
@@ -146,3 +450,14 @@ no_username_agent_file = '''
}]
}
'''
+
+
+class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
+ def __init__(self, check):
+ self._check = check
+
+ def check_third_party_caveat(self, ctx, info):
+ cond, arg = checkers.parse_caveat(info.condition)
+ return self._check(cond, arg)
+
+alwaysOK3rd = ThirdPartyCaveatCheckerF(lambda cond, arg: [])
diff --git a/macaroonbakery/tests/test_authorizer.py b/macaroonbakery/tests/test_authorizer.py
index da01974..f90d2b5 100644
--- a/macaroonbakery/tests/test_authorizer.py
+++ b/macaroonbakery/tests/test_authorizer.py
@@ -2,7 +2,7 @@
# Licensed under the LGPLv3, see LICENCE file for details.
from unittest import TestCase
-import macaroonbakery
+import macaroonbakery as bakery
import macaroonbakery.checkers as checkers
@@ -23,11 +23,11 @@ class TestAuthorizer(TestCase):
else:
self.fail('unexpected entity: ' + op.Entity)
- ops = [macaroonbakery.Op('a', 'x'), macaroonbakery.Op('b', 'x'),
- macaroonbakery.Op('c', 'x'), macaroonbakery.Op('d', 'x')]
- allowed, caveats = macaroonbakery.AuthorizerFunc(f).authorize(
+ ops = [bakery.Op('a', 'x'), bakery.Op('b', 'x'),
+ bakery.Op('c', 'x'), bakery.Op('d', 'x')]
+ allowed, caveats = bakery.AuthorizerFunc(f).authorize(
checkers.AuthContext(),
- macaroonbakery.SimpleIdentity('bob'),
+ bakery.SimpleIdentity('bob'),
ops
)
self.assertEqual(allowed, [False, True, True, True])
@@ -40,42 +40,45 @@ class TestAuthorizer(TestCase):
ctx = checkers.AuthContext()
tests = [
('no ops, no problem',
- macaroonbakery.ACLAuthorizer(allow_public=True,
- get_acl=lambda x, y: []), None, [],
+ bakery.ACLAuthorizer(allow_public=True, get_acl=lambda x, y: []),
+ None,
+ [],
[]),
('identity that does not implement ACLIdentity; '
'user should be denied except for everyone group',
- macaroonbakery.ACLAuthorizer(allow_public=True,
- get_acl=lambda ctx, op: [
- macaroonbakery.EVERYONE]
- if op.entity == 'a' else ['alice']),
+ bakery.ACLAuthorizer(
+ allow_public=True,
+ get_acl=lambda ctx, op: [bakery.EVERYONE] if op.entity == 'a' else ['alice'],
+ ),
SimplestIdentity('bob'),
- [macaroonbakery.Op(entity='a', action='a'),
- macaroonbakery.Op(entity='b', action='b')],
+ [bakery.Op(entity='a', action='a'),
+ bakery.Op(entity='b', action='b')],
[True, False]),
('identity that does not implement ACLIdentity with user == Id; '
'user should be denied except for everyone group',
- macaroonbakery.ACLAuthorizer(allow_public=True,
- get_acl=lambda ctx, op: [
- macaroonbakery.EVERYONE] if
- op.entity == 'a' else ['bob']),
+ bakery.ACLAuthorizer(
+ allow_public=True,
+ get_acl=lambda ctx, op: [bakery.EVERYONE] if op.entity == 'a' else ['bob'],
+ ),
SimplestIdentity('bob'),
- [macaroonbakery.Op(entity='a', action='a'),
- macaroonbakery.Op(entity='b', action='b')],
+ [bakery.Op(entity='a', action='a'),
+ bakery.Op(entity='b', action='b')],
[True, False]),
('permission denied for everyone without AllowPublic',
- macaroonbakery.ACLAuthorizer(allow_public=False,
- get_acl=lambda x, y: [
- macaroonbakery.EVERYONE]),
+ bakery.ACLAuthorizer(
+ allow_public=False,
+ get_acl=lambda x, y: [bakery.EVERYONE],
+ ),
SimplestIdentity('bob'),
- [macaroonbakery.Op(entity='a', action='a')],
+ [bakery.Op(entity='a', action='a')],
[False]),
('permission granted to anyone with no identity with AllowPublic',
- macaroonbakery.ACLAuthorizer(allow_public=True,
- get_acl=lambda x, y: [
- macaroonbakery.EVERYONE]),
+ bakery.ACLAuthorizer(
+ allow_public=True,
+ get_acl=lambda x, y: [bakery.EVERYONE],
+ ),
None,
- [macaroonbakery.Op(entity='a', action='a')],
+ [bakery.Op(entity='a', action='a')],
[True])
]
for test in tests:
@@ -96,12 +99,12 @@ class TestAuthorizer(TestCase):
Visited.in_f = True
return False, None
- macaroonbakery.AuthorizerFunc(f).authorize(
- ctx, macaroonbakery.SimpleIdentity('bob'), ['op1']
+ bakery.AuthorizerFunc(f).authorize(
+ ctx, bakery.SimpleIdentity('bob'), ['op1']
)
self.assertTrue(Visited.in_f)
- class TestIdentity(SimplestIdentity, macaroonbakery.ACLIdentity):
+ class TestIdentity(SimplestIdentity, bakery.ACLIdentity):
def allow(other, ctx, acls):
self.assertEqual(ctx.get('a'), 'aval')
Visited.in_allow = True
@@ -112,14 +115,15 @@ class TestAuthorizer(TestCase):
Visited.in_get_acl = True
return []
- macaroonbakery.ACLAuthorizer(allow_public=False,
- get_acl=get_acl).authorize(
- ctx, TestIdentity('bob'), ['op1'])
+ bakery.ACLAuthorizer(
+ allow_public=False,
+ get_acl=get_acl,
+ ).authorize(ctx, TestIdentity('bob'), ['op1'])
self.assertTrue(Visited.in_get_acl)
self.assertTrue(Visited.in_allow)
-class SimplestIdentity(macaroonbakery.Identity):
+class SimplestIdentity(bakery.Identity):
# SimplestIdentity implements Identity for a string. Unlike
# SimpleIdentity, it does not implement ACLIdentity.
def __init__(self, user):
diff --git a/macaroonbakery/tests/test_bakery.py b/macaroonbakery/tests/test_bakery.py
index 724b264..5a13cff 100644
--- a/macaroonbakery/tests/test_bakery.py
+++ b/macaroonbakery/tests/test_bakery.py
@@ -14,7 +14,7 @@ from httmock import (
response
)
-from macaroonbakery import httpbakery
+import macaroonbakery.httpbakery as httpbakery
ID_PATH = 'http://example.com/someprotecteurl'
@@ -90,6 +90,32 @@ def first_407_then_200(url, request):
return request.hooks['response'][0](resp)
+@urlmatch(netloc='example.com:8000', path='.*/someprotecteurl')
+def first_407_then_200_with_port(url, request):
+ if request.headers.get('cookie', '').startswith('macaroon-'):
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Value': 'some value'
+ }
+ }
+ else:
+ resp = response(status_code=407,
+ content={
+ 'Info': {
+ 'Macaroon': json_macaroon,
+ 'MacaroonPath': '/',
+ 'CookieNameSuffix': 'test'
+ },
+ 'Message': 'verification failed: no macaroon '
+ 'cookies in request',
+ 'Code': 'macaroon discharge required'
+ },
+ headers={'Content-Type': 'application/json'},
+ request=request)
+ return request.hooks['response'][0](resp)
+
+
@urlmatch(path='.*/someprotecteurl')
def valid_200(url, request):
return {
@@ -142,25 +168,55 @@ def wait_after_401(url, request):
class TestBakery(TestCase):
+
+ def assert_cookie_security(self, cookies, name, secure):
+ for cookie in cookies:
+ if cookie.name == name:
+ assert cookie.secure == secure
+ break
+ else:
+ assert False, 'no cookie named {} found in jar'.format(name)
+
def test_discharge(self):
- jar = requests.cookies.RequestsCookieJar()
- with HTTMock(first_407_then_200):
- with HTTMock(discharge_200):
+ client = httpbakery.Client()
+ with HTTMock(first_407_then_200), HTTMock(discharge_200):
resp = requests.get(ID_PATH,
- cookies=jar,
- auth=httpbakery.BakeryAuth(cookies=jar))
+ cookies=client.cookies,
+ auth=client.auth())
resp.raise_for_status()
- assert 'macaroon-test' in jar.keys()
+ assert 'macaroon-test' in client.cookies.keys()
+ self.assert_cookie_security(client.cookies, 'macaroon-test', secure=False)
@patch('webbrowser.open')
def test_407_then_401_on_discharge(self, mock_open):
- jar = requests.cookies.RequestsCookieJar()
- with HTTMock(first_407_then_200):
- with HTTMock(discharge_401):
- with HTTMock(wait_after_401):
- resp = requests.get(ID_PATH,
- auth=httpbakery.BakeryAuth(
- cookies=jar))
- resp.raise_for_status()
+ client = httpbakery.Client()
+ with HTTMock(first_407_then_200), HTTMock(discharge_401), HTTMock(wait_after_401):
+ resp = requests.get(
+ ID_PATH,
+ cookies=client.cookies,
+ auth=client.auth(),
+ )
+ resp.raise_for_status()
mock_open.assert_called_once_with(u'http://example.com/visit', new=1)
- assert 'macaroon-test' in jar.keys()
+ assert 'macaroon-test' in client.cookies.keys()
+
+ def test_cookie_with_port(self):
+ client = httpbakery.Client()
+ with HTTMock(first_407_then_200_with_port):
+ with HTTMock(discharge_200):
+ resp = requests.get('http://example.com:8000/someprotecteurl',
+ cookies=client.cookies,
+ auth=client.auth())
+ resp.raise_for_status()
+ assert 'macaroon-test' in client.cookies.keys()
+
+ def test_secure_cookie_for_https(self):
+ client = httpbakery.Client()
+ with HTTMock(first_407_then_200_with_port), HTTMock(discharge_200):
+ resp = requests.get(
+ 'https://example.com:8000/someprotecteurl',
+ cookies=client.cookies,
+ auth=client.auth())
+ resp.raise_for_status()
+ assert 'macaroon-test' in client.cookies.keys()
+ self.assert_cookie_security(client.cookies, 'macaroon-test', secure=True)
diff --git a/macaroonbakery/tests/test_checker.py b/macaroonbakery/tests/test_checker.py
index 06bf008..643c756 100644
--- a/macaroonbakery/tests/test_checker.py
+++ b/macaroonbakery/tests/test_checker.py
@@ -9,7 +9,7 @@ from datetime import timedelta
from pymacaroons.verifier import Verifier, FirstPartyCaveatVerifierDelegate
import pymacaroons
-import macaroonbakery
+import macaroonbakery as bakery
import macaroonbakery.checkers as checkers
from macaroonbakery.tests.common import test_context, epoch, test_checker
@@ -22,13 +22,13 @@ class TestChecker(TestCase):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
- {macaroonbakery.Op(entity='something', action='read'):
- {macaroonbakery.EVERYONE}})
+ {bakery.Op(entity='something', action='read'):
+ {bakery.EVERYONE}})
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
- auth_info = client.do(test_context, ts,
- [macaroonbakery.Op(entity='something',
- action='read')])
+ auth_info = client.do(test_context, ts, [
+ bakery.Op(entity='something', action='read'),
+ ])
self.assertEqual(len(self._discharges), 0)
self.assertIsNotNone(auth_info)
self.assertIsNone(auth_info.identity)
@@ -37,25 +37,23 @@ class TestChecker(TestCase):
def test_authorization_denied(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = macaroonbakery.ClosedAuthorizer()
+ auth = bakery.ClosedAuthorizer()
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- with self.assertRaises(macaroonbakery.PermissionDenied):
- client.do(ctx, ts, [macaroonbakery.Op(entity='something',
- action='read')])
+ with self.assertRaises(bakery.PermissionDenied):
+ client.do(ctx, ts, [bakery.Op(entity='something', action='read')])
def test_authorize_with_authentication_required(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
- {macaroonbakery.Op(entity='something', action='read'): {'bob'}})
+ {bakery.Op(entity='something', action='read'): {'bob'}})
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- auth_info = client.do(ctx, ts, [macaroonbakery.Op(entity='something',
- action='read')])
+ auth_info = client.do(ctx, ts, [bakery.Op(entity='something', action='read')])
self.assertEqual(self._discharges,
[_DischargeRecord(location='ids', user='bob')])
self.assertIsNotNone(auth_info)
@@ -67,16 +65,16 @@ class TestChecker(TestCase):
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
{
- macaroonbakery.Op(entity='something', action='read'): {'bob'},
- macaroonbakery.Op(entity='otherthing', action='read'): {'bob'}
+ bakery.Op(entity='something', action='read'): {'bob'},
+ bakery.Op(entity='otherthing', action='read'): {'bob'}
}
)
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
client.do(ctx, ts, [
- macaroonbakery.Op(entity='something', action='read'),
- macaroonbakery.Op(entity='otherthing', action='read')
+ bakery.Op(entity='something', action='read'),
+ bakery.Op(entity='otherthing', action='read')
])
self.assertEqual(self._discharges,
[_DischargeRecord(location='ids', user='bob')])
@@ -85,159 +83,150 @@ class TestChecker(TestCase):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
- {macaroonbakery.Op(entity='something', action='read'): {'bob'}})
+ {bakery.Op(entity='something', action='read'): {'bob'}})
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m = client.discharged_capability(
- ctx, ts, [macaroonbakery.Op(entity='something', action='read')])
+ ctx, ts, [bakery.Op(entity='something', action='read')])
# Check that we can exercise the capability directly on the service
# with no discharging required.
- auth_info = ts.do(test_context, [m],
- [macaroonbakery.Op(entity='something',
- action='read')])
+ auth_info = ts.do(test_context, [m], [
+ bakery.Op(entity='something', action='read'),
+ ])
self.assertIsNotNone(auth_info)
self.assertIsNone(auth_info.identity)
self.assertEqual(len(auth_info.macaroons), 1)
- self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
- m[0].identifier_bytes)
+ self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, m[0].identifier_bytes)
def test_capability_multiple_entities(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'bob'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- macaroonbakery.Op(entity='e3', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'bob'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ bakery.Op(entity='e3', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m = client.discharged_capability(ctx, ts, [
- macaroonbakery.Op(entity='e1',
- action='read'),
- macaroonbakery.Op(entity='e2',
- action='read'),
- macaroonbakery.Op(entity='e3',
- action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
self.assertEqual(self._discharges,
[_DischargeRecord(location='ids', user='bob')])
# Check that we can exercise the capability directly on the service
# with no discharging required.
ts.do(test_context, [m], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read'),
- macaroonbakery.Op(entity='e3', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
# Check that we can exercise the capability to act on a subset of
# the operations.
ts.do(test_context, [m], [
- macaroonbakery.Op(entity='e2', action='read'),
- macaroonbakery.Op(entity='e3', action='read')]
- )
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
ts.do(test_context, [m],
- [macaroonbakery.Op(entity='e3', action='read')])
+ [bakery.Op(entity='e3', action='read')])
def test_multiple_capabilities(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Acquire two capabilities as different users and check
# that we can combine them together to do both operations
# at once.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- m1 = _Client(locator).discharged_capability(ctx, ts,
- [macaroonbakery.Op(
- entity='e1',
- action='read')])
+ m1 = _Client(locator).discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ ])
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m2 = _Client(locator).discharged_capability(ctx, ts,
- [macaroonbakery.Op(
+ [bakery.Op(
entity='e2',
action='read')])
- self.assertEqual(self._discharges,
- [
- _DischargeRecord(location='ids', user='alice'),
- _DischargeRecord(location='ids', user='bob'),
- ])
+ self.assertEqual(self._discharges, [
+ _DischargeRecord(location='ids', user='alice'),
+ _DischargeRecord(location='ids', user='bob'),
+ ])
auth_info = ts.do(test_context, [m1, m2], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
self.assertIsNotNone(auth_info)
self.assertIsNone(auth_info.identity)
self.assertEqual(len(auth_info.macaroons), 2)
- self.assertEqual(auth_info.macaroons[0][0].identifier_bytes,
- m1[0].identifier_bytes)
- self.assertEqual(auth_info.macaroons[1][0].identifier_bytes,
- m2[0].identifier_bytes)
+ self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, m1[0].identifier_bytes)
+ self.assertEqual(auth_info.macaroons[1][0].identifier_bytes, m2[0].identifier_bytes)
def test_combine_capabilities(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- macaroonbakery.Op(entity='e3', action='read'): {'bob',
- 'alice'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ bakery.Op(entity='e3', action='read'): {'bob', 'alice'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Acquire two capabilities as different users and check
# that we can combine them together into a single capability
# capable of both operations.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- m1 = _Client(locator).discharged_capability(
- ctx, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e3', action='read')])
+ m1 = _Client(locator).discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m2 = _Client(locator).discharged_capability(
- ctx, ts, [macaroonbakery.Op(entity='e2', action='read')])
+ ctx, ts, [bakery.Op(entity='e2', action='read')])
m = ts.capability(test_context, [m1, m2], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read'),
- macaroonbakery.Op(entity='e3', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read'),
- macaroonbakery.Op(entity='e3', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ bakery.Op(entity='e3', action='read'),
+ ])
def test_partially_authorized_request(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Acquire a capability for e1 but rely on authentication to
# authorize e2.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- m = _Client(locator).discharged_capability(ctx, ts,
- [macaroonbakery.Op(
- entity='e1',
- action='read')])
+ m = _Client(locator).discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ ])
client = _Client(locator)
client.add_macaroon(ts, 'authz', m)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- client.discharged_capability(
- ctx, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ client.discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
def test_auth_with_third_party_caveats(self):
locator = _DischargerLocator()
@@ -247,16 +236,15 @@ class TestChecker(TestCase):
# when authorizing.
def authorize_with_tp_discharge(ctx, id, op):
if (id is not None and id.id() == 'bob' and
- op == macaroonbakery.Op(entity='something',
- action='read')):
+ op == bakery.Op(entity='something', action='read')):
return True, [checkers.Caveat(condition='question',
location='other third party')]
return False, None
- auth = macaroonbakery.AuthorizerFunc(authorize_with_tp_discharge)
+ auth = bakery.AuthorizerFunc(authorize_with_tp_discharge)
ts = _Service('myservice', auth, ids, locator)
- class _LocalDischargeChecker(macaroonbakery.ThirdPartyCaveatChecker):
+ class _LocalDischargeChecker(bakery.ThirdPartyCaveatChecker):
def check_third_party_caveat(_, ctx, info):
if info.condition != 'question':
raise ValueError('third party condition not recognized')
@@ -267,29 +255,25 @@ class TestChecker(TestCase):
return []
locator['other third party'] = _Discharger(
- key=macaroonbakery.generate_key(),
+ key=bakery.generate_key(),
checker=_LocalDischargeChecker(),
locator=locator,
)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- client.do(ctx, ts, [macaroonbakery.Op(entity='something',
- action='read')])
+ client.do(ctx, ts, [bakery.Op(entity='something', action='read')])
self.assertEqual(self._discharges, [
_DischargeRecord(location='ids', user='bob'),
- _DischargeRecord(location='other third party',
- user='bob')
+ _DischargeRecord(location='other third party', user='bob')
])
def test_capability_combines_first_party_caveats(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'}
- }
- )
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Acquire two capabilities as different users, add some first party
@@ -297,12 +281,12 @@ class TestChecker(TestCase):
# capable of both operations.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
m1 = _Client(locator).capability(
- ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ ctx, ts, [bakery.Op(entity='e1', action='read')])
m1.macaroon.add_first_party_caveat('true 1')
m1.macaroon.add_first_party_caveat('true 2')
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
m2 = _Client(locator).capability(
- ctx, ts, [macaroonbakery.Op(entity='e2', action='read')])
+ ctx, ts, [bakery.Op(entity='e2', action='read')])
m2.macaroon.add_first_party_caveat('true 3')
m2.macaroon.add_first_party_caveat('true 4')
@@ -311,8 +295,9 @@ class TestChecker(TestCase):
client.add_macaroon(ts, 'authz2', [m2.macaroon])
m = client.capability(test_context, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [
'true 1',
'true 2',
@@ -323,11 +308,10 @@ class TestChecker(TestCase):
def test_first_party_caveat_squashing(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'alice'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'alice'},
+ })
ts = _Service('myservice', auth, ids, locator)
tests = [
('duplicates removed', [
@@ -366,13 +350,13 @@ class TestChecker(TestCase):
# Make a first macaroon with all the required first party caveats.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
m1 = _Client(locator).capability(
- ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ ctx, ts, [bakery.Op(entity='e1', action='read')])
m1.add_caveats(test[1], None, None)
# Make a second macaroon that's not used to check that it's
# caveats are not added.
m2 = _Client(locator).capability(
- ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ ctx, ts, [bakery.Op(entity='e1', action='read')])
m2.add_caveat(checkers.Caveat(
condition='true notused', namespace='testns'), None, None)
client = _Client(locator)
@@ -380,8 +364,7 @@ class TestChecker(TestCase):
client.add_macaroon(ts, 'authz2', [m2.macaroon])
m3 = client.capability(
- test_context, ts, [macaroonbakery.Op(entity='e1',
- action='read')])
+ test_context, ts, [bakery.Op(entity='e1', action='read')])
self.assertEqual(
_macaroon_conditions(m3.macaroon.caveats, False),
_resolve_caveats(m3.namespace, test[2]))
@@ -389,11 +372,11 @@ class TestChecker(TestCase):
def test_login_only(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = macaroonbakery.ClosedAuthorizer()
+ auth = bakery.ClosedAuthorizer()
ts = _Service('myservice', auth, ids, locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP])
self.assertIsNotNone(auth_info)
self.assertEqual(auth_info.identity.id(), 'bob')
@@ -402,18 +385,17 @@ class TestChecker(TestCase):
ids = _IdService('ids', locator, self)
auth = _OpAuthorizer(
{
- macaroonbakery.Op(entity='e1', action='read'): {'alice'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
+ bakery.Op(entity='e1', action='read'): {'alice'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
})
ts = _Service('myservice', auth, ids, locator)
# Acquire a capability for e1 but rely on authentication to
# authorize e2.
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- m = _Client(locator).discharged_capability(ctx, ts,
- [macaroonbakery.Op(
- entity='e1',
- action='read')])
+ m = _Client(locator).discharged_capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ ])
client = _Client(locator)
client.add_macaroon(ts, 'authz', m)
@@ -423,24 +405,22 @@ class TestChecker(TestCase):
with self.assertRaises(_DischargeRequiredError):
client.do_any(
ctx, ts, [
- macaroonbakery.LOGIN_OP,
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e1', action='read')
+ bakery.LOGIN_OP,
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e1', action='read')
]
)
self.assertEqual(len(self._discharges), 0)
# Log in as bob.
- _, err = client.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ _, err = client.do(ctx, ts, [bakery.LOGIN_OP])
# All the previous actions should now be allowed.
- auth_info, allowed = client.do_any(
- ctx, ts, [
- macaroonbakery.LOGIN_OP,
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e1', action='read')
- ]
- )
+ auth_info, allowed = client.do_any(ctx, ts, [
+ bakery.LOGIN_OP,
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e1', action='read'),
+ ])
self.assertEqual(auth_info.identity.id(), 'bob')
self.assertEqual(len(auth_info.macaroons), 2)
self.assertEqual(allowed, [True, True, True])
@@ -448,123 +428,121 @@ class TestChecker(TestCase):
def test_auth_with_identity_from_context(self):
locator = _DischargerLocator()
ids = _BasicAuthIdService()
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'sherlock'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'sherlock'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
# Check that we can perform the ops with basic auth in the
# context.
ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
auth_info = _Client(locator).do(
- ctx, ts, [macaroonbakery.Op(entity='e1', action='read')])
+ ctx, ts, [bakery.Op(entity='e1', action='read')])
self.assertEqual(auth_info.identity.id(), 'sherlock')
self.assertEqual(len(auth_info.macaroons), 0)
def test_auth_login_op_with_identity_from_context(self):
locator = _DischargerLocator()
ids = _BasicAuthIdService()
- ts = _Service('myservice', macaroonbakery.ClosedAuthorizer(),
- ids, locator)
+ ts = _Service('myservice', bakery.ClosedAuthorizer(), ids, locator)
# Check that we can use LoginOp
# when auth isn't granted through macaroons.
ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes')
- auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'sherlock')
self.assertEqual(len(auth_info.macaroons), 0)
def test_operation_allow_caveat(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'bob'},
- macaroonbakery.Op(entity='e1', action='write'): {'bob'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'bob'},
+ bakery.Op(entity='e1', action='write'): {'bob'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- m = client.capability(
- ctx, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e1', action='write'),
- macaroonbakery.Op(entity='e2', action='read')])
+ m = client.capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e1', action='write'),
+ bakery.Op(entity='e2', action='read'),
+ ])
# Sanity check that we can do a write.
ts.do(test_context, [[m.macaroon]],
- [macaroonbakery.Op(entity='e1', action='write')])
+ [bakery.Op(entity='e1', action='write')])
m.add_caveat(checkers.allow_caveat(['read']), None, None)
# A read operation should work.
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
# A write operation should fail
# even though the original macaroon allowed it.
with self.assertRaises(_DischargeRequiredError):
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='write')])
+ bakery.Op(entity='e1', action='write'),
+ ])
def test_operation_deny_caveat(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = _OpAuthorizer(
- {
- macaroonbakery.Op(entity='e1', action='read'): {'bob'},
- macaroonbakery.Op(entity='e1', action='write'): {'bob'},
- macaroonbakery.Op(entity='e2', action='read'): {'bob'},
- })
+ auth = _OpAuthorizer({
+ bakery.Op(entity='e1', action='read'): {'bob'},
+ bakery.Op(entity='e1', action='write'): {'bob'},
+ bakery.Op(entity='e2', action='read'): {'bob'},
+ })
ts = _Service('myservice', auth, ids, locator)
client = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- m = client.capability(
- ctx, ts, [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e1', action='write'),
- macaroonbakery.Op(entity='e2', action='read')])
+ m = client.capability(ctx, ts, [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e1', action='write'),
+ bakery.Op(entity='e2', action='read'),
+ ])
# Sanity check that we can do a write.
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='write')])
+ bakery.Op(entity='e1', action='write')])
m.add_caveat(checkers.deny_caveat(['write']), None, None)
# A read operation should work.
- ts.do(
- test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='read'),
- macaroonbakery.Op(entity='e2', action='read')])
+ ts.do(test_context, [[m.macaroon]], [
+ bakery.Op(entity='e1', action='read'),
+ bakery.Op(entity='e2', action='read'),
+ ])
# A write operation should fail
# even though the original macaroon allowed it.
with self.assertRaises(_DischargeRequiredError):
ts.do(test_context, [[m.macaroon]], [
- macaroonbakery.Op(entity='e1', action='write')])
+ bakery.Op(entity='e1', action='write')])
def test_duplicate_login_macaroons(self):
locator = _DischargerLocator()
ids = _IdService('ids', locator, self)
- auth = macaroonbakery.ClosedAuthorizer()
+ auth = bakery.ClosedAuthorizer()
ts = _Service('myservice', auth, ids, locator)
# Acquire a login macaroon for bob.
client1 = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob')
- auth_info = client1.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = client1.do(ctx, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'bob')
# Acquire a login macaroon for alice.
client2 = _Client(locator)
ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice')
- auth_info = client2.do(ctx, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = client2.do(ctx, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'alice')
# Combine the two login macaroons into one client.
@@ -576,32 +554,30 @@ class TestChecker(TestCase):
# We should authenticate as bob (because macaroons are presented
# ordered by "cookie" name)
- auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'bob')
self.assertEqual(len(auth_info.macaroons), 1)
# Try them the other way around and we should authenticate as alice.
client3 = _Client(locator)
- client3.add_macaroon(ts, '1.alice',
- client2._macaroons[ts.name()]['authn'])
- client3.add_macaroon(ts, '2.bob',
- client1._macaroons[ts.name()]['authn'])
+ client3.add_macaroon(ts, '1.alice', client2._macaroons[ts.name()]['authn'])
+ client3.add_macaroon(ts, '2.bob', client1._macaroons[ts.name()]['authn'])
- auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP])
+ auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP])
self.assertEqual(auth_info.identity.id(), 'alice')
self.assertEqual(len(auth_info.macaroons), 1)
def test_macaroon_ops_fatal_error(self):
# When we get a non-VerificationError error from the
# opstore, we don't do any more verification.
- checker = macaroonbakery.Checker(
+ checker = bakery.Checker(
macaroon_opstore=_MacaroonStoreWithError())
m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2)
- with self.assertRaises(ValueError):
- checker.auth([m]).allow(test_context, [macaroonbakery.LOGIN_OP])
+ with self.assertRaises(bakery.AuthInitError):
+ checker.auth([m]).allow(test_context, [bakery.LOGIN_OP])
-class _DischargerLocator(object):
+class _DischargerLocator(bakery.ThirdPartyLocator):
def __init__(self, dischargers=None):
if dischargers is None:
dischargers = {}
@@ -611,9 +587,9 @@ class _DischargerLocator(object):
d = self._dischargers.get(loc)
if d is None:
return None
- return macaroonbakery.ThirdPartyInfo(
+ return bakery.ThirdPartyInfo(
public_key=d._key.public_key,
- version=macaroonbakery.LATEST_BAKERY_VERSION,
+ version=bakery.LATEST_VERSION,
)
def __setitem__(self, key, item):
@@ -626,25 +602,23 @@ class _DischargerLocator(object):
return self._dischargers.get(key)
-class _IdService(macaroonbakery.IdentityClient,
- macaroonbakery.ThirdPartyCaveatChecker):
+class _IdService(bakery.IdentityClient,
+ bakery.ThirdPartyCaveatChecker):
def __init__(self, location, locator, test_class):
self._location = location
self._test = test_class
- key = macaroonbakery.generate_key()
+ key = bakery.generate_key()
self._discharger = _Discharger(key=key, checker=self, locator=locator)
locator[location] = self._discharger
def check_third_party_caveat(self, ctx, info):
if info.condition != 'is-authenticated-user':
- raise macaroonbakery.CaveatNotRecognizedError(
- 'third party condition not '
- 'recognized')
+ raise bakery.CaveatNotRecognizedError(
+ 'third party condition not recognized')
username = ctx.get(_DISCHARGE_USER_KEY, '')
if username == '':
- return macaroonbakery.ThirdPartyCaveatCheckFailed(
- 'no current user')
+ raise bakery.ThirdPartyCaveatCheckFailed('no current user')
self._test._discharges.append(
_DischargeRecord(location=self._location, user=username))
return [checkers.declared_caveat('username', username)]
@@ -656,8 +630,8 @@ class _IdService(macaroonbakery.IdentityClient,
def declared_identity(self, ctx, declared):
user = declared.get('username')
if user is None:
- raise macaroonbakery.IdentityError('no username declared')
- return macaroonbakery.SimpleIdentity(user)
+ raise bakery.IdentityError('no username declared')
+ return bakery.SimpleIdentity(user)
_DISCHARGE_USER_KEY = checkers.ContextKey('user-key')
@@ -676,13 +650,17 @@ class _Discharger(object):
self._checker = checker
def discharge(self, ctx, cav, payload):
- return macaroonbakery.discharge(ctx, key=self._key, id=cav.caveat_id,
- caveat=payload,
- checker=self._checker,
- locator=self._locator)
+ return bakery.discharge(
+ ctx,
+ key=self._key,
+ id=cav.caveat_id,
+ caveat=payload,
+ checker=self._checker,
+ locator=self._locator,
+ )
-class _OpAuthorizer(macaroonbakery.Authorizer):
+class _OpAuthorizer(bakery.Authorizer):
'''Implements bakery.Authorizer by looking the operation
up in the given map. If the username is in the associated list
or the list contains "everyone", authorization is granted.
@@ -694,7 +672,7 @@ class _OpAuthorizer(macaroonbakery.Authorizer):
self._auth = auth
def authorize(self, ctx, id, ops):
- return macaroonbakery.ACLAuthorizer(
+ return bakery.ACLAuthorizer(
allow_public=True,
get_acl=lambda ctx, op: self._auth.get(op, [])).authorize(
ctx, id, ops)
@@ -705,7 +683,7 @@ class _MacaroonStore(object):
'''
def __init__(self, key, locator):
- self._root_key_store = macaroonbakery.MemoryKeyStore()
+ self._root_key_store = bakery.MemoryKeyStore()
self._key = key
self._locator = locator
@@ -713,9 +691,9 @@ class _MacaroonStore(object):
root_key, id = self._root_key_store.root_key()
m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops}
data = json.dumps(m_id)
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key=root_key, id=data, location='',
- version=macaroonbakery.LATEST_BAKERY_VERSION,
+ version=bakery.LATEST_VERSION,
namespace=namespace)
m.add_caveats(caveats, self._key, self._locator)
return m
@@ -739,7 +717,7 @@ class _MacaroonStore(object):
ok = v.verify(macaroon=ms[0], key=root_key,
discharge_macaroons=ms[1:])
if not ok:
- raise macaroonbakery.VerificationError('invalid signature')
+ raise bakery.VerificationError('invalid signature')
conditions = []
for m in ms:
cavs = m.first_party_caveats()
@@ -747,7 +725,7 @@ class _MacaroonStore(object):
conditions.append(cav.caveat_id_bytes.decode('utf-8'))
ops = []
for op in m_id['ops']:
- ops.append(macaroonbakery.Op(entity=op[0], action=op[1]))
+ ops.append(bakery.Op(entity=op[0], action=op[1]))
return ops, conditions
@@ -761,8 +739,8 @@ class _Service(object):
def __init__(self, name, auth, idm, locator):
self._name = name
- self._store = _MacaroonStore(macaroonbakery.generate_key(), locator)
- self._checker = macaroonbakery.Checker(
+ self._store = _MacaroonStore(bakery.generate_key(), locator)
+ self._checker = bakery.Checker(
checker=test_checker(),
authorizer=auth,
identity_client=idm,
@@ -774,7 +752,7 @@ class _Service(object):
def do(self, ctx, ms, ops):
try:
authInfo = self._checker.auth(ms).allow(ctx, ops)
- except macaroonbakery.DischargeRequiredError as exc:
+ except bakery.DischargeRequiredError as exc:
self._discharge_required_error(exc)
return authInfo
@@ -784,13 +762,13 @@ class _Service(object):
try:
authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops)
return authInfo, allowed
- except macaroonbakery.DischargeRequiredError as exc:
+ except bakery.DischargeRequiredError as exc:
self._discharge_required_error(exc)
def capability(self, ctx, ms, ops):
try:
conds = self._checker.auth(ms).allow_capability(ctx, ops)
- except macaroonbakery.DischargeRequiredError as exc:
+ except bakery.DischargeRequiredError as exc:
self._discharge_required_error(exc)
m = self._store.new_macaroon(None, self._checker.namespace(), ops)
@@ -802,7 +780,7 @@ class _Service(object):
m = self._store.new_macaroon(err.cavs(), self._checker.namespace(),
err.ops())
name = 'authz'
- if len(err.ops()) == 1 and err.ops()[0] == macaroonbakery.LOGIN_OP:
+ if len(err.ops()) == 1 and err.ops()[0] == bakery.LOGIN_OP:
name = 'authn'
raise _DischargeRequiredError(name=name, m=m)
@@ -824,7 +802,7 @@ class _Client(object):
max_retries = 3
def __init__(self, dischargers):
- self._key = macaroonbakery.generate_key()
+ self._key = bakery.generate_key()
self._macaroons = {}
self._dischargers = dischargers
@@ -894,26 +872,25 @@ class _Client(object):
return ms
def _discharge_all(self, ctx, m):
- def get_discharge(ctx, cav, pay_load):
+ def get_discharge(cav, payload):
d = self._dischargers.get(cav.location)
if d is None:
raise ValueError('third party discharger '
'{} not found'.format(cav.location))
- return d.discharge(ctx, cav, pay_load)
+ return d.discharge(ctx, cav, payload)
- return macaroonbakery.discharge_all(ctx, m, get_discharge)
+ return bakery.discharge_all(m, get_discharge)
-class _BasicAuthIdService(macaroonbakery.IdentityClient):
+class _BasicAuthIdService(bakery.IdentityClient):
def identity_from_context(self, ctx):
user, pwd = _basic_auth_from_context(ctx)
if user != 'sherlock' or pwd != 'holmes':
return None, None
- return macaroonbakery.SimpleIdentity(user), None
+ return bakery.SimpleIdentity(user), None
def declared_identity(self, ctx, declared):
- raise macaroonbakery.IdentityError('no identity declarations in basic '
- 'auth id service')
+ raise bakery.IdentityError('no identity declarations in basic auth id service')
_BASIC_AUTH_KEY = checkers.ContextKey('user-key')
diff --git a/macaroonbakery/tests/test_client.py b/macaroonbakery/tests/test_client.py
new file mode 100644
index 0000000..e1a4009
--- /dev/null
+++ b/macaroonbakery/tests/test_client.py
@@ -0,0 +1,388 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+import base64
+import datetime
+import json
+from unittest import TestCase
+try:
+ from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+except ImportError:
+ from http.server import HTTPServer, BaseHTTPRequestHandler
+import threading
+
+from httmock import (
+ HTTMock,
+ urlmatch
+)
+import requests
+from six.moves.urllib.parse import parse_qs
+
+import macaroonbakery as bakery
+import macaroonbakery.httpbakery as httpbakery
+import macaroonbakery.checkers as checkers
+
+AGES = datetime.datetime.utcnow() + datetime.timedelta(days=1)
+TEST_OP = bakery.Op(entity='test', action='test')
+
+
+class TestClient(TestCase):
+ def test_single_service_first_party(self):
+ b = new_bakery('loc', None, None)
+
+ def handler(*args):
+ GetHandler(b, None, None, None, None, *args)
+ try:
+ httpd = HTTPServer(('', 0), handler)
+ thread = threading.Thread(target=httpd.serve_forever)
+ thread.start()
+ srv_macaroon = b.oven.macaroon(
+ version=bakery.LATEST_VERSION, expiry=AGES,
+ caveats=None, ops=[TEST_OP])
+ self.assertEquals(srv_macaroon.macaroon.location, 'loc')
+ client = httpbakery.Client()
+ client.cookies.set_cookie(requests.cookies.create_cookie(
+ 'macaroon-test', base64.b64encode(json.dumps([
+ srv_macaroon.to_dict().get('m')
+ ]).encode('utf-8')).decode('utf-8')
+ ))
+ resp = requests.get(
+ url='http://' + httpd.server_address[0] + ':' +
+ str(httpd.server_address[1]),
+ cookies=client.cookies, auth=client.auth())
+ resp.raise_for_status()
+ self.assertEquals(resp.text, 'done')
+ finally:
+ httpd.shutdown()
+
+ def test_single_party_with_header(self):
+ b = new_bakery('loc', None, None)
+
+ def handler(*args):
+ GetHandler(b, None, None, None, None, *args)
+ try:
+ httpd = HTTPServer(('', 0), handler)
+ thread = threading.Thread(target=httpd.serve_forever)
+ thread.start()
+ srv_macaroon = b.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=AGES, caveats=None, ops=[TEST_OP])
+ self.assertEquals(srv_macaroon.macaroon.location, 'loc')
+ headers = {
+ 'Macaroons': base64.b64encode(json.dumps([
+ srv_macaroon.to_dict().get('m')
+ ]).encode('utf-8'))
+ }
+ resp = requests.get(
+ url='http://' + httpd.server_address[0] + ':' +
+ str(httpd.server_address[1]),
+ headers=headers)
+ resp.raise_for_status()
+ self.assertEquals(resp.text, 'done')
+ finally:
+ httpd.shutdown()
+
+ def test_repeated_request_with_body(self):
+ class _DischargerLocator(bakery.ThirdPartyLocator):
+ def __init__(self):
+ self.key = bakery.generate_key()
+
+ def third_party_info(self, loc):
+ if loc == 'http://1.2.3.4':
+ return bakery.ThirdPartyInfo(
+ public_key=self.key.public_key,
+ version=bakery.LATEST_VERSION,
+ )
+
+ d = _DischargerLocator()
+ b = new_bakery('loc', d, None)
+
+ @urlmatch(path='.*/discharge')
+ def discharge(url, request):
+ qs = parse_qs(request.body)
+ content = {q: qs[q][0] for q in qs}
+ m = httpbakery.discharge(checkers.AuthContext(), content, d.key, d, alwaysOK3rd)
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Macaroon': m.to_dict()
+ }
+ }
+
+ def handler(*args):
+ GetHandler(b, 'http://1.2.3.4', None, None, None, *args)
+ try:
+ httpd = HTTPServer(('', 0), handler)
+ thread = threading.Thread(target=httpd.serve_forever)
+ thread.start()
+ client = httpbakery.Client()
+ with HTTMock(discharge):
+ resp = requests.get(
+ url='http://' + httpd.server_address[0] + ':' +
+ str(httpd.server_address[1]),
+ cookies=client.cookies,
+ auth=client.auth())
+ resp.raise_for_status()
+ self.assertEquals(resp.text, 'done')
+ finally:
+ httpd.shutdown()
+
+ def test_too_many_discharge(self):
+ class _DischargerLocator(bakery.ThirdPartyLocator):
+ def __init__(self):
+ self.key = bakery.generate_key()
+
+ def third_party_info(self, loc):
+ if loc == 'http://1.2.3.4':
+ return bakery.ThirdPartyInfo(
+ public_key=self.key.public_key,
+ version=bakery.LATEST_VERSION,
+ )
+
+ d = _DischargerLocator()
+ b = new_bakery('loc', d, None)
+
+ @urlmatch(path='.*/discharge')
+ def discharge(url, request):
+ wrong_macaroon = bakery.Macaroon(
+ root_key=b'some key', id=b'xxx',
+ location='some other location',
+ version=bakery.VERSION_0)
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Macaroon': wrong_macaroon.to_dict()
+ }
+ }
+
+ def handler(*args):
+ GetHandler(b, 'http://1.2.3.4', None, None, None, *args)
+ try:
+ httpd = HTTPServer(('', 0), handler)
+ thread = threading.Thread(target=httpd.serve_forever)
+ thread.start()
+ client = httpbakery.Client()
+ with HTTMock(discharge):
+ with self.assertRaises(httpbakery.BakeryException) as ctx:
+ requests.get(
+ url='http://' + httpd.server_address[0] + ':' +
+ str(httpd.server_address[1]),
+ cookies=client.cookies,
+ auth=client.auth())
+ self.assertEqual(ctx.exception.args[0],
+ 'too many (3) discharge requests')
+ finally:
+ httpd.shutdown()
+
+ def test_third_party_discharge_refused(self):
+ class _DischargerLocator(bakery.ThirdPartyLocator):
+ def __init__(self):
+ self.key = bakery.generate_key()
+
+ def third_party_info(self, loc):
+ if loc == 'http://1.2.3.4':
+ return bakery.ThirdPartyInfo(
+ public_key=self.key.public_key,
+ version=bakery.LATEST_VERSION,
+ )
+
+ def check(cond, arg):
+ raise bakery.ThirdPartyCaveatCheckFailed('boo! cond' + cond)
+
+ d = _DischargerLocator()
+ b = new_bakery('loc', d, None)
+
+ @urlmatch(path='.*/discharge')
+ def discharge(url, request):
+ qs = parse_qs(request.body)
+ content = {q: qs[q][0] for q in qs}
+ httpbakery.discharge(checkers.AuthContext(), content, d.key, d,
+ ThirdPartyCaveatCheckerF(check))
+
+ def handler(*args):
+ GetHandler(b, 'http://1.2.3.4', None, None, None, *args)
+ try:
+ httpd = HTTPServer(('', 0), handler)
+ thread = threading.Thread(target=httpd.serve_forever)
+ thread.start()
+ client = httpbakery.Client()
+ with HTTMock(discharge):
+ with self.assertRaises(bakery.ThirdPartyCaveatCheckFailed):
+ requests.get(
+ url='http://' + httpd.server_address[0] + ':' +
+ str(httpd.server_address[1]),
+ cookies=client.cookies,
+ auth=client.auth())
+ finally:
+ httpd.shutdown()
+
+ def test_discharge_with_interaction_required_error(self):
+ class _DischargerLocator(bakery.ThirdPartyLocator):
+ def __init__(self):
+ self.key = bakery.generate_key()
+
+ def third_party_info(self, loc):
+ if loc == 'http://1.2.3.4':
+ return bakery.ThirdPartyInfo(
+ public_key=self.key.public_key,
+ version=bakery.LATEST_VERSION,
+ )
+ d = _DischargerLocator()
+ b = new_bakery('loc', d, None)
+
+ @urlmatch(path='.*/discharge')
+ def discharge(url, request):
+ return {
+ 'status_code': 401,
+ 'content': {
+ 'Code': httpbakery.ERR_INTERACTION_REQUIRED,
+ 'Message': 'interaction required',
+ 'Info': {
+ 'WaitURL': 'http://0.1.2.3/',
+ 'VisitURL': 'http://0.1.2.3/',
+ },
+ }
+ }
+
+ def handler(*args):
+ GetHandler(b, 'http://1.2.3.4', None, None, None, *args)
+
+ try:
+ httpd = HTTPServer(('', 0), handler)
+ thread = threading.Thread(target=httpd.serve_forever)
+ thread.start()
+
+ class MyInteractor(httpbakery.LegacyInteractor):
+ def legacy_interact(self, ctx, location, visit_url):
+ raise httpbakery.InteractionError('cannot visit')
+
+ def interact(self, ctx, location, interaction_required_err):
+ pass
+
+ def kind(self):
+ return httpbakery.WEB_BROWSER_INTERACTION_KIND
+
+ client = httpbakery.Client(interaction_methods=[MyInteractor()])
+
+ with HTTMock(discharge):
+ with self.assertRaises(httpbakery.InteractionError):
+ requests.get(
+ 'http://' + httpd.server_address[0] + ':' + str(
+ httpd.server_address[1]),
+ cookies=client.cookies,
+ auth=client.auth())
+ finally:
+ httpd.shutdown()
+
+
+class GetHandler(BaseHTTPRequestHandler):
+ '''A mock HTTP server that serves a GET request'''
+ def __init__(self, bakery, auth_location, mutate_error,
+ caveats, version, *args):
+ '''
+ @param bakery used to check incoming requests and macaroons
+ for discharge-required errors.
+ @param auth_location holds the location of any 3rd party
+ authorizer. If this is not None, a 3rd party caveat will be
+ added addressed to this location.
+ @param mutate_error if non None, will be called with any
+ discharge-required error before responding to the client.
+ @param caveats called to get caveats to add to the returned
+ macaroon.
+ @param holds the version of the bakery that the
+ server will purport to serve.
+ '''
+ self._bakery = bakery
+ self._auth_location = auth_location
+ self._mutate_error = mutate_error
+ self._caveats = caveats
+ self._server_version = version
+ BaseHTTPRequestHandler.__init__(self, *args)
+
+ def do_GET(self):
+ '''do_GET implements a handler for the HTTP GET method'''
+ ctx = checkers.AuthContext()
+ auth_checker = self._bakery.checker.auth(
+ httpbakery.extract_macaroons(self.headers))
+ try:
+ auth_checker.allow(ctx, [TEST_OP])
+ except (bakery.PermissionDenied,
+ bakery.VerificationError) as exc:
+ return self._write_discharge_error(exc)
+ self.send_response(200)
+ self.end_headers()
+ content_len = int(self.headers.get('content-length', 0))
+ content = 'done'
+ if self.path != '/no-body'and content_len > 0:
+ body = self.rfile.read(content_len)
+ content = content + ' ' + body
+ self.wfile.write(content.encode('utf-8'))
+ return
+
+ def _write_discharge_error(self, exc):
+ version = httpbakery.request_version(self.headers)
+ if version < bakery.LATEST_VERSION:
+ self._server_version = version
+
+ caveats = []
+ if self._auth_location != '':
+ caveats = [
+ checkers.Caveat(location=self._auth_location,
+ condition='is-ok')
+ ]
+ if self._caveats is not None:
+ caveats.extend(self._caveats)
+
+ m = self._bakery.oven.macaroon(
+ version=bakery.LATEST_VERSION, expiry=AGES,
+ caveats=caveats, ops=[TEST_OP])
+
+ content, headers = httpbakery.discharge_required_response(
+ m, '/', 'test', exc.args[0])
+ self.send_response(401)
+ for h in headers:
+ self.send_header(h, headers[h])
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ self.wfile.write(content)
+
+
+def new_bakery(location, locator, checker):
+ '''Return a new bakery instance.
+ @param location Location of the bakery {str}.
+ @param locator Locator for third parties {ThirdPartyLocator or None}
+ @param checker Caveat checker {FirstPartyCaveatChecker or None}
+ @return {Bakery}
+ '''
+ if checker is None:
+ c = checkers.Checker()
+ c.namespace().register('testns', '')
+ c.register('is', 'testns', check_is_something)
+ checker = c
+ key = bakery.generate_key()
+ return bakery.Bakery(
+ location=location,
+ locator=locator,
+ key=key,
+ checker=checker,
+ )
+
+
+def is_something_caveat():
+ return checkers.Caveat(condition='is something', namespace='testns')
+
+
+def check_is_something(ctx, cond, arg):
+ if arg != 'something':
+ return '{} doesn\'t match "something"'.format(arg)
+ return None
+
+
+class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
+ def __init__(self, check):
+ self._check = check
+
+ def check_third_party_caveat(self, ctx, info):
+ cond, arg = checkers.parse_caveat(info.condition)
+ return self._check(cond, arg)
+
+alwaysOK3rd = ThirdPartyCaveatCheckerF(lambda cond, arg: [])
diff --git a/macaroonbakery/tests/test_codec.py b/macaroonbakery/tests/test_codec.py
index 6573266..d4fbc57 100644
--- a/macaroonbakery/tests/test_codec.py
+++ b/macaroonbakery/tests/test_codec.py
@@ -6,92 +6,94 @@ from unittest import TestCase
import nacl.public
import six
-import macaroonbakery
-from macaroonbakery import utils
+import macaroonbakery as bakery
from macaroonbakery import codec
import macaroonbakery.checkers as checkers
class TestCodec(TestCase):
def setUp(self):
- self.fp_key = macaroonbakery.generate_key()
- self.tp_key = macaroonbakery.generate_key()
+ self.fp_key = bakery.generate_key()
+ self.tp_key = bakery.generate_key()
def test_v1_round_trip(self):
- tp_info = macaroonbakery.ThirdPartyInfo(
- version=macaroonbakery.BAKERY_V1,
+ tp_info = bakery.ThirdPartyInfo(
+ version=bakery.VERSION_1,
public_key=self.tp_key.public_key)
- cid = macaroonbakery.encode_caveat(
+ cid = bakery.encode_caveat(
'is-authenticated-user',
b'a random string',
tp_info,
self.fp_key,
None)
- res = macaroonbakery.decode_caveat(self.tp_key, cid)
- self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo(
+ res = bakery.decode_caveat(self.tp_key, cid)
+ self.assertEquals(res, bakery.ThirdPartyCaveatInfo(
first_party_public_key=self.fp_key.public_key,
root_key=b'a random string',
condition='is-authenticated-user',
caveat=cid,
third_party_key_pair=self.tp_key,
- version=macaroonbakery.BAKERY_V1,
- namespace=macaroonbakery.legacy_namespace()
+ version=bakery.VERSION_1,
+ id=None,
+ namespace=bakery.legacy_namespace()
))
def test_v2_round_trip(self):
- tp_info = macaroonbakery.ThirdPartyInfo(
- version=macaroonbakery.BAKERY_V2,
+ tp_info = bakery.ThirdPartyInfo(
+ version=bakery.VERSION_2,
public_key=self.tp_key.public_key)
- cid = macaroonbakery.encode_caveat(
+ cid = bakery.encode_caveat(
'is-authenticated-user',
b'a random string',
tp_info,
self.fp_key,
None)
- res = macaroonbakery.decode_caveat(self.tp_key, cid)
- self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo(
+ res = bakery.decode_caveat(self.tp_key, cid)
+ self.assertEquals(res, bakery.ThirdPartyCaveatInfo(
first_party_public_key=self.fp_key.public_key,
root_key=b'a random string',
condition='is-authenticated-user',
caveat=cid,
third_party_key_pair=self.tp_key,
- version=macaroonbakery.BAKERY_V2,
- namespace=macaroonbakery.legacy_namespace()
+ version=bakery.VERSION_2,
+ id=None,
+ namespace=bakery.legacy_namespace()
))
def test_v3_round_trip(self):
- tp_info = macaroonbakery.ThirdPartyInfo(
- version=macaroonbakery.BAKERY_V3,
+ tp_info = bakery.ThirdPartyInfo(
+ version=bakery.VERSION_3,
public_key=self.tp_key.public_key)
ns = checkers.Namespace()
ns.register('testns', 'x')
- cid = macaroonbakery.encode_caveat(
+ cid = bakery.encode_caveat(
'is-authenticated-user',
b'a random string',
tp_info,
self.fp_key,
ns)
- res = macaroonbakery.decode_caveat(self.tp_key, cid)
- self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo(
+ res = bakery.decode_caveat(self.tp_key, cid)
+ self.assertEquals(res, bakery.ThirdPartyCaveatInfo(
first_party_public_key=self.fp_key.public_key,
root_key=b'a random string',
condition='is-authenticated-user',
caveat=cid,
third_party_key_pair=self.tp_key,
- version=macaroonbakery.BAKERY_V3,
+ version=bakery.VERSION_3,
+ id=None,
namespace=ns
))
def test_empty_caveat_id(self):
- with self.assertRaises(macaroonbakery.VerificationError) as context:
- macaroonbakery.decode_caveat(self.tp_key, b'')
+ with self.assertRaises(bakery.VerificationError) as context:
+ bakery.decode_caveat(self.tp_key, b'')
self.assertTrue('empty third party caveat' in str(context.exception))
def test_decode_caveat_v1_from_go(self):
- tp_key = macaroonbakery.PrivateKey(
+ tp_key = bakery.PrivateKey(
nacl.public.PrivateKey(base64.b64decode(
'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')))
- fp_key = macaroonbakery.PrivateKey(
+ fp_key = bakery.PrivateKey(
nacl.public.PrivateKey(base64.b64decode(
'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')))
root_key = base64.b64decode('vDxEmWZEkgiNEFlJ+8ruXe3qDSLf1H+o')
@@ -108,67 +110,70 @@ class TestCodec(TestCase):
'BORldUUExGdjVla1dWUjA4Uk1sbGJhc3c4VGdFbkhzM0laeVo'
'0V2lEOHhRUWdjU3ljOHY4eUt4dEhxejVEczJOYmh1ZDJhUFdt'
'UTVMcVlNWitmZ2FNaTAxdE9DIn0=')
- cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav)
- self.assertEquals(cav, macaroonbakery.ThirdPartyCaveatInfo(
+ cav = bakery.decode_caveat(tp_key, encrypted_cav)
+ self.assertEquals(cav, bakery.ThirdPartyCaveatInfo(
condition='caveat condition',
first_party_public_key=fp_key.public_key,
third_party_key_pair=tp_key,
root_key=root_key,
caveat=encrypted_cav,
- version=macaroonbakery.BAKERY_V1,
- namespace=macaroonbakery.legacy_namespace()
+ version=bakery.VERSION_1,
+ id=None,
+ namespace=bakery.legacy_namespace()
))
def test_decode_caveat_v2_from_go(self):
- tp_key = macaroonbakery.PrivateKey(nacl.public.PrivateKey(
+ tp_key = bakery.PrivateKey(nacl.public.PrivateKey(
base64.b64decode(
'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')))
- fp_key = macaroonbakery.PrivateKey(
+ fp_key = bakery.PrivateKey(
nacl.public.PrivateKey(base64.b64decode(
'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')))
root_key = base64.b64decode('wh0HSM65wWHOIxoGjgJJOFvQKn2jJFhC')
# This caveat has been generated from the go code
# to check the compatibilty
- encrypted_cav = base64.urlsafe_b64decode(
- utils.add_base64_padding(six.b(
- 'AvD-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGIHq9xGcHS9IZ'
- 'Lh0cL6D9qpeKI0mXmCPfnwRQDuVYC8y5gVWd-oCGZaj5TGtk3byp2Vnw6ojmt'
- 'sULDhY59YA_J_Y0ATkERO5T9ajoRWBxU2OXBoX6bImXA')))
- cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav)
- self.assertEqual(cav, macaroonbakery.ThirdPartyCaveatInfo(
+ encrypted_cav = bakery.b64decode(
+ 'AvD-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGIHq9xGcHS9IZ'
+ 'Lh0cL6D9qpeKI0mXmCPfnwRQDuVYC8y5gVWd-oCGZaj5TGtk3byp2Vnw6ojmt'
+ 'sULDhY59YA_J_Y0ATkERO5T9ajoRWBxU2OXBoX6bImXA',
+ )
+ cav = bakery.decode_caveat(tp_key, encrypted_cav)
+ self.assertEqual(cav, bakery.ThirdPartyCaveatInfo(
condition='third party condition',
first_party_public_key=fp_key.public_key,
third_party_key_pair=tp_key,
root_key=root_key,
caveat=encrypted_cav,
- version=macaroonbakery.BAKERY_V2,
- namespace=macaroonbakery.legacy_namespace()
+ version=bakery.VERSION_2,
+ id=None,
+ namespace=bakery.legacy_namespace()
))
def test_decode_caveat_v3_from_go(self):
- tp_key = macaroonbakery.PrivateKey(
+ tp_key = bakery.PrivateKey(
nacl.public.PrivateKey(base64.b64decode(
'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0=')))
- fp_key = macaroonbakery.PrivateKey(nacl.public.PrivateKey(
+ fp_key = bakery.PrivateKey(nacl.public.PrivateKey(
base64.b64decode(
'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs=')))
root_key = base64.b64decode(b'oqOXI3/Mz/pKjCuFOt2eYxb7ndLq66GY')
# This caveat has been generated from the go code
# to check the compatibilty
- encrypted_cav = base64.urlsafe_b64decode(
- utils.add_base64_padding(six.b(
- 'A_D-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGNLeFSkD2M-8A'
- 'EYvmgVH95GWu7T7caKxKhhOQFcEKgnXKJvYXxz1zin4cZc4Q6C7gVqA-J4_j3'
- '1LX4VKxymqG62UGPo78wOv0_fKjr3OI6PPJOYOQgBMclemlRF2')))
- cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav)
- self.assertEquals(cav, macaroonbakery.ThirdPartyCaveatInfo(
+ encrypted_cav = bakery.b64decode(
+ 'A_D-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGNLeFSkD2M-8A'
+ 'EYvmgVH95GWu7T7caKxKhhOQFcEKgnXKJvYXxz1zin4cZc4Q6C7gVqA-J4_j3'
+ '1LX4VKxymqG62UGPo78wOv0_fKjr3OI6PPJOYOQgBMclemlRF2',
+ )
+ cav = bakery.decode_caveat(tp_key, encrypted_cav)
+ self.assertEquals(cav, bakery.ThirdPartyCaveatInfo(
condition='third party condition',
first_party_public_key=fp_key.public_key,
third_party_key_pair=tp_key,
root_key=root_key,
caveat=encrypted_cav,
- version=macaroonbakery.BAKERY_V3,
- namespace=macaroonbakery.legacy_namespace()
+ version=bakery.VERSION_3,
+ id=None,
+ namespace=bakery.legacy_namespace()
))
def test_encode_decode_varint(self):
@@ -183,7 +188,7 @@ class TestCodec(TestCase):
for test in tests:
data = bytearray()
expected = bytearray()
- macaroonbakery.encode_uvarint(test[0], data)
+ bakery.encode_uvarint(test[0], data)
for v in test[1]:
expected.append(v)
self.assertEquals(data, expected)
diff --git a/macaroonbakery/tests/test_discharge.py b/macaroonbakery/tests/test_discharge.py
index 6e2df6a..433483a 100644
--- a/macaroonbakery/tests/test_discharge.py
+++ b/macaroonbakery/tests/test_discharge.py
@@ -3,11 +3,8 @@
import unittest
from pymacaroons import MACAROON_V1, Macaroon
-from pymacaroons.exceptions import (
- MacaroonInvalidSignatureException, MacaroonUnmetCaveatException
-)
-import macaroonbakery
+import macaroonbakery as bakery
import macaroonbakery.checkers as checkers
from macaroonbakery.tests import common
@@ -20,15 +17,15 @@ class TestDischarge(unittest.TestCase):
can verify this macaroon as valid.
'''
oc = common.new_bakery('bakerytest')
- primary = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ primary = oc.oven.macaroon(bakery.LATEST_VERSION,
common.ages, None,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
self.assertEqual(primary.macaroon.location, 'bakerytest')
primary.add_caveat(checkers.Caveat(condition='str something',
namespace='testns'),
oc.oven.key, oc.oven.locator)
oc.checker.auth([[primary.macaroon]]).allow(
- common.str_context('something'), [macaroonbakery.LOGIN_OP])
+ common.str_context('something'), [bakery.LOGIN_OP])
def test_macaroon_paper_fig6(self):
''' Implements an example flow as described in the macaroons paper:
@@ -45,15 +42,15 @@ class TestDischarge(unittest.TestCase):
The target service verifies the original macaroon it delegated to fs
No direct contact between bs and ts is required
'''
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
ts = common.new_bakery('ts-loc', locator)
fs = common.new_bakery('fs-loc', locator)
# ts creates a macaroon.
- ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION,
common.ages,
- None, [macaroonbakery.LOGIN_OP])
+ None, [bakery.LOGIN_OP])
# ts somehow sends the macaroon to fs which adds a third party caveat
# to be discharged by bs.
@@ -62,51 +59,55 @@ class TestDischarge(unittest.TestCase):
fs.oven.key, fs.oven.locator)
# client asks for a discharge macaroon for each third party caveat
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
self.assertEqual(cav.location, 'bs-loc')
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- bs.oven.key,
- common.ThirdPartyStrcmpChecker(
- 'user==bob'),
- bs.oven.locator)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker('user==bob'),
+ bs.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
- get_discharge)
+ d = bakery.discharge_all(ts_macaroon, get_discharge)
ts.checker.auth([d]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
def test_discharge_with_version1_macaroon(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
ts = common.new_bakery('ts-loc', locator)
# ts creates a old-version macaroon.
- ts_macaroon = ts.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages,
- None, [macaroonbakery.LOGIN_OP])
+ ts_macaroon = ts.oven.macaroon(bakery.VERSION_1, common.ages,
+ None, [bakery.LOGIN_OP])
ts_macaroon.add_caveat(checkers.Caveat(condition='something',
location='bs-loc'),
ts.oven.key, ts.oven.locator)
# client asks for a discharge macaroon for each third party caveat
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
# Make sure that the caveat id really is old-style.
try:
cav.caveat_id_bytes.decode('utf-8')
except UnicodeDecodeError:
self.fail('caveat id is not utf-8')
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- bs.oven.key,
- common.ThirdPartyStrcmpChecker(
- 'something'),
- bs.oven.locator)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker('something'),
+ bs.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
- get_discharge)
+ d = bakery.discharge_all(ts_macaroon, get_discharge)
ts.checker.auth([d]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
for m in d:
self.assertEqual(m.version, MACAROON_V1)
@@ -114,29 +115,31 @@ class TestDischarge(unittest.TestCase):
def test_version1_macaroon_id(self):
# In the version 1 bakery, macaroon ids were hex-encoded with a
# hyphenated UUID suffix.
- root_key_store = macaroonbakery.MemoryKeyStore()
- b = macaroonbakery.Bakery(root_key_store=root_key_store,
- identity_client=common.OneIdentity())
+ root_key_store = bakery.MemoryKeyStore()
+ b = bakery.Bakery(
+ root_key_store=root_key_store,
+ identity_client=common.OneIdentity(),
+ )
key, id = root_key_store.root_key()
root_key_store.get(id)
m = Macaroon(key=key, version=MACAROON_V1, location='',
identifier=id + b'-deadl00f')
b.checker.auth([[m]]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
def test_macaroon_paper_fig6_fails_without_discharges(self):
''' Runs a similar test as test_macaroon_paper_fig6 without the client
discharging the third party caveats.
'''
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
ts = common.new_bakery('ts-loc', locator)
fs = common.new_bakery('fs-loc', locator)
common.new_bakery('as-loc', locator)
# ts creates a macaroon.
- ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION,
common.ages, None,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
# ts somehow sends the macaroon to fs which adds a third party
# caveat to be discharged by as.
@@ -148,24 +151,24 @@ class TestDischarge(unittest.TestCase):
try:
ts.checker.auth([[ts_macaroon.macaroon]]).allow(
common.test_context,
- macaroonbakery.LOGIN_OP
+ bakery.LOGIN_OP
)
self.fail('macaroon unmet should be raised')
- except MacaroonUnmetCaveatException:
+ except bakery.VerificationError:
pass
def test_macaroon_paper_fig6_fails_with_binding_on_tampered_sig(self):
''' Runs a similar test as test_macaroon_paper_fig6 with the discharge
macaroon binding being done on a tampered signature.
'''
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
ts = common.new_bakery('ts-loc', locator)
# ts creates a macaroon.
- ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION,
common.ages, None,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
# ts somehow sends the macaroon to fs which adds a third party caveat
# to be discharged by as.
ts_macaroon.add_caveat(checkers.Caveat(condition='user==bob',
@@ -173,16 +176,18 @@ class TestDischarge(unittest.TestCase):
ts.oven.key, ts.oven.locator)
# client asks for a discharge macaroon for each third party caveat
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
self.assertEqual(cav.location, 'bs-loc')
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- bs.oven.key,
- common.ThirdPartyStrcmpChecker(
- 'user==bob'),
- bs.oven.locator)
-
- d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
- get_discharge)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ bs.oven.key,
+ common.ThirdPartyStrcmpChecker('user==bob'),
+ bs.oven.locator,
+ )
+
+ d = bakery.discharge_all(ts_macaroon, get_discharge)
# client has all the discharge macaroons. For each discharge macaroon
# bind it to our ts_macaroon and add it to our request.
tampered_macaroon = Macaroon()
@@ -190,35 +195,39 @@ class TestDischarge(unittest.TestCase):
d[i + 1] = tampered_macaroon.prepare_for_request(dm)
# client makes request to ts.
- with self.assertRaises(MacaroonInvalidSignatureException) as exc:
+ with self.assertRaises(bakery.VerificationError) as exc:
ts.checker.auth([d]).allow(common.test_context,
- macaroonbakery.LOGIN_OP)
- self.assertEqual('Signatures do not match', exc.exception.args[0])
+ bakery.LOGIN_OP)
+ self.assertEqual('verification failed: Signatures do not match',
+ exc.exception.args[0])
def test_need_declared(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
first_party = common.new_bakery('first', locator)
third_party = common.new_bakery('third', locator)
# firstParty mints a macaroon with a third-party caveat addressed
# to thirdParty with a need-declared caveat.
m = first_party.oven.macaroon(
- macaroonbakery.LATEST_BAKERY_VERSION, common.ages, [
+ bakery.LATEST_VERSION, common.ages, [
checkers.need_declared_caveat(
checkers.Caveat(location='third', condition='something'),
['foo', 'bar']
)
- ], [macaroonbakery.LOGIN_OP])
+ ], [bakery.LOGIN_OP])
# The client asks for a discharge macaroon for each third party caveat.
- def get_discharge(ctx, cav, payload):
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- third_party.oven.key,
- common.ThirdPartyStrcmpChecker(
- 'something'),
- third_party.oven.locator)
+ def get_discharge(cav, payload):
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
+ common.ThirdPartyStrcmpChecker('something'),
+ third_party.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
# The required declared attributes should have been added
# to the discharge macaroons.
@@ -231,22 +240,26 @@ class TestDischarge(unittest.TestCase):
# Make sure the macaroons actually check out correctly
# when provided with the declared checker.
ctx = checkers.context_with_declared(common.test_context, declared)
- first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+ first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP])
# Try again when the third party does add a required declaration.
# The client asks for a discharge macaroon for each third party caveat.
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
checker = common.ThirdPartyCheckerWithCaveats([
checkers.declared_caveat('foo', 'a'),
checkers.declared_caveat('arble', 'b')
])
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- third_party.oven.key,
- checker,
- third_party.oven.locator)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
+ checker,
+ third_party.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
# One attribute should have been added, the other was already there.
declared = checkers.infer_declared(d, first_party.checker.namespace())
@@ -257,25 +270,28 @@ class TestDischarge(unittest.TestCase):
})
ctx = checkers.context_with_declared(common.test_context, declared)
- first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+ first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP])
# Try again, but this time pretend a client is sneakily trying
# to add another 'declared' attribute to alter the declarations.
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
checker = common.ThirdPartyCheckerWithCaveats([
checkers.declared_caveat('foo', 'a'),
checkers.declared_caveat('arble', 'b'),
])
# Sneaky client adds a first party caveat.
- m = macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- third_party.oven.key, checker,
- third_party.oven.locator)
+ m = bakery.discharge(
+ common.test_context, cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key, checker,
+ third_party.oven.locator,
+ )
m.add_caveat(checkers.declared_caveat('foo', 'c'), None, None)
return m
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
declared = checkers.infer_declared(d, first_party.checker.namespace())
self.assertEqual(declared, {
@@ -283,22 +299,22 @@ class TestDischarge(unittest.TestCase):
'arble': 'b',
})
- with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ with self.assertRaises(bakery.AuthInitError) as exc:
first_party.checker.auth([d]).allow(common.test_context,
- macaroonbakery.LOGIN_OP)
+ bakery.LOGIN_OP)
self.assertEqual('cannot authorize login macaroon: caveat '
'"declared foo a" not satisfied: got foo=null, '
'expected "a"', exc.exception.args[0])
def test_discharge_two_need_declared(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
first_party = common.new_bakery('first', locator)
third_party = common.new_bakery('third', locator)
# first_party mints a macaroon with two third party caveats
# with overlapping attributes.
m = first_party.oven.macaroon(
- macaroonbakery.LATEST_BAKERY_VERSION,
+ bakery.LATEST_VERSION,
common.ages, [
checkers.need_declared_caveat(
checkers.Caveat(location='third', condition='x'),
@@ -306,18 +322,22 @@ class TestDischarge(unittest.TestCase):
checkers.need_declared_caveat(
checkers.Caveat(location='third', condition='y'),
['bar', 'baz']),
- ], [macaroonbakery.LOGIN_OP])
+ ], [bakery.LOGIN_OP])
# The client asks for a discharge macaroon for each third party caveat.
# Since no declarations are added by the discharger,
- def get_discharge(ctx, cav, payload):
- return macaroonbakery.discharge(
- ctx, cav.caveat_id_bytes, payload, third_party.oven.key,
+ def get_discharge(cav, payload):
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
common.ThirdPartyCaveatCheckerEmpty(),
- third_party.oven.locator)
+ third_party.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
declared = checkers.infer_declared(d, first_party.checker.namespace())
self.assertEqual(declared, {
'foo': '',
@@ -325,12 +345,12 @@ class TestDischarge(unittest.TestCase):
'baz': '',
})
ctx = checkers.context_with_declared(common.test_context, declared)
- first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP])
+ first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP])
# If they return conflicting values, the discharge fails.
# The client asks for a discharge macaroon for each third party caveat.
# Since no declarations are added by the discharger,
- class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
def check_third_party_caveat(self, ctx, cav_info):
if cav_info.condition == b'x':
return [checkers.declared_caveat('foo', 'fooval1')]
@@ -341,62 +361,70 @@ class TestDischarge(unittest.TestCase):
]
raise common.ThirdPartyCaveatCheckFailed('not matched')
- def get_discharge(ctx, cav, payload):
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- third_party.oven.key,
- ThirdPartyCaveatCheckerF(),
- third_party.oven.locator)
+ def get_discharge(cav, payload):
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
+ ThirdPartyCaveatCheckerF(),
+ third_party.oven.locator,
+ )
- d = macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ d = bakery.discharge_all(m, get_discharge)
declared = checkers.infer_declared(d, first_party.checker.namespace())
self.assertEqual(declared, {
'bar': '',
'baz': 'bazval',
})
- with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ with self.assertRaises(bakery.AuthInitError) as exc:
first_party.checker.auth([d]).allow(common.test_context,
- macaroonbakery.LOGIN_OP)
+ bakery.LOGIN_OP)
self.assertEqual('cannot authorize login macaroon: caveat "declared '
'foo fooval1" not satisfied: got foo=null, expected '
'"fooval1"', exc.exception.args[0])
def test_discharge_macaroon_cannot_be_used_as_normal_macaroon(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
first_party = common.new_bakery('first', locator)
third_party = common.new_bakery('third', locator)
# First party mints a macaroon with a 3rd party caveat.
- m = first_party.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ m = first_party.oven.macaroon(bakery.LATEST_VERSION,
common.ages, [
checkers.Caveat(location='third',
condition='true')],
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
# Acquire the discharge macaroon, but don't bind it to the original.
class M:
unbound = None
- def get_discharge(ctx, cav, payload):
- m = macaroonbakery.discharge(
- ctx, cav.caveat_id_bytes, payload, third_party.oven.key,
+ def get_discharge(cav, payload):
+ m = bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ third_party.oven.key,
common.ThirdPartyStrcmpChecker('true'),
- third_party.oven.locator)
+ third_party.oven.locator,
+ )
M.unbound = m.macaroon.copy()
return m
- macaroonbakery.discharge_all(common.test_context, m, get_discharge)
+ bakery.discharge_all(m, get_discharge)
self.assertIsNotNone(M.unbound)
# Make sure it cannot be used as a normal macaroon in the third party.
- with self.assertRaises(macaroonbakery.AuthInitError) as exc:
+ with self.assertRaises(bakery.VerificationError) as exc:
third_party.checker.auth([[M.unbound]]).allow(
- common.test_context, [macaroonbakery.LOGIN_OP])
+ common.test_context, [bakery.LOGIN_OP])
self.assertEqual('no operations found in macaroon',
exc.exception.args[0])
def test_third_party_discharge_macaroon_ids_are_small(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bakeries = {
'ts-loc': common.new_bakery('ts-loc', locator),
'as1-loc': common.new_bakery('as1-loc', locator),
@@ -404,14 +432,14 @@ class TestDischarge(unittest.TestCase):
}
ts = bakeries['ts-loc']
- ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION,
+ ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION,
common.ages,
- None, [macaroonbakery.LOGIN_OP])
+ None, [bakery.LOGIN_OP])
ts_macaroon.add_caveat(checkers.Caveat(condition='something',
location='as1-loc'),
ts.oven.key, ts.oven.locator)
- class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
def __init__(self, loc):
self._loc = loc
@@ -424,18 +452,20 @@ class TestDischarge(unittest.TestCase):
raise common.ThirdPartyCaveatCheckFailed(
'unknown location {}'.format(self._loc))
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
oven = bakeries[cav.location].oven
- return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload,
- oven.key,
- ThirdPartyCaveatCheckerF(
- cav.location),
- oven.locator)
-
- d = macaroonbakery.discharge_all(common.test_context, ts_macaroon,
- get_discharge)
+ return bakery.discharge(
+ common.test_context,
+ cav.caveat_id_bytes,
+ payload,
+ oven.key,
+ ThirdPartyCaveatCheckerF(cav.location),
+ oven.locator,
+ )
+
+ d = bakery.discharge_all(ts_macaroon, get_discharge)
ts.checker.auth([d]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
for i, m in enumerate(d):
for j, cav in enumerate(m.caveats):
diff --git a/macaroonbakery/tests/test_discharge_all.py b/macaroonbakery/tests/test_discharge_all.py
index 8da8823..7999f5f 100644
--- a/macaroonbakery/tests/test_discharge_all.py
+++ b/macaroonbakery/tests/test_discharge_all.py
@@ -4,7 +4,7 @@ import unittest
from pymacaroons.verifier import Verifier
-import macaroonbakery
+import macaroonbakery as bakery
import macaroonbakery.checkers as checkers
from macaroonbakery.tests import common
@@ -16,12 +16,11 @@ def always_ok(predicate):
class TestDischargeAll(unittest.TestCase):
def test_discharge_all_no_discharges(self):
root_key = b'root key'
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key=root_key, id=b'id0', location='loc0',
- version=macaroonbakery.LATEST_BAKERY_VERSION,
+ version=bakery.LATEST_VERSION,
namespace=common.test_checker().namespace())
- ms = macaroonbakery.discharge_all(
- common.test_context, m, no_discharge(self))
+ ms = bakery.discharge_all(m, no_discharge(self))
self.assertEqual(len(ms), 1)
self.assertEqual(ms[0], m.macaroon)
v = Verifier()
@@ -30,9 +29,9 @@ class TestDischargeAll(unittest.TestCase):
def test_discharge_all_many_discharges(self):
root_key = b'root key'
- m0 = macaroonbakery.Macaroon(
+ m0 = bakery.Macaroon(
root_key=root_key, id=b'id0', location='loc0',
- version=macaroonbakery.LATEST_BAKERY_VERSION)
+ version=bakery.LATEST_VERSION)
class State(object):
total_required = 40
@@ -52,19 +51,18 @@ class TestDischargeAll(unittest.TestCase):
add_caveats(m0)
- def get_discharge(_, cav, payload):
+ def get_discharge(cav, payload):
self.assertEqual(payload, None)
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key='root key {}'.format(
cav.caveat_id.decode('utf-8')).encode('utf-8'),
id=cav.caveat_id, location='',
- version=macaroonbakery.LATEST_BAKERY_VERSION)
+ version=bakery.LATEST_VERSION)
add_caveats(m)
return m
- ms = macaroonbakery.discharge_all(
- common.test_context, m0, get_discharge)
+ ms = bakery.discharge_all(m0, get_discharge)
self.assertEqual(len(ms), 41)
@@ -77,7 +75,7 @@ class TestDischargeAll(unittest.TestCase):
# we're using actual third party caveats as added by
# Macaroon.add_caveat and we use a larger number of caveats
# so that caveat ids will need to get larger.
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bakeries = {}
total_discharges_required = 40
@@ -106,9 +104,9 @@ class TestDischargeAll(unittest.TestCase):
return caveats
root_key = b'root key'
- m0 = macaroonbakery.Macaroon(
+ m0 = bakery.Macaroon(
root_key=root_key, id=b'id0', location='ts-loc',
- version=macaroonbakery.LATEST_BAKERY_VERSION)
+ version=bakery.LATEST_VERSION)
m0.add_caveat(checkers. Caveat(location=add_bakery(),
condition='something'),
@@ -117,18 +115,17 @@ class TestDischargeAll(unittest.TestCase):
# We've added a caveat (the first) so one less caveat is required.
M.still_required -= 1
- class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker):
+ class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
def check_third_party_caveat(self, ctx, info):
return checker(ctx, info)
- def get_discharge(ctx, cav, payload):
- return macaroonbakery.discharge(
- ctx, cav.caveat_id, payload,
+ def get_discharge(cav, payload):
+ return bakery.discharge(
+ common.test_context, cav.caveat_id, payload,
bakeries[cav.location].oven.key,
ThirdPartyCaveatCheckerF(), locator)
- ms = macaroonbakery.discharge_all(common.test_context, m0,
- get_discharge)
+ ms = bakery.discharge_all(m0, get_discharge)
self.assertEqual(len(ms), total_discharges_required + 1)
@@ -138,33 +135,31 @@ class TestDischargeAll(unittest.TestCase):
def test_discharge_all_local_discharge(self):
oc = common.new_bakery('ts', None)
- client_key = macaroonbakery.generate_key()
- m = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, common.ages,
+ client_key = bakery.generate_key()
+ m = oc.oven.macaroon(bakery.LATEST_VERSION, common.ages,
[
- macaroonbakery.local_third_party_caveat(
+ bakery.local_third_party_caveat(
client_key.public_key,
- macaroonbakery.LATEST_BAKERY_VERSION)
- ], [macaroonbakery.LOGIN_OP])
- ms = macaroonbakery.discharge_all(
- common.test_context, m, no_discharge(self), client_key)
+ bakery.LATEST_VERSION)
+ ], [bakery.LOGIN_OP])
+ ms = bakery.discharge_all(m, no_discharge(self), client_key)
oc.checker.auth([ms]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
def test_discharge_all_local_discharge_version1(self):
oc = common.new_bakery('ts', None)
- client_key = macaroonbakery.generate_key()
- m = oc.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages, [
- macaroonbakery.local_third_party_caveat(
- client_key.public_key, macaroonbakery.BAKERY_V1)
- ], [macaroonbakery.LOGIN_OP])
- ms = macaroonbakery.discharge_all(
- common.test_context, m, no_discharge(self), client_key)
+ client_key = bakery.generate_key()
+ m = oc.oven.macaroon(bakery.VERSION_1, common.ages, [
+ bakery.local_third_party_caveat(
+ client_key.public_key, bakery.VERSION_1)
+ ], [bakery.LOGIN_OP])
+ ms = bakery.discharge_all(m, no_discharge(self), client_key)
oc.checker.auth([ms]).allow(common.test_context,
- [macaroonbakery.LOGIN_OP])
+ [bakery.LOGIN_OP])
def no_discharge(test):
- def get_discharge(ctx, cav, payload):
+ def get_discharge(cav, payload):
test.fail("get_discharge called unexpectedly")
return get_discharge
diff --git a/macaroonbakery/tests/test_keyring.py b/macaroonbakery/tests/test_keyring.py
index 351b144..438ab1b 100644
--- a/macaroonbakery/tests/test_keyring.py
+++ b/macaroonbakery/tests/test_keyring.py
@@ -4,28 +4,28 @@ import unittest
from httmock import urlmatch, HTTMock
-import macaroonbakery
-from macaroonbakery import httpbakery
+import macaroonbakery as bakery
+import macaroonbakery.httpbakery as httpbakery
class TestKeyRing(unittest.TestCase):
def test_cache_fetch(self):
- key = macaroonbakery.generate_key()
+ key = bakery.generate_key()
@urlmatch(path='.*/discharge/info')
def discharge_info(url, request):
return {
'status_code': 200,
'content': {
- 'Version': macaroonbakery.LATEST_BAKERY_VERSION,
+ 'Version': bakery.LATEST_VERSION,
'PublicKey': key.public_key.encode().decode('utf-8')
}
}
- expectInfo = macaroonbakery.ThirdPartyInfo(
+ expectInfo = bakery.ThirdPartyInfo(
public_key=key.public_key,
- version=macaroonbakery.LATEST_BAKERY_VERSION
+ version=bakery.LATEST_VERSION
)
kr = httpbakery.ThirdPartyLocator(allow_insecure=True)
with HTTMock(discharge_info):
@@ -33,21 +33,21 @@ class TestKeyRing(unittest.TestCase):
self.assertEqual(info, expectInfo)
def test_cache_norefetch(self):
- key = macaroonbakery.generate_key()
+ key = bakery.generate_key()
@urlmatch(path='.*/discharge/info')
def discharge_info(url, request):
return {
'status_code': 200,
'content': {
- 'Version': macaroonbakery.LATEST_BAKERY_VERSION,
+ 'Version': bakery.LATEST_VERSION,
'PublicKey': key.public_key.encode().decode('utf-8')
}
}
- expectInfo = macaroonbakery.ThirdPartyInfo(
+ expectInfo = bakery.ThirdPartyInfo(
public_key=key.public_key,
- version=macaroonbakery.LATEST_BAKERY_VERSION
+ version=bakery.LATEST_VERSION
)
kr = httpbakery.ThirdPartyLocator(allow_insecure=True)
with HTTMock(discharge_info):
@@ -57,7 +57,7 @@ class TestKeyRing(unittest.TestCase):
self.assertEqual(info, expectInfo)
def test_cache_fetch_no_version(self):
- key = macaroonbakery.generate_key()
+ key = bakery.generate_key()
@urlmatch(path='.*/discharge/info')
def discharge_info(url, request):
@@ -68,9 +68,9 @@ class TestKeyRing(unittest.TestCase):
}
}
- expectInfo = macaroonbakery.ThirdPartyInfo(
+ expectInfo = bakery.ThirdPartyInfo(
public_key=key.public_key,
- version=macaroonbakery.BAKERY_V1
+ version=bakery.VERSION_1
)
kr = httpbakery.ThirdPartyLocator(allow_insecure=True)
with HTTMock(discharge_info):
@@ -79,11 +79,11 @@ class TestKeyRing(unittest.TestCase):
def test_allow_insecure(self):
kr = httpbakery.ThirdPartyLocator()
- with self.assertRaises(macaroonbakery.error.ThirdPartyInfoNotFound):
+ with self.assertRaises(bakery.error.ThirdPartyInfoNotFound):
kr.third_party_info('http://0.1.2.3/')
def test_fallback(self):
- key = macaroonbakery.generate_key()
+ key = bakery.generate_key()
@urlmatch(path='.*/discharge/info')
def discharge_info(url, request):
@@ -100,9 +100,9 @@ class TestKeyRing(unittest.TestCase):
}
}
- expectInfo = macaroonbakery.ThirdPartyInfo(
+ expectInfo = bakery.ThirdPartyInfo(
public_key=key.public_key,
- version=macaroonbakery.BAKERY_V1
+ version=bakery.VERSION_1
)
kr = httpbakery.ThirdPartyLocator(allow_insecure=True)
with HTTMock(discharge_info):
diff --git a/macaroonbakery/tests/test_macaroon.py b/macaroonbakery/tests/test_macaroon.py
index 7e77e2b..93bbbb8 100644
--- a/macaroonbakery/tests/test_macaroon.py
+++ b/macaroonbakery/tests/test_macaroon.py
@@ -7,36 +7,35 @@ import six
import pymacaroons
from pymacaroons import serializers
-import macaroonbakery
+import macaroonbakery as bakery
import macaroonbakery.checkers as checkers
from macaroonbakery.tests import common
class TestMacaroon(TestCase):
def test_new_macaroon(self):
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
b'rootkey',
b'some id',
'here',
- macaroonbakery.LATEST_BAKERY_VERSION)
+ bakery.LATEST_VERSION)
self.assertIsNotNone(m)
self.assertEquals(m._macaroon.identifier, b'some id')
self.assertEquals(m._macaroon.location, 'here')
- self.assertEquals(m.version, macaroonbakery.LATEST_BAKERY_VERSION)
+ self.assertEquals(m.version, bakery.LATEST_VERSION)
def test_add_first_party_caveat(self):
- m = macaroonbakery.Macaroon('rootkey', 'some id', 'here',
- macaroonbakery.LATEST_BAKERY_VERSION)
+ m = bakery.Macaroon('rootkey', 'some id', 'here', bakery.LATEST_VERSION)
m.add_caveat(checkers.Caveat('test_condition'))
caveats = m.first_party_caveats()
self.assertEquals(len(caveats), 1)
self.assertEquals(caveats[0].caveat_id, b'test_condition')
def test_add_third_party_caveat(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
- lbv = six.int2byte(macaroonbakery.LATEST_BAKERY_VERSION)
+ lbv = six.int2byte(bakery.LATEST_VERSION)
tests = [
('no existing id', b'', [], lbv + six.int2byte(0)),
('several existing ids', b'', [
@@ -53,10 +52,10 @@ class TestMacaroon(TestCase):
for test in tests:
print('test ', test[0])
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key=b'root key', id=b'id',
location='location',
- version=macaroonbakery.LATEST_BAKERY_VERSION)
+ version=bakery.LATEST_VERSION)
for id in test[2]:
m.macaroon.add_third_party_caveat(key=None, key_id=id,
location='')
@@ -68,21 +67,21 @@ class TestMacaroon(TestCase):
test[3])
def test_marshal_json_latest_version(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
ns = checkers.Namespace({
'testns': 'x',
'otherns': 'y',
})
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key=b'root key', id=b'id',
location='location',
- version=macaroonbakery.LATEST_BAKERY_VERSION,
+ version=bakery.LATEST_VERSION,
namespace=ns)
m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'),
bs.oven.key, locator)
data = m.serialize_json()
- m1 = macaroonbakery.Macaroon.deserialize_json(data)
+ m1 = bakery.Macaroon.deserialize_json(data)
# Just check the signature and version - we're not interested in fully
# checking the macaroon marshaling here.
self.assertEqual(m1.macaroon.signature, m.macaroon.signature)
@@ -92,8 +91,8 @@ class TestMacaroon(TestCase):
self.assertEqual(m1._caveat_data, m._caveat_data)
# test with the encoder, decoder
- data = json.dumps(m, cls=macaroonbakery.MacaroonJSONEncoder)
- m1 = json.loads(data, cls=macaroonbakery.MacaroonJSONDecoder)
+ data = json.dumps(m, cls=bakery.MacaroonJSONEncoder)
+ m1 = json.loads(data, cls=bakery.MacaroonJSONDecoder)
self.assertEqual(m1.macaroon.signature, m.macaroon.signature)
self.assertEqual(m1.macaroon.version, m.macaroon.version)
self.assertEqual(len(m1.macaroon.caveats), 1)
@@ -101,20 +100,20 @@ class TestMacaroon(TestCase):
self.assertEqual(m1._caveat_data, m._caveat_data)
def test_json_version1(self):
- self._test_json_with_version(macaroonbakery.BAKERY_V1)
+ self._test_json_with_version(bakery.VERSION_1)
def test_json_version2(self):
- self._test_json_with_version(macaroonbakery.BAKERY_V2)
+ self._test_json_with_version(bakery.VERSION_2)
def _test_json_with_version(self, version):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery('bs-loc', locator)
ns = checkers.Namespace({
'testns': 'x',
})
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key=b'root key', id=b'id',
location='location', version=version,
namespace=ns)
@@ -124,18 +123,18 @@ class TestMacaroon(TestCase):
# Sanity check that no external caveat data has been added.
self.assertEqual(len(m._caveat_data), 0)
- data = json.dumps(m, cls=macaroonbakery.MacaroonJSONEncoder)
- m1 = json.loads(data, cls=macaroonbakery.MacaroonJSONDecoder)
+ data = json.dumps(m, cls=bakery.MacaroonJSONEncoder)
+ m1 = json.loads(data, cls=bakery.MacaroonJSONDecoder)
# Just check the signature and version - we're not interested in fully
# checking the macaroon marshaling here.
self.assertEqual(m1.macaroon.signature, m.macaroon.signature)
self.assertEqual(m1.macaroon.version,
- macaroonbakery.macaroon_version(version))
+ bakery.macaroon_version(version))
self.assertEqual(len(m1.macaroon.caveats), 1)
# Namespace information has been thrown away.
- self.assertEqual(m1.namespace, macaroonbakery.legacy_namespace())
+ self.assertEqual(m1.namespace, bakery.legacy_namespace())
self.assertEqual(len(m1._caveat_data), 0)
@@ -144,9 +143,9 @@ class TestMacaroon(TestCase):
with self.assertRaises(ValueError) as exc:
json.loads(json.dumps({
'm': m.serialize(serializer=serializers.JsonSerializer()),
- 'v': macaroonbakery.LATEST_BAKERY_VERSION + 1
- }), cls=macaroonbakery.MacaroonJSONDecoder)
- self.assertEqual('unknow bakery version 4', exc.exception.args[0])
+ 'v': bakery.LATEST_VERSION + 1
+ }), cls=bakery.MacaroonJSONDecoder)
+ self.assertEqual('unknown bakery version 4', exc.exception.args[0])
def test_json_inconsistent_version(self):
m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V1)
@@ -154,21 +153,21 @@ class TestMacaroon(TestCase):
json.loads(json.dumps({
'm': json.loads(m.serialize(
serializer=serializers.JsonSerializer())),
- 'v': macaroonbakery.LATEST_BAKERY_VERSION
- }), cls=macaroonbakery.MacaroonJSONDecoder)
+ 'v': bakery.LATEST_VERSION
+ }), cls=bakery.MacaroonJSONDecoder)
self.assertEqual('underlying macaroon has inconsistent version; '
'got 1 want 2', exc.exception.args[0])
def test_clone(self):
- locator = macaroonbakery.ThirdPartyStore()
+ locator = bakery.ThirdPartyStore()
bs = common.new_bakery("bs-loc", locator)
ns = checkers.Namespace({
"testns": "x",
})
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key=b'root key', id=b'id',
location='location',
- version=macaroonbakery.LATEST_BAKERY_VERSION,
+ version=bakery.LATEST_VERSION,
namespace=ns)
m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'),
bs.oven.key, locator)
@@ -185,15 +184,15 @@ class TestMacaroon(TestCase):
def test_json_deserialize_from_go(self):
ns = checkers.Namespace()
ns.register("someuri", "x")
- m = macaroonbakery.Macaroon(
+ m = bakery.Macaroon(
root_key=b'rootkey', id=b'some id', location='here',
- version=macaroonbakery.LATEST_BAKERY_VERSION, namespace=ns)
+ version=bakery.LATEST_VERSION, namespace=ns)
m.add_caveat(checkers.Caveat(condition='something',
namespace='someuri'))
data = '{"m":{"c":[{"i":"x:something"}],"l":"here","i":"some id",' \
'"s64":"c8edRIupArSrY-WZfa62pgZFD8VjDgqho9U2PlADe-E"},"v":3,' \
'"ns":"someuri:x"}'
- m_go = macaroonbakery.Macaroon.deserialize_json(data)
+ m_go = bakery.Macaroon.deserialize_json(data)
self.assertEqual(m.macaroon.signature_bytes,
m_go.macaroon.signature_bytes)
diff --git a/macaroonbakery/tests/test_namespace.py b/macaroonbakery/tests/test_namespace.py
index 2f04bb3..8a821e5 100644
--- a/macaroonbakery/tests/test_namespace.py
+++ b/macaroonbakery/tests/test_namespace.py
@@ -31,6 +31,8 @@ class TestNamespace(TestCase):
ns1 = checkers.deserialize_namespace(data)
self.assertEquals(ns1, ns)
+ # TODO(rogpeppe) add resolve tests
+
def test_register(self):
ns = checkers.Namespace(None)
ns.register('testns', 't')
diff --git a/macaroonbakery/tests/test_oven.py b/macaroonbakery/tests/test_oven.py
index 2976e94..ae235de 100644
--- a/macaroonbakery/tests/test_oven.py
+++ b/macaroonbakery/tests/test_oven.py
@@ -5,7 +5,7 @@ from unittest import TestCase
import copy
from datetime import datetime, timedelta
-import macaroonbakery
+import macaroonbakery as bakery
EPOCH = datetime(1900, 11, 17, 19, 00, 13, 0, None)
AGES = EPOCH + timedelta(days=10)
@@ -15,111 +15,110 @@ class TestOven(TestCase):
def test_canonical_ops(self):
canonical_ops_tests = (
('empty array', [], []),
- ('one element', [macaroonbakery.Op('a', 'a')],
- [macaroonbakery.Op('a', 'a')]),
+ ('one element', [bakery.Op('a', 'a')],
+ [bakery.Op('a', 'a')]),
('all in order',
- [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'),
- macaroonbakery.Op('c', 'c')],
- [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'),
- macaroonbakery.Op('c', 'c')]),
+ [bakery.Op('a', 'a'), bakery.Op('a', 'b'),
+ bakery.Op('c', 'c')],
+ [bakery.Op('a', 'a'), bakery.Op('a', 'b'),
+ bakery.Op('c', 'c')]),
('out of order',
- [macaroonbakery.Op('c', 'c'), macaroonbakery.Op('a', 'b'),
- macaroonbakery.Op('a', 'a')],
- [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'),
- macaroonbakery.Op('c', 'c')]),
+ [bakery.Op('c', 'c'), bakery.Op('a', 'b'),
+ bakery.Op('a', 'a')],
+ [bakery.Op('a', 'a'), bakery.Op('a', 'b'),
+ bakery.Op('c', 'c')]),
('with duplicates',
- [macaroonbakery.Op('c', 'c'), macaroonbakery.Op('a', 'b'),
- macaroonbakery.Op('a', 'a'), macaroonbakery.Op('c', 'a'),
- macaroonbakery.Op('c', 'b'), macaroonbakery.Op('c', 'c'),
- macaroonbakery.Op('a', 'a')],
- [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'),
- macaroonbakery.Op('c', 'a'), macaroonbakery.Op('c', 'b'),
- macaroonbakery.Op('c', 'c')]),
+ [bakery.Op('c', 'c'), bakery.Op('a', 'b'),
+ bakery.Op('a', 'a'), bakery.Op('c', 'a'),
+ bakery.Op('c', 'b'), bakery.Op('c', 'c'),
+ bakery.Op('a', 'a')],
+ [bakery.Op('a', 'a'), bakery.Op('a', 'b'),
+ bakery.Op('c', 'a'), bakery.Op('c', 'b'),
+ bakery.Op('c', 'c')]),
('make sure we\'ve got the fields right',
- [macaroonbakery.Op(entity='read', action='two'),
- macaroonbakery.Op(entity='read', action='one'),
- macaroonbakery.Op(entity='write', action='one')],
- [macaroonbakery.Op(entity='read', action='one'),
- macaroonbakery.Op(entity='read', action='two'),
- macaroonbakery.Op(entity='write', action='one')])
+ [bakery.Op(entity='read', action='two'),
+ bakery.Op(entity='read', action='one'),
+ bakery.Op(entity='write', action='one')],
+ [bakery.Op(entity='read', action='one'),
+ bakery.Op(entity='read', action='two'),
+ bakery.Op(entity='write', action='one')])
)
for about, ops, expected in canonical_ops_tests:
new_ops = copy.copy(ops)
- canonical_ops = macaroonbakery.canonical_ops(new_ops)
+ canonical_ops = bakery.canonical_ops(new_ops)
self.assertEquals(canonical_ops, expected)
# Verify that the original array isn't changed.
self.assertEquals(new_ops, ops)
def test_multiple_ops(self):
- test_oven = macaroonbakery.Oven(
- ops_store=macaroonbakery.MemoryOpsStore())
- ops = [macaroonbakery.Op('one', 'read'),
- macaroonbakery.Op('one', 'write'),
- macaroonbakery.Op('two', 'read')]
- m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES,
+ test_oven = bakery.Oven(
+ ops_store=bakery.MemoryOpsStore())
+ ops = [bakery.Op('one', 'read'),
+ bakery.Op('one', 'write'),
+ bakery.Op('two', 'read')]
+ m = test_oven.macaroon(bakery.LATEST_VERSION, AGES,
None, ops)
got_ops, conds = test_oven.macaroon_ops([m.macaroon])
self.assertEquals(len(conds), 1) # time-before caveat.
- self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops)
+ self.assertEquals(bakery.canonical_ops(got_ops), ops)
def test_multiple_ops_in_id(self):
- test_oven = macaroonbakery.Oven()
- ops = [macaroonbakery.Op('one', 'read'),
- macaroonbakery.Op('one', 'write'),
- macaroonbakery.Op('two', 'read')]
- m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES,
+ test_oven = bakery.Oven()
+ ops = [bakery.Op('one', 'read'),
+ bakery.Op('one', 'write'),
+ bakery.Op('two', 'read')]
+ m = test_oven.macaroon(bakery.LATEST_VERSION, AGES,
None, ops)
got_ops, conds = test_oven.macaroon_ops([m.macaroon])
self.assertEquals(len(conds), 1) # time-before caveat.
- self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops)
+ self.assertEquals(bakery.canonical_ops(got_ops), ops)
def test_multiple_ops_in_id_with_version1(self):
- test_oven = macaroonbakery.Oven()
- ops = [macaroonbakery.Op('one', 'read'),
- macaroonbakery.Op('one', 'write'),
- macaroonbakery.Op('two', 'read')]
- m = test_oven.macaroon(macaroonbakery.BAKERY_V1, AGES, None, ops)
+ test_oven = bakery.Oven()
+ ops = [bakery.Op('one', 'read'),
+ bakery.Op('one', 'write'),
+ bakery.Op('two', 'read')]
+ m = test_oven.macaroon(bakery.VERSION_1, AGES, None, ops)
got_ops, conds = test_oven.macaroon_ops([m.macaroon])
self.assertEquals(len(conds), 1) # time-before caveat.
- self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops)
+ self.assertEquals(bakery.canonical_ops(got_ops), ops)
def test_huge_number_of_ops_gives_small_macaroon(self):
- test_oven = macaroonbakery.Oven(
- ops_store=macaroonbakery.MemoryOpsStore())
+ test_oven = bakery.Oven(
+ ops_store=bakery.MemoryOpsStore())
ops = []
for i in range(30000):
- ops.append(macaroonbakery.Op(entity='entity{}'.format(i),
- action='action{}'.format(i)))
+ ops.append(bakery.Op(entity='entity' + str(i), action='action' + str(i)))
- m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES,
+ m = test_oven.macaroon(bakery.LATEST_VERSION, AGES,
None, ops)
got_ops, conds = test_oven.macaroon_ops([m.macaroon])
self.assertEquals(len(conds), 1) # time-before caveat.
- self.assertEquals(macaroonbakery.canonical_ops(got_ops),
- macaroonbakery.canonical_ops(ops))
+ self.assertEquals(bakery.canonical_ops(got_ops),
+ bakery.canonical_ops(ops))
data = m.serialize_json()
self.assertLess(len(data), 300)
def test_ops_stored_only_once(self):
- st = macaroonbakery.MemoryOpsStore()
- test_oven = macaroonbakery.Oven(ops_store=st)
+ st = bakery.MemoryOpsStore()
+ test_oven = bakery.Oven(ops_store=st)
- ops = [macaroonbakery.Op('one', 'read'),
- macaroonbakery.Op('one', 'write'),
- macaroonbakery.Op('two', 'read')]
+ ops = [bakery.Op('one', 'read'),
+ bakery.Op('one', 'write'),
+ bakery.Op('two', 'read')]
- m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES,
+ m = test_oven.macaroon(bakery.LATEST_VERSION, AGES,
None, ops)
got_ops, conds = test_oven.macaroon_ops([m.macaroon])
- self.assertEquals(macaroonbakery.canonical_ops(got_ops),
- macaroonbakery.canonical_ops(ops))
+ self.assertEquals(bakery.canonical_ops(got_ops),
+ bakery.canonical_ops(ops))
# Make another macaroon containing the same ops in a different order.
- ops = [macaroonbakery.Op('one', 'write'),
- macaroonbakery.Op('one', 'read'),
- macaroonbakery.Op('one', 'read'),
- macaroonbakery.Op('two', 'read')]
- test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, None,
+ ops = [bakery.Op('one', 'write'),
+ bakery.Op('one', 'read'),
+ bakery.Op('one', 'read'),
+ bakery.Op('two', 'read')]
+ test_oven.macaroon(bakery.LATEST_VERSION, AGES, None,
ops)
self.assertEquals(len(st._store), 1)
diff --git a/macaroonbakery/tests/test_store.py b/macaroonbakery/tests/test_store.py
index 7bcc4c2..5afa7be 100644
--- a/macaroonbakery/tests/test_store.py
+++ b/macaroonbakery/tests/test_store.py
@@ -2,12 +2,12 @@
# Licensed under the LGPLv3, see LICENCE file for details.
from unittest import TestCase
-import macaroonbakery
+import macaroonbakery as bakery
class TestOven(TestCase):
def test_mem_store(self):
- st = macaroonbakery.MemoryKeyStore()
+ st = bakery.MemoryKeyStore()
key, id = st.root_key()
self.assertEqual(len(key), 24)
diff --git a/macaroonbakery/tests/test_time.py b/macaroonbakery/tests/test_time.py
new file mode 100644
index 0000000..38826e1
--- /dev/null
+++ b/macaroonbakery/tests/test_time.py
@@ -0,0 +1,129 @@
+# Copyright 2017 Canonical Ltd.
+# Licensed under the LGPLv3, see LICENCE file for details.
+from datetime import timedelta
+from unittest import TestCase
+from collections import namedtuple
+
+import pyrfc3339
+import pymacaroons
+from pymacaroons import Macaroon
+
+import macaroonbakery.checkers as checkers
+
+t1 = pyrfc3339.parse('2017-10-26T16:19:47.441402074Z')
+t2 = t1 + timedelta(hours=1)
+t3 = t2 + timedelta(hours=1)
+
+
+def fpcaveat(s):
+ return pymacaroons.Caveat(caveat_id=s.encode('utf-8'))
+
+
+class TestExpireTime(TestCase):
+ def test_expire_time(self):
+ ExpireTest = namedtuple('ExpireTest', 'about caveats expectTime')
+ tests = [
+ ExpireTest(
+ about='no caveats',
+ caveats=[],
+ expectTime=None,
+ ),
+ ExpireTest(
+ about='single time-before caveat',
+ caveats=[
+ fpcaveat(checkers.time_before_caveat(t1).condition),
+ ],
+ expectTime=t1,
+ ),
+ ExpireTest(
+ about='multiple time-before caveat',
+ caveats=[
+ fpcaveat(checkers.time_before_caveat(t2).condition),
+ fpcaveat(checkers.time_before_caveat(t1).condition),
+ ],
+ expectTime=t1,
+ ),
+ ExpireTest(
+ about='mixed caveats',
+ caveats=[
+ fpcaveat(checkers.time_before_caveat(t1).condition),
+ fpcaveat('allow bar'),
+ fpcaveat(checkers.time_before_caveat(t2).condition),
+ fpcaveat('deny foo'),
+ ],
+ expectTime=t1,
+ ),
+ ExpireTest(
+ about='mixed caveats',
+ caveats=[
+ fpcaveat(checkers.COND_TIME_BEFORE + ' tomorrow'),
+ ],
+ expectTime=None,
+ ),
+ ]
+ for test in tests:
+ print('test ', test.about)
+ t = checkers.expiry_time(checkers.Namespace(), test.caveats)
+ self.assertEqual(t, test.expectTime)
+
+ def test_macaroons_expire_time(self):
+ ExpireTest = namedtuple('ExpireTest', 'about macaroons expectTime')
+ tests = [
+ ExpireTest(
+ about='no macaroons',
+ macaroons=[newMacaroon()],
+ expectTime=None,
+ ),
+ ExpireTest(
+ about='single macaroon without caveats',
+ macaroons=[newMacaroon()],
+ expectTime=None,
+ ),
+ ExpireTest(
+ about='multiple macaroon without caveats',
+ macaroons=[newMacaroon()],
+ expectTime=None,
+ ),
+ ExpireTest(
+ about='single macaroon with time-before caveat',
+ macaroons=[
+ newMacaroon([checkers.time_before_caveat(t1).condition]),
+ ],
+ expectTime=t1,
+ ),
+ ExpireTest(
+ about='single macaroon with multiple time-before caveats',
+ macaroons=[
+ newMacaroon([
+ checkers.time_before_caveat(t2).condition,
+ checkers.time_before_caveat(t1).condition,
+ ]),
+ ],
+ expectTime=t1,
+ ),
+ ExpireTest(
+ about='multiple macaroons with multiple time-before caveats',
+ macaroons=[
+ newMacaroon([
+ checkers.time_before_caveat(t3).condition,
+ checkers.time_before_caveat(t1).condition,
+ ]),
+ newMacaroon([
+ checkers.time_before_caveat(t3).condition,
+ checkers.time_before_caveat(t1).condition,
+ ]),
+ ],
+ expectTime=t1,
+ ),
+ ]
+ for test in tests:
+ print('test ', test.about)
+ t = checkers.macaroons_expiry_time(checkers.Namespace(), test.macaroons)
+ self.assertEqual(t, test.expectTime)
+
+
+def newMacaroon(conds=[]):
+ m = Macaroon(key='key', version=2)
+ for cond in conds:
+ m.add_first_party_caveat(cond)
+ return m