diff options
Diffstat (limited to 'macaroonbakery/tests')
-rw-r--r-- | macaroonbakery/tests/common.py | 30 | ||||
-rw-r--r-- | macaroonbakery/tests/test_agent.py | 331 | ||||
-rw-r--r-- | macaroonbakery/tests/test_authorizer.py | 72 | ||||
-rw-r--r-- | macaroonbakery/tests/test_bakery.py | 88 | ||||
-rw-r--r-- | macaroonbakery/tests/test_checker.py | 443 | ||||
-rw-r--r-- | macaroonbakery/tests/test_client.py | 388 | ||||
-rw-r--r-- | macaroonbakery/tests/test_codec.py | 115 | ||||
-rw-r--r-- | macaroonbakery/tests/test_discharge.py | 274 | ||||
-rw-r--r-- | macaroonbakery/tests/test_discharge_all.py | 71 | ||||
-rw-r--r-- | macaroonbakery/tests/test_keyring.py | 34 | ||||
-rw-r--r-- | macaroonbakery/tests/test_macaroon.py | 69 | ||||
-rw-r--r-- | macaroonbakery/tests/test_namespace.py | 2 | ||||
-rw-r--r-- | macaroonbakery/tests/test_oven.py | 127 | ||||
-rw-r--r-- | macaroonbakery/tests/test_store.py | 4 | ||||
-rw-r--r-- | macaroonbakery/tests/test_time.py | 129 |
15 files changed, 1539 insertions, 638 deletions
diff --git a/macaroonbakery/tests/common.py b/macaroonbakery/tests/common.py index 2619127..f238dfd 100644 --- a/macaroonbakery/tests/common.py +++ b/macaroonbakery/tests/common.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta import pytz -import macaroonbakery +import macaroonbakery as bakery import macaroonbakery.checkers as checkers @@ -51,7 +51,7 @@ def true_check(ctx, cond, args): return None -class OneIdentity(macaroonbakery.IdentityClient): +class OneIdentity(bakery.IdentityClient): '''An IdentityClient implementation that always returns a single identity from declared_identity, allowing allow(LOGIN_OP) to work even when there are no declaration caveats (this is mostly to support the legacy tests @@ -73,7 +73,7 @@ class _NoOne(object): return '' -class ThirdPartyStrcmpChecker(macaroonbakery.ThirdPartyCaveatChecker): +class ThirdPartyStrcmpChecker(bakery.ThirdPartyCaveatChecker): def __init__(self, str): self.str = str @@ -82,12 +82,12 @@ class ThirdPartyStrcmpChecker(macaroonbakery.ThirdPartyCaveatChecker): if isinstance(cav_info.condition, bytes): condition = cav_info.condition.decode('utf-8') if condition != self.str: - raise macaroonbakery.ThirdPartyCaveatCheckFailed( + raise bakery.ThirdPartyCaveatCheckFailed( '{} doesn\'t match {}'.format(condition, self.str)) return [] -class ThirdPartyCheckerWithCaveats(macaroonbakery.ThirdPartyCaveatChecker): +class ThirdPartyCheckerWithCaveats(bakery.ThirdPartyCaveatChecker): def __init__(self, cavs=None): if cavs is None: cavs = [] @@ -97,7 +97,7 @@ class ThirdPartyCheckerWithCaveats(macaroonbakery.ThirdPartyCaveatChecker): return self.cavs -class ThirdPartyCaveatCheckerEmpty(macaroonbakery.ThirdPartyCaveatChecker): +class ThirdPartyCaveatCheckerEmpty(bakery.ThirdPartyCaveatChecker): def check_third_party_caveat(self, ctx, cav_info): return [] @@ -107,14 +107,16 @@ def new_bakery(location, locator=None): # key pair, and registers the key with the given locator if provided. # # It uses test_checker to check first party caveats. - key = macaroonbakery.generate_key() + key = bakery.generate_key() if locator is not None: locator.add_info(location, - macaroonbakery.ThirdPartyInfo( + bakery.ThirdPartyInfo( public_key=key.public_key, - version=macaroonbakery.LATEST_BAKERY_VERSION)) - return macaroonbakery.Bakery(key=key, - checker=test_checker(), - location=location, - identity_client=OneIdentity(), - locator=locator) + version=bakery.LATEST_VERSION)) + return bakery.Bakery( + key=key, + checker=test_checker(), + location=location, + identity_client=OneIdentity(), + locator=locator, + ) diff --git a/macaroonbakery/tests/test_agent.py b/macaroonbakery/tests/test_agent.py index 49134f5..67f5b84 100644 --- a/macaroonbakery/tests/test_agent.py +++ b/macaroonbakery/tests/test_agent.py @@ -1,6 +1,7 @@ # Copyright 2017 Canonical Ltd. # Licensed under the LGPLv3, see LICENCE file for details. import base64 +from datetime import datetime, timedelta import json import os import tempfile @@ -9,7 +10,17 @@ from unittest import TestCase import nacl.encoding import requests.cookies import six +from six.moves.urllib.parse import parse_qs +from six.moves.http_cookies import SimpleCookie +from httmock import ( + HTTMock, + urlmatch, + response +) +import macaroonbakery as bakery +import macaroonbakery.httpbakery as httpbakery +import macaroonbakery.checkers as checkers import macaroonbakery.httpbakery.agent as agent @@ -62,13 +73,19 @@ class TestAgents(TestCase): def test_load_agents_into_cookies(self): cookies = requests.cookies.RequestsCookieJar() - c1, key = agent.load_agent_file(self.agent_filename, cookies=cookies) + c1, key = agent.load_agent_file( + self.agent_filename, + cookies=cookies, + ) self.assertEqual(c1, cookies) - self.assertEqual(key.encode(nacl.encoding.Base64Encoder), - b'CqoSgj06Zcgb4/S6RT4DpTjLAfKoznEY3JsShSjKJEU=') + self.assertEqual( + key.encode(nacl.encoding.Base64Encoder), + b'CqoSgj06Zcgb4/S6RT4DpTjLAfKoznEY3JsShSjKJEU=', + ) self.assertEqual( key.public_key.encode(nacl.encoding.Base64Encoder), - b'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') + b'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=', + ) value = cookies.get('agent-login', domain='1.example.com') jv = base64.b64decode(value) @@ -76,8 +93,7 @@ class TestAgents(TestCase): jv = jv.decode('utf-8') data = json.loads(jv) self.assertEqual(data['username'], 'user-1') - self.assertEqual(data['public_key'], - 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') + self.assertEqual(data['public_key'], 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') value = cookies.get('agent-login', domain='2.example.com', path='/discharger') @@ -86,8 +102,7 @@ class TestAgents(TestCase): jv = jv.decode('utf-8') data = json.loads(jv) self.assertEqual(data['username'], 'user-2') - self.assertEqual(data['public_key'], - 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') + self.assertEqual(data['public_key'], 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') def test_load_agents_with_bad_key(self): with self.assertRaises(agent.AgentFileFormatError): @@ -97,6 +112,295 @@ class TestAgents(TestCase): with self.assertRaises(agent.AgentFileFormatError): agent.load_agent_file(self.no_username_agent_filename) + def test_agent_login(self): + discharge_key = bakery.generate_key() + + class _DischargerLocator(bakery.ThirdPartyLocator): + def third_party_info(self, loc): + if loc == 'http://0.3.2.1': + return bakery.ThirdPartyInfo( + public_key=discharge_key.public_key, + version=bakery.LATEST_VERSION, + ) + d = _DischargerLocator() + server_key = bakery.generate_key() + server_bakery = bakery.Bakery(key=server_key, locator=d) + + @urlmatch(path='.*/here') + def server_get(url, request): + ctx = checkers.AuthContext() + test_ops = [bakery.Op(entity='test-op', action='read')] + auth_checker = server_bakery.checker.auth( + httpbakery.extract_macaroons(request.headers)) + try: + auth_checker.allow(ctx, test_ops) + resp = response(status_code=200, + content='done') + except bakery.PermissionDenied: + caveats = [ + checkers.Caveat(location='http://0.3.2.1', condition='is-ok') + ] + m = server_bakery.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=datetime.utcnow() + timedelta(days=1), + caveats=caveats, ops=test_ops) + content, headers = httpbakery.discharge_required_response( + m, '/', + 'test', + 'message') + resp = response(status_code=401, + content=content, + headers=headers) + return request.hooks['response'][0](resp) + + @urlmatch(path='.*/discharge') + def discharge(url, request): + qs = parse_qs(request.body) + if qs.get('token64') is None: + return response( + status_code=401, + content={ + 'Code': httpbakery.ERR_INTERACTION_REQUIRED, + 'Message': 'interaction required', + 'Info': { + 'InteractionMethods': { + 'agent': {'login-url': '/login'}, + }, + }, + }, + headers={'Content-Type': 'application/json'}) + else: + qs = parse_qs(request.body) + content = {q: qs[q][0] for q in qs} + m = httpbakery.discharge(checkers.AuthContext(), content, + discharge_key, None, alwaysOK3rd) + return { + 'status_code': 200, + 'content': { + 'Macaroon': m.serialize_json() + } + } + + key = bakery.generate_key() + + @urlmatch(path='.*/login') + def login(url, request): + b = bakery.Bakery(key=discharge_key) + m = b.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=datetime.utcnow() + timedelta(days=1), + caveats=[bakery.local_third_party_caveat( + key.public_key, + version=httpbakery.request_version(request.headers))], + ops=[bakery.Op(entity='agent', action='login')]) + return { + 'status_code': 200, + 'content': { + 'macaroon': m.to_dict() + } + } + + with HTTMock(server_get), \ + HTTMock(discharge), \ + HTTMock(login): + client = httpbakery.Client(interaction_methods=[ + agent.AgentInteractor( + agent.AuthInfo( + key=key, + agents=[ + agent.Agent( + username='test-user', + url=u'http://0.3.2.1' + ) + ], + ), + ), + ]) + resp = requests.get( + 'http://0.1.2.3/here', + cookies=client.cookies, + auth=client.auth()) + self.assertEquals(resp.content, b'done') + + def test_agent_legacy(self): + discharge_key = bakery.generate_key() + + class _DischargerLocator(bakery.ThirdPartyLocator): + def third_party_info(self, loc): + if loc == 'http://0.3.2.1': + return bakery.ThirdPartyInfo( + public_key=discharge_key.public_key, + version=bakery.LATEST_VERSION, + ) + d = _DischargerLocator() + server_key = bakery.generate_key() + server_bakery = bakery.Bakery(key=server_key, locator=d) + + @urlmatch(path='.*/here') + def server_get(url, request): + ctx = checkers.AuthContext() + test_ops = [bakery.Op(entity='test-op', action='read')] + auth_checker = server_bakery.checker.auth( + httpbakery.extract_macaroons(request.headers)) + try: + auth_checker.allow(ctx, test_ops) + resp = response(status_code=200, + content='done') + except bakery.PermissionDenied: + caveats = [ + checkers.Caveat(location='http://0.3.2.1', + condition='is-ok') + ] + m = server_bakery.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=datetime.utcnow() + timedelta(days=1), + caveats=caveats, ops=test_ops) + content, headers = httpbakery.discharge_required_response( + m, '/', + 'test', + 'message') + resp = response( + status_code=401, + content=content, + headers=headers, + ) + return request.hooks['response'][0](resp) + + class InfoStorage: + info = None + + @urlmatch(path='.*/discharge') + def discharge(url, request): + qs = parse_qs(request.body) + if qs.get('caveat64') is not None: + content = {q: qs[q][0] for q in qs} + + class InteractionRequiredError(Exception): + def __init__(self, error): + self.error = error + + class CheckerInError(bakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(self, ctx, info): + InfoStorage.info = info + raise InteractionRequiredError( + httpbakery.Error( + code=httpbakery.ERR_INTERACTION_REQUIRED, + version=httpbakery.request_version( + request.headers), + message='interaction required', + info=httpbakery.ErrorInfo( + wait_url='http://0.3.2.1/wait?' + 'dischargeid=1', + visit_url='http://0.3.2.1/visit?' + 'dischargeid=1' + ), + ), + ) + try: + httpbakery.discharge( + checkers.AuthContext(), content, + discharge_key, None, CheckerInError()) + except InteractionRequiredError as exc: + return response( + status_code=401, + content={ + 'Code': exc.error.code, + 'Message': exc.error.message, + 'Info': { + 'WaitURL': exc.error.info.wait_url, + 'VisitURL': exc.error.info.visit_url, + }, + }, + headers={'Content-Type': 'application/json'}) + + key = bakery.generate_key() + + @urlmatch(path='.*/visit?$') + def visit(url, request): + if request.headers.get('Accept') == 'application/json': + return { + 'status_code': 200, + 'content': { + 'agent': request.url + } + } + cs = SimpleCookie() + cookies = request.headers.get('Cookie') + if cookies is not None: + cs.load(str(cookies)) + public_key = None + for c in cs: + if c == 'agent-login': + json_cookie = json.loads( + base64.b64decode(cs[c].value).decode('utf-8')) + public_key = bakery.PublicKey.deserialize(json_cookie.get('public_key')) + ms = httpbakery.extract_macaroons(request.headers) + if len(ms) == 0: + b = bakery.Bakery(key=discharge_key) + m = b.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=datetime.utcnow() + timedelta(days=1), + caveats=[bakery.local_third_party_caveat( + public_key, + version=httpbakery.request_version(request.headers))], + ops=[bakery.Op(entity='agent', action='login')]) + content, headers = httpbakery.discharge_required_response( + m, '/', + 'test', + 'message') + resp = response(status_code=401, + content=content, + headers=headers) + return request.hooks['response'][0](resp) + + return { + 'status_code': 200, + 'content': { + 'agent-login': True + } + } + + @urlmatch(path='.*/wait?$') + def wait(url, request): + class EmptyChecker(bakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(self, ctx, info): + return [] + if InfoStorage.info is None: + self.fail('visit url has not been visited') + m = bakery.discharge( + checkers.AuthContext(), + InfoStorage.info.id, + InfoStorage.info.caveat, + discharge_key, + EmptyChecker(), + _DischargerLocator(), + ) + return { + 'status_code': 200, + 'content': { + 'Macaroon': m.to_dict() + } + } + + with HTTMock(server_get), \ + HTTMock(discharge), \ + HTTMock(visit), \ + HTTMock(wait): + client = httpbakery.Client(interaction_methods=[ + agent.AgentInteractor( + agent.AuthInfo( + key=key, + agents=[agent.Agent(username='test-user', url=u'http://0.3.2.1')], + ), + ), + ]) + resp = requests.get( + 'http://0.1.2.3/here', + cookies=client.cookies, + auth=client.auth(), + ) + self.assertEquals(resp.content, b'done') + agent_file = ''' { @@ -146,3 +450,14 @@ no_username_agent_file = ''' }] } ''' + + +class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker): + def __init__(self, check): + self._check = check + + def check_third_party_caveat(self, ctx, info): + cond, arg = checkers.parse_caveat(info.condition) + return self._check(cond, arg) + +alwaysOK3rd = ThirdPartyCaveatCheckerF(lambda cond, arg: []) diff --git a/macaroonbakery/tests/test_authorizer.py b/macaroonbakery/tests/test_authorizer.py index da01974..f90d2b5 100644 --- a/macaroonbakery/tests/test_authorizer.py +++ b/macaroonbakery/tests/test_authorizer.py @@ -2,7 +2,7 @@ # Licensed under the LGPLv3, see LICENCE file for details. from unittest import TestCase -import macaroonbakery +import macaroonbakery as bakery import macaroonbakery.checkers as checkers @@ -23,11 +23,11 @@ class TestAuthorizer(TestCase): else: self.fail('unexpected entity: ' + op.Entity) - ops = [macaroonbakery.Op('a', 'x'), macaroonbakery.Op('b', 'x'), - macaroonbakery.Op('c', 'x'), macaroonbakery.Op('d', 'x')] - allowed, caveats = macaroonbakery.AuthorizerFunc(f).authorize( + ops = [bakery.Op('a', 'x'), bakery.Op('b', 'x'), + bakery.Op('c', 'x'), bakery.Op('d', 'x')] + allowed, caveats = bakery.AuthorizerFunc(f).authorize( checkers.AuthContext(), - macaroonbakery.SimpleIdentity('bob'), + bakery.SimpleIdentity('bob'), ops ) self.assertEqual(allowed, [False, True, True, True]) @@ -40,42 +40,45 @@ class TestAuthorizer(TestCase): ctx = checkers.AuthContext() tests = [ ('no ops, no problem', - macaroonbakery.ACLAuthorizer(allow_public=True, - get_acl=lambda x, y: []), None, [], + bakery.ACLAuthorizer(allow_public=True, get_acl=lambda x, y: []), + None, + [], []), ('identity that does not implement ACLIdentity; ' 'user should be denied except for everyone group', - macaroonbakery.ACLAuthorizer(allow_public=True, - get_acl=lambda ctx, op: [ - macaroonbakery.EVERYONE] - if op.entity == 'a' else ['alice']), + bakery.ACLAuthorizer( + allow_public=True, + get_acl=lambda ctx, op: [bakery.EVERYONE] if op.entity == 'a' else ['alice'], + ), SimplestIdentity('bob'), - [macaroonbakery.Op(entity='a', action='a'), - macaroonbakery.Op(entity='b', action='b')], + [bakery.Op(entity='a', action='a'), + bakery.Op(entity='b', action='b')], [True, False]), ('identity that does not implement ACLIdentity with user == Id; ' 'user should be denied except for everyone group', - macaroonbakery.ACLAuthorizer(allow_public=True, - get_acl=lambda ctx, op: [ - macaroonbakery.EVERYONE] if - op.entity == 'a' else ['bob']), + bakery.ACLAuthorizer( + allow_public=True, + get_acl=lambda ctx, op: [bakery.EVERYONE] if op.entity == 'a' else ['bob'], + ), SimplestIdentity('bob'), - [macaroonbakery.Op(entity='a', action='a'), - macaroonbakery.Op(entity='b', action='b')], + [bakery.Op(entity='a', action='a'), + bakery.Op(entity='b', action='b')], [True, False]), ('permission denied for everyone without AllowPublic', - macaroonbakery.ACLAuthorizer(allow_public=False, - get_acl=lambda x, y: [ - macaroonbakery.EVERYONE]), + bakery.ACLAuthorizer( + allow_public=False, + get_acl=lambda x, y: [bakery.EVERYONE], + ), SimplestIdentity('bob'), - [macaroonbakery.Op(entity='a', action='a')], + [bakery.Op(entity='a', action='a')], [False]), ('permission granted to anyone with no identity with AllowPublic', - macaroonbakery.ACLAuthorizer(allow_public=True, - get_acl=lambda x, y: [ - macaroonbakery.EVERYONE]), + bakery.ACLAuthorizer( + allow_public=True, + get_acl=lambda x, y: [bakery.EVERYONE], + ), None, - [macaroonbakery.Op(entity='a', action='a')], + [bakery.Op(entity='a', action='a')], [True]) ] for test in tests: @@ -96,12 +99,12 @@ class TestAuthorizer(TestCase): Visited.in_f = True return False, None - macaroonbakery.AuthorizerFunc(f).authorize( - ctx, macaroonbakery.SimpleIdentity('bob'), ['op1'] + bakery.AuthorizerFunc(f).authorize( + ctx, bakery.SimpleIdentity('bob'), ['op1'] ) self.assertTrue(Visited.in_f) - class TestIdentity(SimplestIdentity, macaroonbakery.ACLIdentity): + class TestIdentity(SimplestIdentity, bakery.ACLIdentity): def allow(other, ctx, acls): self.assertEqual(ctx.get('a'), 'aval') Visited.in_allow = True @@ -112,14 +115,15 @@ class TestAuthorizer(TestCase): Visited.in_get_acl = True return [] - macaroonbakery.ACLAuthorizer(allow_public=False, - get_acl=get_acl).authorize( - ctx, TestIdentity('bob'), ['op1']) + bakery.ACLAuthorizer( + allow_public=False, + get_acl=get_acl, + ).authorize(ctx, TestIdentity('bob'), ['op1']) self.assertTrue(Visited.in_get_acl) self.assertTrue(Visited.in_allow) -class SimplestIdentity(macaroonbakery.Identity): +class SimplestIdentity(bakery.Identity): # SimplestIdentity implements Identity for a string. Unlike # SimpleIdentity, it does not implement ACLIdentity. def __init__(self, user): diff --git a/macaroonbakery/tests/test_bakery.py b/macaroonbakery/tests/test_bakery.py index 724b264..5a13cff 100644 --- a/macaroonbakery/tests/test_bakery.py +++ b/macaroonbakery/tests/test_bakery.py @@ -14,7 +14,7 @@ from httmock import ( response ) -from macaroonbakery import httpbakery +import macaroonbakery.httpbakery as httpbakery ID_PATH = 'http://example.com/someprotecteurl' @@ -90,6 +90,32 @@ def first_407_then_200(url, request): return request.hooks['response'][0](resp) +@urlmatch(netloc='example.com:8000', path='.*/someprotecteurl') +def first_407_then_200_with_port(url, request): + if request.headers.get('cookie', '').startswith('macaroon-'): + return { + 'status_code': 200, + 'content': { + 'Value': 'some value' + } + } + else: + resp = response(status_code=407, + content={ + 'Info': { + 'Macaroon': json_macaroon, + 'MacaroonPath': '/', + 'CookieNameSuffix': 'test' + }, + 'Message': 'verification failed: no macaroon ' + 'cookies in request', + 'Code': 'macaroon discharge required' + }, + headers={'Content-Type': 'application/json'}, + request=request) + return request.hooks['response'][0](resp) + + @urlmatch(path='.*/someprotecteurl') def valid_200(url, request): return { @@ -142,25 +168,55 @@ def wait_after_401(url, request): class TestBakery(TestCase): + + def assert_cookie_security(self, cookies, name, secure): + for cookie in cookies: + if cookie.name == name: + assert cookie.secure == secure + break + else: + assert False, 'no cookie named {} found in jar'.format(name) + def test_discharge(self): - jar = requests.cookies.RequestsCookieJar() - with HTTMock(first_407_then_200): - with HTTMock(discharge_200): + client = httpbakery.Client() + with HTTMock(first_407_then_200), HTTMock(discharge_200): resp = requests.get(ID_PATH, - cookies=jar, - auth=httpbakery.BakeryAuth(cookies=jar)) + cookies=client.cookies, + auth=client.auth()) resp.raise_for_status() - assert 'macaroon-test' in jar.keys() + assert 'macaroon-test' in client.cookies.keys() + self.assert_cookie_security(client.cookies, 'macaroon-test', secure=False) @patch('webbrowser.open') def test_407_then_401_on_discharge(self, mock_open): - jar = requests.cookies.RequestsCookieJar() - with HTTMock(first_407_then_200): - with HTTMock(discharge_401): - with HTTMock(wait_after_401): - resp = requests.get(ID_PATH, - auth=httpbakery.BakeryAuth( - cookies=jar)) - resp.raise_for_status() + client = httpbakery.Client() + with HTTMock(first_407_then_200), HTTMock(discharge_401), HTTMock(wait_after_401): + resp = requests.get( + ID_PATH, + cookies=client.cookies, + auth=client.auth(), + ) + resp.raise_for_status() mock_open.assert_called_once_with(u'http://example.com/visit', new=1) - assert 'macaroon-test' in jar.keys() + assert 'macaroon-test' in client.cookies.keys() + + def test_cookie_with_port(self): + client = httpbakery.Client() + with HTTMock(first_407_then_200_with_port): + with HTTMock(discharge_200): + resp = requests.get('http://example.com:8000/someprotecteurl', + cookies=client.cookies, + auth=client.auth()) + resp.raise_for_status() + assert 'macaroon-test' in client.cookies.keys() + + def test_secure_cookie_for_https(self): + client = httpbakery.Client() + with HTTMock(first_407_then_200_with_port), HTTMock(discharge_200): + resp = requests.get( + 'https://example.com:8000/someprotecteurl', + cookies=client.cookies, + auth=client.auth()) + resp.raise_for_status() + assert 'macaroon-test' in client.cookies.keys() + self.assert_cookie_security(client.cookies, 'macaroon-test', secure=True) diff --git a/macaroonbakery/tests/test_checker.py b/macaroonbakery/tests/test_checker.py index 06bf008..643c756 100644 --- a/macaroonbakery/tests/test_checker.py +++ b/macaroonbakery/tests/test_checker.py @@ -9,7 +9,7 @@ from datetime import timedelta from pymacaroons.verifier import Verifier, FirstPartyCaveatVerifierDelegate import pymacaroons -import macaroonbakery +import macaroonbakery as bakery import macaroonbakery.checkers as checkers from macaroonbakery.tests.common import test_context, epoch, test_checker @@ -22,13 +22,13 @@ class TestChecker(TestCase): locator = _DischargerLocator() ids = _IdService('ids', locator, self) auth = _OpAuthorizer( - {macaroonbakery.Op(entity='something', action='read'): - {macaroonbakery.EVERYONE}}) + {bakery.Op(entity='something', action='read'): + {bakery.EVERYONE}}) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) - auth_info = client.do(test_context, ts, - [macaroonbakery.Op(entity='something', - action='read')]) + auth_info = client.do(test_context, ts, [ + bakery.Op(entity='something', action='read'), + ]) self.assertEqual(len(self._discharges), 0) self.assertIsNotNone(auth_info) self.assertIsNone(auth_info.identity) @@ -37,25 +37,23 @@ class TestChecker(TestCase): def test_authorization_denied(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = macaroonbakery.ClosedAuthorizer() + auth = bakery.ClosedAuthorizer() ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - with self.assertRaises(macaroonbakery.PermissionDenied): - client.do(ctx, ts, [macaroonbakery.Op(entity='something', - action='read')]) + with self.assertRaises(bakery.PermissionDenied): + client.do(ctx, ts, [bakery.Op(entity='something', action='read')]) def test_authorize_with_authentication_required(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) auth = _OpAuthorizer( - {macaroonbakery.Op(entity='something', action='read'): {'bob'}}) + {bakery.Op(entity='something', action='read'): {'bob'}}) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - auth_info = client.do(ctx, ts, [macaroonbakery.Op(entity='something', - action='read')]) + auth_info = client.do(ctx, ts, [bakery.Op(entity='something', action='read')]) self.assertEqual(self._discharges, [_DischargeRecord(location='ids', user='bob')]) self.assertIsNotNone(auth_info) @@ -67,16 +65,16 @@ class TestChecker(TestCase): ids = _IdService('ids', locator, self) auth = _OpAuthorizer( { - macaroonbakery.Op(entity='something', action='read'): {'bob'}, - macaroonbakery.Op(entity='otherthing', action='read'): {'bob'} + bakery.Op(entity='something', action='read'): {'bob'}, + bakery.Op(entity='otherthing', action='read'): {'bob'} } ) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') client.do(ctx, ts, [ - macaroonbakery.Op(entity='something', action='read'), - macaroonbakery.Op(entity='otherthing', action='read') + bakery.Op(entity='something', action='read'), + bakery.Op(entity='otherthing', action='read') ]) self.assertEqual(self._discharges, [_DischargeRecord(location='ids', user='bob')]) @@ -85,159 +83,150 @@ class TestChecker(TestCase): locator = _DischargerLocator() ids = _IdService('ids', locator, self) auth = _OpAuthorizer( - {macaroonbakery.Op(entity='something', action='read'): {'bob'}}) + {bakery.Op(entity='something', action='read'): {'bob'}}) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m = client.discharged_capability( - ctx, ts, [macaroonbakery.Op(entity='something', action='read')]) + ctx, ts, [bakery.Op(entity='something', action='read')]) # Check that we can exercise the capability directly on the service # with no discharging required. - auth_info = ts.do(test_context, [m], - [macaroonbakery.Op(entity='something', - action='read')]) + auth_info = ts.do(test_context, [m], [ + bakery.Op(entity='something', action='read'), + ]) self.assertIsNotNone(auth_info) self.assertIsNone(auth_info.identity) self.assertEqual(len(auth_info.macaroons), 1) - self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, - m[0].identifier_bytes) + self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, m[0].identifier_bytes) def test_capability_multiple_entities(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'bob'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - macaroonbakery.Op(entity='e3', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'bob'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + bakery.Op(entity='e3', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m = client.discharged_capability(ctx, ts, [ - macaroonbakery.Op(entity='e1', - action='read'), - macaroonbakery.Op(entity='e2', - action='read'), - macaroonbakery.Op(entity='e3', - action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) self.assertEqual(self._discharges, [_DischargeRecord(location='ids', user='bob')]) # Check that we can exercise the capability directly on the service # with no discharging required. ts.do(test_context, [m], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read'), - macaroonbakery.Op(entity='e3', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) # Check that we can exercise the capability to act on a subset of # the operations. ts.do(test_context, [m], [ - macaroonbakery.Op(entity='e2', action='read'), - macaroonbakery.Op(entity='e3', action='read')] - ) + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) ts.do(test_context, [m], - [macaroonbakery.Op(entity='e3', action='read')]) + [bakery.Op(entity='e3', action='read')]) def test_multiple_capabilities(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) # Acquire two capabilities as different users and check # that we can combine them together to do both operations # at once. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - m1 = _Client(locator).discharged_capability(ctx, ts, - [macaroonbakery.Op( - entity='e1', - action='read')]) + m1 = _Client(locator).discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + ]) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m2 = _Client(locator).discharged_capability(ctx, ts, - [macaroonbakery.Op( + [bakery.Op( entity='e2', action='read')]) - self.assertEqual(self._discharges, - [ - _DischargeRecord(location='ids', user='alice'), - _DischargeRecord(location='ids', user='bob'), - ]) + self.assertEqual(self._discharges, [ + _DischargeRecord(location='ids', user='alice'), + _DischargeRecord(location='ids', user='bob'), + ]) auth_info = ts.do(test_context, [m1, m2], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) self.assertIsNotNone(auth_info) self.assertIsNone(auth_info.identity) self.assertEqual(len(auth_info.macaroons), 2) - self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, - m1[0].identifier_bytes) - self.assertEqual(auth_info.macaroons[1][0].identifier_bytes, - m2[0].identifier_bytes) + self.assertEqual(auth_info.macaroons[0][0].identifier_bytes, m1[0].identifier_bytes) + self.assertEqual(auth_info.macaroons[1][0].identifier_bytes, m2[0].identifier_bytes) def test_combine_capabilities(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - macaroonbakery.Op(entity='e3', action='read'): {'bob', - 'alice'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + bakery.Op(entity='e3', action='read'): {'bob', 'alice'}, + }) ts = _Service('myservice', auth, ids, locator) # Acquire two capabilities as different users and check # that we can combine them together into a single capability # capable of both operations. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - m1 = _Client(locator).discharged_capability( - ctx, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e3', action='read')]) + m1 = _Client(locator).discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e3', action='read'), + ]) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m2 = _Client(locator).discharged_capability( - ctx, ts, [macaroonbakery.Op(entity='e2', action='read')]) + ctx, ts, [bakery.Op(entity='e2', action='read')]) m = ts.capability(test_context, [m1, m2], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read'), - macaroonbakery.Op(entity='e3', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read'), - macaroonbakery.Op(entity='e3', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + bakery.Op(entity='e3', action='read'), + ]) def test_partially_authorized_request(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) # Acquire a capability for e1 but rely on authentication to # authorize e2. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - m = _Client(locator).discharged_capability(ctx, ts, - [macaroonbakery.Op( - entity='e1', - action='read')]) + m = _Client(locator).discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + ]) client = _Client(locator) client.add_macaroon(ts, 'authz', m) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - client.discharged_capability( - ctx, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + client.discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) def test_auth_with_third_party_caveats(self): locator = _DischargerLocator() @@ -247,16 +236,15 @@ class TestChecker(TestCase): # when authorizing. def authorize_with_tp_discharge(ctx, id, op): if (id is not None and id.id() == 'bob' and - op == macaroonbakery.Op(entity='something', - action='read')): + op == bakery.Op(entity='something', action='read')): return True, [checkers.Caveat(condition='question', location='other third party')] return False, None - auth = macaroonbakery.AuthorizerFunc(authorize_with_tp_discharge) + auth = bakery.AuthorizerFunc(authorize_with_tp_discharge) ts = _Service('myservice', auth, ids, locator) - class _LocalDischargeChecker(macaroonbakery.ThirdPartyCaveatChecker): + class _LocalDischargeChecker(bakery.ThirdPartyCaveatChecker): def check_third_party_caveat(_, ctx, info): if info.condition != 'question': raise ValueError('third party condition not recognized') @@ -267,29 +255,25 @@ class TestChecker(TestCase): return [] locator['other third party'] = _Discharger( - key=macaroonbakery.generate_key(), + key=bakery.generate_key(), checker=_LocalDischargeChecker(), locator=locator, ) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - client.do(ctx, ts, [macaroonbakery.Op(entity='something', - action='read')]) + client.do(ctx, ts, [bakery.Op(entity='something', action='read')]) self.assertEqual(self._discharges, [ _DischargeRecord(location='ids', user='bob'), - _DischargeRecord(location='other third party', - user='bob') + _DischargeRecord(location='other third party', user='bob') ]) def test_capability_combines_first_party_caveats(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'} - } - ) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) # Acquire two capabilities as different users, add some first party @@ -297,12 +281,12 @@ class TestChecker(TestCase): # capable of both operations. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') m1 = _Client(locator).capability( - ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + ctx, ts, [bakery.Op(entity='e1', action='read')]) m1.macaroon.add_first_party_caveat('true 1') m1.macaroon.add_first_party_caveat('true 2') ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') m2 = _Client(locator).capability( - ctx, ts, [macaroonbakery.Op(entity='e2', action='read')]) + ctx, ts, [bakery.Op(entity='e2', action='read')]) m2.macaroon.add_first_party_caveat('true 3') m2.macaroon.add_first_party_caveat('true 4') @@ -311,8 +295,9 @@ class TestChecker(TestCase): client.add_macaroon(ts, 'authz2', [m2.macaroon]) m = client.capability(test_context, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) self.assertEqual(_macaroon_conditions(m.macaroon.caveats, False), [ 'true 1', 'true 2', @@ -323,11 +308,10 @@ class TestChecker(TestCase): def test_first_party_caveat_squashing(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'alice'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'alice'}, + }) ts = _Service('myservice', auth, ids, locator) tests = [ ('duplicates removed', [ @@ -366,13 +350,13 @@ class TestChecker(TestCase): # Make a first macaroon with all the required first party caveats. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') m1 = _Client(locator).capability( - ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + ctx, ts, [bakery.Op(entity='e1', action='read')]) m1.add_caveats(test[1], None, None) # Make a second macaroon that's not used to check that it's # caveats are not added. m2 = _Client(locator).capability( - ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + ctx, ts, [bakery.Op(entity='e1', action='read')]) m2.add_caveat(checkers.Caveat( condition='true notused', namespace='testns'), None, None) client = _Client(locator) @@ -380,8 +364,7 @@ class TestChecker(TestCase): client.add_macaroon(ts, 'authz2', [m2.macaroon]) m3 = client.capability( - test_context, ts, [macaroonbakery.Op(entity='e1', - action='read')]) + test_context, ts, [bakery.Op(entity='e1', action='read')]) self.assertEqual( _macaroon_conditions(m3.macaroon.caveats, False), _resolve_caveats(m3.namespace, test[2])) @@ -389,11 +372,11 @@ class TestChecker(TestCase): def test_login_only(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = macaroonbakery.ClosedAuthorizer() + auth = bakery.ClosedAuthorizer() ts = _Service('myservice', auth, ids, locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP]) + auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP]) self.assertIsNotNone(auth_info) self.assertEqual(auth_info.identity.id(), 'bob') @@ -402,18 +385,17 @@ class TestChecker(TestCase): ids = _IdService('ids', locator, self) auth = _OpAuthorizer( { - macaroonbakery.Op(entity='e1', action='read'): {'alice'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, + bakery.Op(entity='e1', action='read'): {'alice'}, + bakery.Op(entity='e2', action='read'): {'bob'}, }) ts = _Service('myservice', auth, ids, locator) # Acquire a capability for e1 but rely on authentication to # authorize e2. ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - m = _Client(locator).discharged_capability(ctx, ts, - [macaroonbakery.Op( - entity='e1', - action='read')]) + m = _Client(locator).discharged_capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + ]) client = _Client(locator) client.add_macaroon(ts, 'authz', m) @@ -423,24 +405,22 @@ class TestChecker(TestCase): with self.assertRaises(_DischargeRequiredError): client.do_any( ctx, ts, [ - macaroonbakery.LOGIN_OP, - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e1', action='read') + bakery.LOGIN_OP, + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e1', action='read') ] ) self.assertEqual(len(self._discharges), 0) # Log in as bob. - _, err = client.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + _, err = client.do(ctx, ts, [bakery.LOGIN_OP]) # All the previous actions should now be allowed. - auth_info, allowed = client.do_any( - ctx, ts, [ - macaroonbakery.LOGIN_OP, - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e1', action='read') - ] - ) + auth_info, allowed = client.do_any(ctx, ts, [ + bakery.LOGIN_OP, + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e1', action='read'), + ]) self.assertEqual(auth_info.identity.id(), 'bob') self.assertEqual(len(auth_info.macaroons), 2) self.assertEqual(allowed, [True, True, True]) @@ -448,123 +428,121 @@ class TestChecker(TestCase): def test_auth_with_identity_from_context(self): locator = _DischargerLocator() ids = _BasicAuthIdService() - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'sherlock'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'sherlock'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) # Check that we can perform the ops with basic auth in the # context. ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes') auth_info = _Client(locator).do( - ctx, ts, [macaroonbakery.Op(entity='e1', action='read')]) + ctx, ts, [bakery.Op(entity='e1', action='read')]) self.assertEqual(auth_info.identity.id(), 'sherlock') self.assertEqual(len(auth_info.macaroons), 0) def test_auth_login_op_with_identity_from_context(self): locator = _DischargerLocator() ids = _BasicAuthIdService() - ts = _Service('myservice', macaroonbakery.ClosedAuthorizer(), - ids, locator) + ts = _Service('myservice', bakery.ClosedAuthorizer(), ids, locator) # Check that we can use LoginOp # when auth isn't granted through macaroons. ctx = _context_with_basic_auth(test_context, 'sherlock', 'holmes') - auth_info = _Client(locator).do(ctx, ts, [macaroonbakery.LOGIN_OP]) + auth_info = _Client(locator).do(ctx, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'sherlock') self.assertEqual(len(auth_info.macaroons), 0) def test_operation_allow_caveat(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'bob'}, - macaroonbakery.Op(entity='e1', action='write'): {'bob'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'bob'}, + bakery.Op(entity='e1', action='write'): {'bob'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - m = client.capability( - ctx, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e1', action='write'), - macaroonbakery.Op(entity='e2', action='read')]) + m = client.capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e1', action='write'), + bakery.Op(entity='e2', action='read'), + ]) # Sanity check that we can do a write. ts.do(test_context, [[m.macaroon]], - [macaroonbakery.Op(entity='e1', action='write')]) + [bakery.Op(entity='e1', action='write')]) m.add_caveat(checkers.allow_caveat(['read']), None, None) # A read operation should work. ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) # A write operation should fail # even though the original macaroon allowed it. with self.assertRaises(_DischargeRequiredError): ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='write')]) + bakery.Op(entity='e1', action='write'), + ]) def test_operation_deny_caveat(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = _OpAuthorizer( - { - macaroonbakery.Op(entity='e1', action='read'): {'bob'}, - macaroonbakery.Op(entity='e1', action='write'): {'bob'}, - macaroonbakery.Op(entity='e2', action='read'): {'bob'}, - }) + auth = _OpAuthorizer({ + bakery.Op(entity='e1', action='read'): {'bob'}, + bakery.Op(entity='e1', action='write'): {'bob'}, + bakery.Op(entity='e2', action='read'): {'bob'}, + }) ts = _Service('myservice', auth, ids, locator) client = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - m = client.capability( - ctx, ts, [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e1', action='write'), - macaroonbakery.Op(entity='e2', action='read')]) + m = client.capability(ctx, ts, [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e1', action='write'), + bakery.Op(entity='e2', action='read'), + ]) # Sanity check that we can do a write. ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='write')]) + bakery.Op(entity='e1', action='write')]) m.add_caveat(checkers.deny_caveat(['write']), None, None) # A read operation should work. - ts.do( - test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='read'), - macaroonbakery.Op(entity='e2', action='read')]) + ts.do(test_context, [[m.macaroon]], [ + bakery.Op(entity='e1', action='read'), + bakery.Op(entity='e2', action='read'), + ]) # A write operation should fail # even though the original macaroon allowed it. with self.assertRaises(_DischargeRequiredError): ts.do(test_context, [[m.macaroon]], [ - macaroonbakery.Op(entity='e1', action='write')]) + bakery.Op(entity='e1', action='write')]) def test_duplicate_login_macaroons(self): locator = _DischargerLocator() ids = _IdService('ids', locator, self) - auth = macaroonbakery.ClosedAuthorizer() + auth = bakery.ClosedAuthorizer() ts = _Service('myservice', auth, ids, locator) # Acquire a login macaroon for bob. client1 = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'bob') - auth_info = client1.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + auth_info = client1.do(ctx, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'bob') # Acquire a login macaroon for alice. client2 = _Client(locator) ctx = test_context.with_value(_DISCHARGE_USER_KEY, 'alice') - auth_info = client2.do(ctx, ts, [macaroonbakery.LOGIN_OP]) + auth_info = client2.do(ctx, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'alice') # Combine the two login macaroons into one client. @@ -576,32 +554,30 @@ class TestChecker(TestCase): # We should authenticate as bob (because macaroons are presented # ordered by "cookie" name) - auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP]) + auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'bob') self.assertEqual(len(auth_info.macaroons), 1) # Try them the other way around and we should authenticate as alice. client3 = _Client(locator) - client3.add_macaroon(ts, '1.alice', - client2._macaroons[ts.name()]['authn']) - client3.add_macaroon(ts, '2.bob', - client1._macaroons[ts.name()]['authn']) + client3.add_macaroon(ts, '1.alice', client2._macaroons[ts.name()]['authn']) + client3.add_macaroon(ts, '2.bob', client1._macaroons[ts.name()]['authn']) - auth_info = client3.do(test_context, ts, [macaroonbakery.LOGIN_OP]) + auth_info = client3.do(test_context, ts, [bakery.LOGIN_OP]) self.assertEqual(auth_info.identity.id(), 'alice') self.assertEqual(len(auth_info.macaroons), 1) def test_macaroon_ops_fatal_error(self): # When we get a non-VerificationError error from the # opstore, we don't do any more verification. - checker = macaroonbakery.Checker( + checker = bakery.Checker( macaroon_opstore=_MacaroonStoreWithError()) m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V2) - with self.assertRaises(ValueError): - checker.auth([m]).allow(test_context, [macaroonbakery.LOGIN_OP]) + with self.assertRaises(bakery.AuthInitError): + checker.auth([m]).allow(test_context, [bakery.LOGIN_OP]) -class _DischargerLocator(object): +class _DischargerLocator(bakery.ThirdPartyLocator): def __init__(self, dischargers=None): if dischargers is None: dischargers = {} @@ -611,9 +587,9 @@ class _DischargerLocator(object): d = self._dischargers.get(loc) if d is None: return None - return macaroonbakery.ThirdPartyInfo( + return bakery.ThirdPartyInfo( public_key=d._key.public_key, - version=macaroonbakery.LATEST_BAKERY_VERSION, + version=bakery.LATEST_VERSION, ) def __setitem__(self, key, item): @@ -626,25 +602,23 @@ class _DischargerLocator(object): return self._dischargers.get(key) -class _IdService(macaroonbakery.IdentityClient, - macaroonbakery.ThirdPartyCaveatChecker): +class _IdService(bakery.IdentityClient, + bakery.ThirdPartyCaveatChecker): def __init__(self, location, locator, test_class): self._location = location self._test = test_class - key = macaroonbakery.generate_key() + key = bakery.generate_key() self._discharger = _Discharger(key=key, checker=self, locator=locator) locator[location] = self._discharger def check_third_party_caveat(self, ctx, info): if info.condition != 'is-authenticated-user': - raise macaroonbakery.CaveatNotRecognizedError( - 'third party condition not ' - 'recognized') + raise bakery.CaveatNotRecognizedError( + 'third party condition not recognized') username = ctx.get(_DISCHARGE_USER_KEY, '') if username == '': - return macaroonbakery.ThirdPartyCaveatCheckFailed( - 'no current user') + raise bakery.ThirdPartyCaveatCheckFailed('no current user') self._test._discharges.append( _DischargeRecord(location=self._location, user=username)) return [checkers.declared_caveat('username', username)] @@ -656,8 +630,8 @@ class _IdService(macaroonbakery.IdentityClient, def declared_identity(self, ctx, declared): user = declared.get('username') if user is None: - raise macaroonbakery.IdentityError('no username declared') - return macaroonbakery.SimpleIdentity(user) + raise bakery.IdentityError('no username declared') + return bakery.SimpleIdentity(user) _DISCHARGE_USER_KEY = checkers.ContextKey('user-key') @@ -676,13 +650,17 @@ class _Discharger(object): self._checker = checker def discharge(self, ctx, cav, payload): - return macaroonbakery.discharge(ctx, key=self._key, id=cav.caveat_id, - caveat=payload, - checker=self._checker, - locator=self._locator) + return bakery.discharge( + ctx, + key=self._key, + id=cav.caveat_id, + caveat=payload, + checker=self._checker, + locator=self._locator, + ) -class _OpAuthorizer(macaroonbakery.Authorizer): +class _OpAuthorizer(bakery.Authorizer): '''Implements bakery.Authorizer by looking the operation up in the given map. If the username is in the associated list or the list contains "everyone", authorization is granted. @@ -694,7 +672,7 @@ class _OpAuthorizer(macaroonbakery.Authorizer): self._auth = auth def authorize(self, ctx, id, ops): - return macaroonbakery.ACLAuthorizer( + return bakery.ACLAuthorizer( allow_public=True, get_acl=lambda ctx, op: self._auth.get(op, [])).authorize( ctx, id, ops) @@ -705,7 +683,7 @@ class _MacaroonStore(object): ''' def __init__(self, key, locator): - self._root_key_store = macaroonbakery.MemoryKeyStore() + self._root_key_store = bakery.MemoryKeyStore() self._key = key self._locator = locator @@ -713,9 +691,9 @@ class _MacaroonStore(object): root_key, id = self._root_key_store.root_key() m_id = {'id': base64.urlsafe_b64encode(id).decode('utf-8'), 'ops': ops} data = json.dumps(m_id) - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key=root_key, id=data, location='', - version=macaroonbakery.LATEST_BAKERY_VERSION, + version=bakery.LATEST_VERSION, namespace=namespace) m.add_caveats(caveats, self._key, self._locator) return m @@ -739,7 +717,7 @@ class _MacaroonStore(object): ok = v.verify(macaroon=ms[0], key=root_key, discharge_macaroons=ms[1:]) if not ok: - raise macaroonbakery.VerificationError('invalid signature') + raise bakery.VerificationError('invalid signature') conditions = [] for m in ms: cavs = m.first_party_caveats() @@ -747,7 +725,7 @@ class _MacaroonStore(object): conditions.append(cav.caveat_id_bytes.decode('utf-8')) ops = [] for op in m_id['ops']: - ops.append(macaroonbakery.Op(entity=op[0], action=op[1])) + ops.append(bakery.Op(entity=op[0], action=op[1])) return ops, conditions @@ -761,8 +739,8 @@ class _Service(object): def __init__(self, name, auth, idm, locator): self._name = name - self._store = _MacaroonStore(macaroonbakery.generate_key(), locator) - self._checker = macaroonbakery.Checker( + self._store = _MacaroonStore(bakery.generate_key(), locator) + self._checker = bakery.Checker( checker=test_checker(), authorizer=auth, identity_client=idm, @@ -774,7 +752,7 @@ class _Service(object): def do(self, ctx, ms, ops): try: authInfo = self._checker.auth(ms).allow(ctx, ops) - except macaroonbakery.DischargeRequiredError as exc: + except bakery.DischargeRequiredError as exc: self._discharge_required_error(exc) return authInfo @@ -784,13 +762,13 @@ class _Service(object): try: authInfo, allowed = self._checker.auth(ms).allow_any(ctx, ops) return authInfo, allowed - except macaroonbakery.DischargeRequiredError as exc: + except bakery.DischargeRequiredError as exc: self._discharge_required_error(exc) def capability(self, ctx, ms, ops): try: conds = self._checker.auth(ms).allow_capability(ctx, ops) - except macaroonbakery.DischargeRequiredError as exc: + except bakery.DischargeRequiredError as exc: self._discharge_required_error(exc) m = self._store.new_macaroon(None, self._checker.namespace(), ops) @@ -802,7 +780,7 @@ class _Service(object): m = self._store.new_macaroon(err.cavs(), self._checker.namespace(), err.ops()) name = 'authz' - if len(err.ops()) == 1 and err.ops()[0] == macaroonbakery.LOGIN_OP: + if len(err.ops()) == 1 and err.ops()[0] == bakery.LOGIN_OP: name = 'authn' raise _DischargeRequiredError(name=name, m=m) @@ -824,7 +802,7 @@ class _Client(object): max_retries = 3 def __init__(self, dischargers): - self._key = macaroonbakery.generate_key() + self._key = bakery.generate_key() self._macaroons = {} self._dischargers = dischargers @@ -894,26 +872,25 @@ class _Client(object): return ms def _discharge_all(self, ctx, m): - def get_discharge(ctx, cav, pay_load): + def get_discharge(cav, payload): d = self._dischargers.get(cav.location) if d is None: raise ValueError('third party discharger ' '{} not found'.format(cav.location)) - return d.discharge(ctx, cav, pay_load) + return d.discharge(ctx, cav, payload) - return macaroonbakery.discharge_all(ctx, m, get_discharge) + return bakery.discharge_all(m, get_discharge) -class _BasicAuthIdService(macaroonbakery.IdentityClient): +class _BasicAuthIdService(bakery.IdentityClient): def identity_from_context(self, ctx): user, pwd = _basic_auth_from_context(ctx) if user != 'sherlock' or pwd != 'holmes': return None, None - return macaroonbakery.SimpleIdentity(user), None + return bakery.SimpleIdentity(user), None def declared_identity(self, ctx, declared): - raise macaroonbakery.IdentityError('no identity declarations in basic ' - 'auth id service') + raise bakery.IdentityError('no identity declarations in basic auth id service') _BASIC_AUTH_KEY = checkers.ContextKey('user-key') diff --git a/macaroonbakery/tests/test_client.py b/macaroonbakery/tests/test_client.py new file mode 100644 index 0000000..e1a4009 --- /dev/null +++ b/macaroonbakery/tests/test_client.py @@ -0,0 +1,388 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +import base64 +import datetime +import json +from unittest import TestCase +try: + from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler +except ImportError: + from http.server import HTTPServer, BaseHTTPRequestHandler +import threading + +from httmock import ( + HTTMock, + urlmatch +) +import requests +from six.moves.urllib.parse import parse_qs + +import macaroonbakery as bakery +import macaroonbakery.httpbakery as httpbakery +import macaroonbakery.checkers as checkers + +AGES = datetime.datetime.utcnow() + datetime.timedelta(days=1) +TEST_OP = bakery.Op(entity='test', action='test') + + +class TestClient(TestCase): + def test_single_service_first_party(self): + b = new_bakery('loc', None, None) + + def handler(*args): + GetHandler(b, None, None, None, None, *args) + try: + httpd = HTTPServer(('', 0), handler) + thread = threading.Thread(target=httpd.serve_forever) + thread.start() + srv_macaroon = b.oven.macaroon( + version=bakery.LATEST_VERSION, expiry=AGES, + caveats=None, ops=[TEST_OP]) + self.assertEquals(srv_macaroon.macaroon.location, 'loc') + client = httpbakery.Client() + client.cookies.set_cookie(requests.cookies.create_cookie( + 'macaroon-test', base64.b64encode(json.dumps([ + srv_macaroon.to_dict().get('m') + ]).encode('utf-8')).decode('utf-8') + )) + resp = requests.get( + url='http://' + httpd.server_address[0] + ':' + + str(httpd.server_address[1]), + cookies=client.cookies, auth=client.auth()) + resp.raise_for_status() + self.assertEquals(resp.text, 'done') + finally: + httpd.shutdown() + + def test_single_party_with_header(self): + b = new_bakery('loc', None, None) + + def handler(*args): + GetHandler(b, None, None, None, None, *args) + try: + httpd = HTTPServer(('', 0), handler) + thread = threading.Thread(target=httpd.serve_forever) + thread.start() + srv_macaroon = b.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=AGES, caveats=None, ops=[TEST_OP]) + self.assertEquals(srv_macaroon.macaroon.location, 'loc') + headers = { + 'Macaroons': base64.b64encode(json.dumps([ + srv_macaroon.to_dict().get('m') + ]).encode('utf-8')) + } + resp = requests.get( + url='http://' + httpd.server_address[0] + ':' + + str(httpd.server_address[1]), + headers=headers) + resp.raise_for_status() + self.assertEquals(resp.text, 'done') + finally: + httpd.shutdown() + + def test_repeated_request_with_body(self): + class _DischargerLocator(bakery.ThirdPartyLocator): + def __init__(self): + self.key = bakery.generate_key() + + def third_party_info(self, loc): + if loc == 'http://1.2.3.4': + return bakery.ThirdPartyInfo( + public_key=self.key.public_key, + version=bakery.LATEST_VERSION, + ) + + d = _DischargerLocator() + b = new_bakery('loc', d, None) + + @urlmatch(path='.*/discharge') + def discharge(url, request): + qs = parse_qs(request.body) + content = {q: qs[q][0] for q in qs} + m = httpbakery.discharge(checkers.AuthContext(), content, d.key, d, alwaysOK3rd) + return { + 'status_code': 200, + 'content': { + 'Macaroon': m.to_dict() + } + } + + def handler(*args): + GetHandler(b, 'http://1.2.3.4', None, None, None, *args) + try: + httpd = HTTPServer(('', 0), handler) + thread = threading.Thread(target=httpd.serve_forever) + thread.start() + client = httpbakery.Client() + with HTTMock(discharge): + resp = requests.get( + url='http://' + httpd.server_address[0] + ':' + + str(httpd.server_address[1]), + cookies=client.cookies, + auth=client.auth()) + resp.raise_for_status() + self.assertEquals(resp.text, 'done') + finally: + httpd.shutdown() + + def test_too_many_discharge(self): + class _DischargerLocator(bakery.ThirdPartyLocator): + def __init__(self): + self.key = bakery.generate_key() + + def third_party_info(self, loc): + if loc == 'http://1.2.3.4': + return bakery.ThirdPartyInfo( + public_key=self.key.public_key, + version=bakery.LATEST_VERSION, + ) + + d = _DischargerLocator() + b = new_bakery('loc', d, None) + + @urlmatch(path='.*/discharge') + def discharge(url, request): + wrong_macaroon = bakery.Macaroon( + root_key=b'some key', id=b'xxx', + location='some other location', + version=bakery.VERSION_0) + return { + 'status_code': 200, + 'content': { + 'Macaroon': wrong_macaroon.to_dict() + } + } + + def handler(*args): + GetHandler(b, 'http://1.2.3.4', None, None, None, *args) + try: + httpd = HTTPServer(('', 0), handler) + thread = threading.Thread(target=httpd.serve_forever) + thread.start() + client = httpbakery.Client() + with HTTMock(discharge): + with self.assertRaises(httpbakery.BakeryException) as ctx: + requests.get( + url='http://' + httpd.server_address[0] + ':' + + str(httpd.server_address[1]), + cookies=client.cookies, + auth=client.auth()) + self.assertEqual(ctx.exception.args[0], + 'too many (3) discharge requests') + finally: + httpd.shutdown() + + def test_third_party_discharge_refused(self): + class _DischargerLocator(bakery.ThirdPartyLocator): + def __init__(self): + self.key = bakery.generate_key() + + def third_party_info(self, loc): + if loc == 'http://1.2.3.4': + return bakery.ThirdPartyInfo( + public_key=self.key.public_key, + version=bakery.LATEST_VERSION, + ) + + def check(cond, arg): + raise bakery.ThirdPartyCaveatCheckFailed('boo! cond' + cond) + + d = _DischargerLocator() + b = new_bakery('loc', d, None) + + @urlmatch(path='.*/discharge') + def discharge(url, request): + qs = parse_qs(request.body) + content = {q: qs[q][0] for q in qs} + httpbakery.discharge(checkers.AuthContext(), content, d.key, d, + ThirdPartyCaveatCheckerF(check)) + + def handler(*args): + GetHandler(b, 'http://1.2.3.4', None, None, None, *args) + try: + httpd = HTTPServer(('', 0), handler) + thread = threading.Thread(target=httpd.serve_forever) + thread.start() + client = httpbakery.Client() + with HTTMock(discharge): + with self.assertRaises(bakery.ThirdPartyCaveatCheckFailed): + requests.get( + url='http://' + httpd.server_address[0] + ':' + + str(httpd.server_address[1]), + cookies=client.cookies, + auth=client.auth()) + finally: + httpd.shutdown() + + def test_discharge_with_interaction_required_error(self): + class _DischargerLocator(bakery.ThirdPartyLocator): + def __init__(self): + self.key = bakery.generate_key() + + def third_party_info(self, loc): + if loc == 'http://1.2.3.4': + return bakery.ThirdPartyInfo( + public_key=self.key.public_key, + version=bakery.LATEST_VERSION, + ) + d = _DischargerLocator() + b = new_bakery('loc', d, None) + + @urlmatch(path='.*/discharge') + def discharge(url, request): + return { + 'status_code': 401, + 'content': { + 'Code': httpbakery.ERR_INTERACTION_REQUIRED, + 'Message': 'interaction required', + 'Info': { + 'WaitURL': 'http://0.1.2.3/', + 'VisitURL': 'http://0.1.2.3/', + }, + } + } + + def handler(*args): + GetHandler(b, 'http://1.2.3.4', None, None, None, *args) + + try: + httpd = HTTPServer(('', 0), handler) + thread = threading.Thread(target=httpd.serve_forever) + thread.start() + + class MyInteractor(httpbakery.LegacyInteractor): + def legacy_interact(self, ctx, location, visit_url): + raise httpbakery.InteractionError('cannot visit') + + def interact(self, ctx, location, interaction_required_err): + pass + + def kind(self): + return httpbakery.WEB_BROWSER_INTERACTION_KIND + + client = httpbakery.Client(interaction_methods=[MyInteractor()]) + + with HTTMock(discharge): + with self.assertRaises(httpbakery.InteractionError): + requests.get( + 'http://' + httpd.server_address[0] + ':' + str( + httpd.server_address[1]), + cookies=client.cookies, + auth=client.auth()) + finally: + httpd.shutdown() + + +class GetHandler(BaseHTTPRequestHandler): + '''A mock HTTP server that serves a GET request''' + def __init__(self, bakery, auth_location, mutate_error, + caveats, version, *args): + ''' + @param bakery used to check incoming requests and macaroons + for discharge-required errors. + @param auth_location holds the location of any 3rd party + authorizer. If this is not None, a 3rd party caveat will be + added addressed to this location. + @param mutate_error if non None, will be called with any + discharge-required error before responding to the client. + @param caveats called to get caveats to add to the returned + macaroon. + @param holds the version of the bakery that the + server will purport to serve. + ''' + self._bakery = bakery + self._auth_location = auth_location + self._mutate_error = mutate_error + self._caveats = caveats + self._server_version = version + BaseHTTPRequestHandler.__init__(self, *args) + + def do_GET(self): + '''do_GET implements a handler for the HTTP GET method''' + ctx = checkers.AuthContext() + auth_checker = self._bakery.checker.auth( + httpbakery.extract_macaroons(self.headers)) + try: + auth_checker.allow(ctx, [TEST_OP]) + except (bakery.PermissionDenied, + bakery.VerificationError) as exc: + return self._write_discharge_error(exc) + self.send_response(200) + self.end_headers() + content_len = int(self.headers.get('content-length', 0)) + content = 'done' + if self.path != '/no-body'and content_len > 0: + body = self.rfile.read(content_len) + content = content + ' ' + body + self.wfile.write(content.encode('utf-8')) + return + + def _write_discharge_error(self, exc): + version = httpbakery.request_version(self.headers) + if version < bakery.LATEST_VERSION: + self._server_version = version + + caveats = [] + if self._auth_location != '': + caveats = [ + checkers.Caveat(location=self._auth_location, + condition='is-ok') + ] + if self._caveats is not None: + caveats.extend(self._caveats) + + m = self._bakery.oven.macaroon( + version=bakery.LATEST_VERSION, expiry=AGES, + caveats=caveats, ops=[TEST_OP]) + + content, headers = httpbakery.discharge_required_response( + m, '/', 'test', exc.args[0]) + self.send_response(401) + for h in headers: + self.send_header(h, headers[h]) + self.send_header('Connection', 'close') + self.end_headers() + self.wfile.write(content) + + +def new_bakery(location, locator, checker): + '''Return a new bakery instance. + @param location Location of the bakery {str}. + @param locator Locator for third parties {ThirdPartyLocator or None} + @param checker Caveat checker {FirstPartyCaveatChecker or None} + @return {Bakery} + ''' + if checker is None: + c = checkers.Checker() + c.namespace().register('testns', '') + c.register('is', 'testns', check_is_something) + checker = c + key = bakery.generate_key() + return bakery.Bakery( + location=location, + locator=locator, + key=key, + checker=checker, + ) + + +def is_something_caveat(): + return checkers.Caveat(condition='is something', namespace='testns') + + +def check_is_something(ctx, cond, arg): + if arg != 'something': + return '{} doesn\'t match "something"'.format(arg) + return None + + +class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker): + def __init__(self, check): + self._check = check + + def check_third_party_caveat(self, ctx, info): + cond, arg = checkers.parse_caveat(info.condition) + return self._check(cond, arg) + +alwaysOK3rd = ThirdPartyCaveatCheckerF(lambda cond, arg: []) diff --git a/macaroonbakery/tests/test_codec.py b/macaroonbakery/tests/test_codec.py index 6573266..d4fbc57 100644 --- a/macaroonbakery/tests/test_codec.py +++ b/macaroonbakery/tests/test_codec.py @@ -6,92 +6,94 @@ from unittest import TestCase import nacl.public import six -import macaroonbakery -from macaroonbakery import utils +import macaroonbakery as bakery from macaroonbakery import codec import macaroonbakery.checkers as checkers class TestCodec(TestCase): def setUp(self): - self.fp_key = macaroonbakery.generate_key() - self.tp_key = macaroonbakery.generate_key() + self.fp_key = bakery.generate_key() + self.tp_key = bakery.generate_key() def test_v1_round_trip(self): - tp_info = macaroonbakery.ThirdPartyInfo( - version=macaroonbakery.BAKERY_V1, + tp_info = bakery.ThirdPartyInfo( + version=bakery.VERSION_1, public_key=self.tp_key.public_key) - cid = macaroonbakery.encode_caveat( + cid = bakery.encode_caveat( 'is-authenticated-user', b'a random string', tp_info, self.fp_key, None) - res = macaroonbakery.decode_caveat(self.tp_key, cid) - self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo( + res = bakery.decode_caveat(self.tp_key, cid) + self.assertEquals(res, bakery.ThirdPartyCaveatInfo( first_party_public_key=self.fp_key.public_key, root_key=b'a random string', condition='is-authenticated-user', caveat=cid, third_party_key_pair=self.tp_key, - version=macaroonbakery.BAKERY_V1, - namespace=macaroonbakery.legacy_namespace() + version=bakery.VERSION_1, + id=None, + namespace=bakery.legacy_namespace() )) def test_v2_round_trip(self): - tp_info = macaroonbakery.ThirdPartyInfo( - version=macaroonbakery.BAKERY_V2, + tp_info = bakery.ThirdPartyInfo( + version=bakery.VERSION_2, public_key=self.tp_key.public_key) - cid = macaroonbakery.encode_caveat( + cid = bakery.encode_caveat( 'is-authenticated-user', b'a random string', tp_info, self.fp_key, None) - res = macaroonbakery.decode_caveat(self.tp_key, cid) - self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo( + res = bakery.decode_caveat(self.tp_key, cid) + self.assertEquals(res, bakery.ThirdPartyCaveatInfo( first_party_public_key=self.fp_key.public_key, root_key=b'a random string', condition='is-authenticated-user', caveat=cid, third_party_key_pair=self.tp_key, - version=macaroonbakery.BAKERY_V2, - namespace=macaroonbakery.legacy_namespace() + version=bakery.VERSION_2, + id=None, + namespace=bakery.legacy_namespace() )) def test_v3_round_trip(self): - tp_info = macaroonbakery.ThirdPartyInfo( - version=macaroonbakery.BAKERY_V3, + tp_info = bakery.ThirdPartyInfo( + version=bakery.VERSION_3, public_key=self.tp_key.public_key) ns = checkers.Namespace() ns.register('testns', 'x') - cid = macaroonbakery.encode_caveat( + cid = bakery.encode_caveat( 'is-authenticated-user', b'a random string', tp_info, self.fp_key, ns) - res = macaroonbakery.decode_caveat(self.tp_key, cid) - self.assertEquals(res, macaroonbakery.ThirdPartyCaveatInfo( + res = bakery.decode_caveat(self.tp_key, cid) + self.assertEquals(res, bakery.ThirdPartyCaveatInfo( first_party_public_key=self.fp_key.public_key, root_key=b'a random string', condition='is-authenticated-user', caveat=cid, third_party_key_pair=self.tp_key, - version=macaroonbakery.BAKERY_V3, + version=bakery.VERSION_3, + id=None, namespace=ns )) def test_empty_caveat_id(self): - with self.assertRaises(macaroonbakery.VerificationError) as context: - macaroonbakery.decode_caveat(self.tp_key, b'') + with self.assertRaises(bakery.VerificationError) as context: + bakery.decode_caveat(self.tp_key, b'') self.assertTrue('empty third party caveat' in str(context.exception)) def test_decode_caveat_v1_from_go(self): - tp_key = macaroonbakery.PrivateKey( + tp_key = bakery.PrivateKey( nacl.public.PrivateKey(base64.b64decode( 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))) - fp_key = macaroonbakery.PrivateKey( + fp_key = bakery.PrivateKey( nacl.public.PrivateKey(base64.b64decode( 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))) root_key = base64.b64decode('vDxEmWZEkgiNEFlJ+8ruXe3qDSLf1H+o') @@ -108,67 +110,70 @@ class TestCodec(TestCase): 'BORldUUExGdjVla1dWUjA4Uk1sbGJhc3c4VGdFbkhzM0laeVo' '0V2lEOHhRUWdjU3ljOHY4eUt4dEhxejVEczJOYmh1ZDJhUFdt' 'UTVMcVlNWitmZ2FNaTAxdE9DIn0=') - cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav) - self.assertEquals(cav, macaroonbakery.ThirdPartyCaveatInfo( + cav = bakery.decode_caveat(tp_key, encrypted_cav) + self.assertEquals(cav, bakery.ThirdPartyCaveatInfo( condition='caveat condition', first_party_public_key=fp_key.public_key, third_party_key_pair=tp_key, root_key=root_key, caveat=encrypted_cav, - version=macaroonbakery.BAKERY_V1, - namespace=macaroonbakery.legacy_namespace() + version=bakery.VERSION_1, + id=None, + namespace=bakery.legacy_namespace() )) def test_decode_caveat_v2_from_go(self): - tp_key = macaroonbakery.PrivateKey(nacl.public.PrivateKey( + tp_key = bakery.PrivateKey(nacl.public.PrivateKey( base64.b64decode( 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))) - fp_key = macaroonbakery.PrivateKey( + fp_key = bakery.PrivateKey( nacl.public.PrivateKey(base64.b64decode( 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))) root_key = base64.b64decode('wh0HSM65wWHOIxoGjgJJOFvQKn2jJFhC') # This caveat has been generated from the go code # to check the compatibilty - encrypted_cav = base64.urlsafe_b64decode( - utils.add_base64_padding(six.b( - 'AvD-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGIHq9xGcHS9IZ' - 'Lh0cL6D9qpeKI0mXmCPfnwRQDuVYC8y5gVWd-oCGZaj5TGtk3byp2Vnw6ojmt' - 'sULDhY59YA_J_Y0ATkERO5T9ajoRWBxU2OXBoX6bImXA'))) - cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav) - self.assertEqual(cav, macaroonbakery.ThirdPartyCaveatInfo( + encrypted_cav = bakery.b64decode( + 'AvD-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGIHq9xGcHS9IZ' + 'Lh0cL6D9qpeKI0mXmCPfnwRQDuVYC8y5gVWd-oCGZaj5TGtk3byp2Vnw6ojmt' + 'sULDhY59YA_J_Y0ATkERO5T9ajoRWBxU2OXBoX6bImXA', + ) + cav = bakery.decode_caveat(tp_key, encrypted_cav) + self.assertEqual(cav, bakery.ThirdPartyCaveatInfo( condition='third party condition', first_party_public_key=fp_key.public_key, third_party_key_pair=tp_key, root_key=root_key, caveat=encrypted_cav, - version=macaroonbakery.BAKERY_V2, - namespace=macaroonbakery.legacy_namespace() + version=bakery.VERSION_2, + id=None, + namespace=bakery.legacy_namespace() )) def test_decode_caveat_v3_from_go(self): - tp_key = macaroonbakery.PrivateKey( + tp_key = bakery.PrivateKey( nacl.public.PrivateKey(base64.b64decode( 'TSpvLpQkRj+T3JXnsW2n43n5zP/0X4zn0RvDiWC3IJ0='))) - fp_key = macaroonbakery.PrivateKey(nacl.public.PrivateKey( + fp_key = bakery.PrivateKey(nacl.public.PrivateKey( base64.b64decode( 'KXpsoJ9ujZYi/O2Cca6kaWh65MSawzy79LWkrjOfzcs='))) root_key = base64.b64decode(b'oqOXI3/Mz/pKjCuFOt2eYxb7ndLq66GY') # This caveat has been generated from the go code # to check the compatibilty - encrypted_cav = base64.urlsafe_b64decode( - utils.add_base64_padding(six.b( - 'A_D-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGNLeFSkD2M-8A' - 'EYvmgVH95GWu7T7caKxKhhOQFcEKgnXKJvYXxz1zin4cZc4Q6C7gVqA-J4_j3' - '1LX4VKxymqG62UGPo78wOv0_fKjr3OI6PPJOYOQgBMclemlRF2'))) - cav = macaroonbakery.decode_caveat(tp_key, encrypted_cav) - self.assertEquals(cav, macaroonbakery.ThirdPartyCaveatInfo( + encrypted_cav = bakery.b64decode( + 'A_D-xlUf2MdGMgtu7OKRQnCP1OQJk6PKeFWRK26WIBA6DNwKGNLeFSkD2M-8A' + 'EYvmgVH95GWu7T7caKxKhhOQFcEKgnXKJvYXxz1zin4cZc4Q6C7gVqA-J4_j3' + '1LX4VKxymqG62UGPo78wOv0_fKjr3OI6PPJOYOQgBMclemlRF2', + ) + cav = bakery.decode_caveat(tp_key, encrypted_cav) + self.assertEquals(cav, bakery.ThirdPartyCaveatInfo( condition='third party condition', first_party_public_key=fp_key.public_key, third_party_key_pair=tp_key, root_key=root_key, caveat=encrypted_cav, - version=macaroonbakery.BAKERY_V3, - namespace=macaroonbakery.legacy_namespace() + version=bakery.VERSION_3, + id=None, + namespace=bakery.legacy_namespace() )) def test_encode_decode_varint(self): @@ -183,7 +188,7 @@ class TestCodec(TestCase): for test in tests: data = bytearray() expected = bytearray() - macaroonbakery.encode_uvarint(test[0], data) + bakery.encode_uvarint(test[0], data) for v in test[1]: expected.append(v) self.assertEquals(data, expected) diff --git a/macaroonbakery/tests/test_discharge.py b/macaroonbakery/tests/test_discharge.py index 6e2df6a..433483a 100644 --- a/macaroonbakery/tests/test_discharge.py +++ b/macaroonbakery/tests/test_discharge.py @@ -3,11 +3,8 @@ import unittest from pymacaroons import MACAROON_V1, Macaroon -from pymacaroons.exceptions import ( - MacaroonInvalidSignatureException, MacaroonUnmetCaveatException -) -import macaroonbakery +import macaroonbakery as bakery import macaroonbakery.checkers as checkers from macaroonbakery.tests import common @@ -20,15 +17,15 @@ class TestDischarge(unittest.TestCase): can verify this macaroon as valid. ''' oc = common.new_bakery('bakerytest') - primary = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + primary = oc.oven.macaroon(bakery.LATEST_VERSION, common.ages, None, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) self.assertEqual(primary.macaroon.location, 'bakerytest') primary.add_caveat(checkers.Caveat(condition='str something', namespace='testns'), oc.oven.key, oc.oven.locator) oc.checker.auth([[primary.macaroon]]).allow( - common.str_context('something'), [macaroonbakery.LOGIN_OP]) + common.str_context('something'), [bakery.LOGIN_OP]) def test_macaroon_paper_fig6(self): ''' Implements an example flow as described in the macaroons paper: @@ -45,15 +42,15 @@ class TestDischarge(unittest.TestCase): The target service verifies the original macaroon it delegated to fs No direct contact between bs and ts is required ''' - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) ts = common.new_bakery('ts-loc', locator) fs = common.new_bakery('fs-loc', locator) # ts creates a macaroon. - ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION, common.ages, - None, [macaroonbakery.LOGIN_OP]) + None, [bakery.LOGIN_OP]) # ts somehow sends the macaroon to fs which adds a third party caveat # to be discharged by bs. @@ -62,51 +59,55 @@ class TestDischarge(unittest.TestCase): fs.oven.key, fs.oven.locator) # client asks for a discharge macaroon for each third party caveat - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): self.assertEqual(cav.location, 'bs-loc') - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - bs.oven.key, - common.ThirdPartyStrcmpChecker( - 'user==bob'), - bs.oven.locator) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker('user==bob'), + bs.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, - get_discharge) + d = bakery.discharge_all(ts_macaroon, get_discharge) ts.checker.auth([d]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) def test_discharge_with_version1_macaroon(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) ts = common.new_bakery('ts-loc', locator) # ts creates a old-version macaroon. - ts_macaroon = ts.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages, - None, [macaroonbakery.LOGIN_OP]) + ts_macaroon = ts.oven.macaroon(bakery.VERSION_1, common.ages, + None, [bakery.LOGIN_OP]) ts_macaroon.add_caveat(checkers.Caveat(condition='something', location='bs-loc'), ts.oven.key, ts.oven.locator) # client asks for a discharge macaroon for each third party caveat - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): # Make sure that the caveat id really is old-style. try: cav.caveat_id_bytes.decode('utf-8') except UnicodeDecodeError: self.fail('caveat id is not utf-8') - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - bs.oven.key, - common.ThirdPartyStrcmpChecker( - 'something'), - bs.oven.locator) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker('something'), + bs.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, - get_discharge) + d = bakery.discharge_all(ts_macaroon, get_discharge) ts.checker.auth([d]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) for m in d: self.assertEqual(m.version, MACAROON_V1) @@ -114,29 +115,31 @@ class TestDischarge(unittest.TestCase): def test_version1_macaroon_id(self): # In the version 1 bakery, macaroon ids were hex-encoded with a # hyphenated UUID suffix. - root_key_store = macaroonbakery.MemoryKeyStore() - b = macaroonbakery.Bakery(root_key_store=root_key_store, - identity_client=common.OneIdentity()) + root_key_store = bakery.MemoryKeyStore() + b = bakery.Bakery( + root_key_store=root_key_store, + identity_client=common.OneIdentity(), + ) key, id = root_key_store.root_key() root_key_store.get(id) m = Macaroon(key=key, version=MACAROON_V1, location='', identifier=id + b'-deadl00f') b.checker.auth([[m]]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) def test_macaroon_paper_fig6_fails_without_discharges(self): ''' Runs a similar test as test_macaroon_paper_fig6 without the client discharging the third party caveats. ''' - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() ts = common.new_bakery('ts-loc', locator) fs = common.new_bakery('fs-loc', locator) common.new_bakery('as-loc', locator) # ts creates a macaroon. - ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION, common.ages, None, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) # ts somehow sends the macaroon to fs which adds a third party # caveat to be discharged by as. @@ -148,24 +151,24 @@ class TestDischarge(unittest.TestCase): try: ts.checker.auth([[ts_macaroon.macaroon]]).allow( common.test_context, - macaroonbakery.LOGIN_OP + bakery.LOGIN_OP ) self.fail('macaroon unmet should be raised') - except MacaroonUnmetCaveatException: + except bakery.VerificationError: pass def test_macaroon_paper_fig6_fails_with_binding_on_tampered_sig(self): ''' Runs a similar test as test_macaroon_paper_fig6 with the discharge macaroon binding being done on a tampered signature. ''' - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) ts = common.new_bakery('ts-loc', locator) # ts creates a macaroon. - ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION, common.ages, None, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) # ts somehow sends the macaroon to fs which adds a third party caveat # to be discharged by as. ts_macaroon.add_caveat(checkers.Caveat(condition='user==bob', @@ -173,16 +176,18 @@ class TestDischarge(unittest.TestCase): ts.oven.key, ts.oven.locator) # client asks for a discharge macaroon for each third party caveat - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): self.assertEqual(cav.location, 'bs-loc') - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - bs.oven.key, - common.ThirdPartyStrcmpChecker( - 'user==bob'), - bs.oven.locator) - - d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, - get_discharge) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + bs.oven.key, + common.ThirdPartyStrcmpChecker('user==bob'), + bs.oven.locator, + ) + + d = bakery.discharge_all(ts_macaroon, get_discharge) # client has all the discharge macaroons. For each discharge macaroon # bind it to our ts_macaroon and add it to our request. tampered_macaroon = Macaroon() @@ -190,35 +195,39 @@ class TestDischarge(unittest.TestCase): d[i + 1] = tampered_macaroon.prepare_for_request(dm) # client makes request to ts. - with self.assertRaises(MacaroonInvalidSignatureException) as exc: + with self.assertRaises(bakery.VerificationError) as exc: ts.checker.auth([d]).allow(common.test_context, - macaroonbakery.LOGIN_OP) - self.assertEqual('Signatures do not match', exc.exception.args[0]) + bakery.LOGIN_OP) + self.assertEqual('verification failed: Signatures do not match', + exc.exception.args[0]) def test_need_declared(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() first_party = common.new_bakery('first', locator) third_party = common.new_bakery('third', locator) # firstParty mints a macaroon with a third-party caveat addressed # to thirdParty with a need-declared caveat. m = first_party.oven.macaroon( - macaroonbakery.LATEST_BAKERY_VERSION, common.ages, [ + bakery.LATEST_VERSION, common.ages, [ checkers.need_declared_caveat( checkers.Caveat(location='third', condition='something'), ['foo', 'bar'] ) - ], [macaroonbakery.LOGIN_OP]) + ], [bakery.LOGIN_OP]) # The client asks for a discharge macaroon for each third party caveat. - def get_discharge(ctx, cav, payload): - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - third_party.oven.key, - common.ThirdPartyStrcmpChecker( - 'something'), - third_party.oven.locator) + def get_discharge(cav, payload): + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, + common.ThirdPartyStrcmpChecker('something'), + third_party.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) # The required declared attributes should have been added # to the discharge macaroons. @@ -231,22 +240,26 @@ class TestDischarge(unittest.TestCase): # Make sure the macaroons actually check out correctly # when provided with the declared checker. ctx = checkers.context_with_declared(common.test_context, declared) - first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP]) # Try again when the third party does add a required declaration. # The client asks for a discharge macaroon for each third party caveat. - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): checker = common.ThirdPartyCheckerWithCaveats([ checkers.declared_caveat('foo', 'a'), checkers.declared_caveat('arble', 'b') ]) - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - third_party.oven.key, - checker, - third_party.oven.locator) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, + checker, + third_party.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) # One attribute should have been added, the other was already there. declared = checkers.infer_declared(d, first_party.checker.namespace()) @@ -257,25 +270,28 @@ class TestDischarge(unittest.TestCase): }) ctx = checkers.context_with_declared(common.test_context, declared) - first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP]) # Try again, but this time pretend a client is sneakily trying # to add another 'declared' attribute to alter the declarations. - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): checker = common.ThirdPartyCheckerWithCaveats([ checkers.declared_caveat('foo', 'a'), checkers.declared_caveat('arble', 'b'), ]) # Sneaky client adds a first party caveat. - m = macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - third_party.oven.key, checker, - third_party.oven.locator) + m = bakery.discharge( + common.test_context, cav.caveat_id_bytes, + payload, + third_party.oven.key, checker, + third_party.oven.locator, + ) m.add_caveat(checkers.declared_caveat('foo', 'c'), None, None) return m - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) declared = checkers.infer_declared(d, first_party.checker.namespace()) self.assertEqual(declared, { @@ -283,22 +299,22 @@ class TestDischarge(unittest.TestCase): 'arble': 'b', }) - with self.assertRaises(macaroonbakery.AuthInitError) as exc: + with self.assertRaises(bakery.AuthInitError) as exc: first_party.checker.auth([d]).allow(common.test_context, - macaroonbakery.LOGIN_OP) + bakery.LOGIN_OP) self.assertEqual('cannot authorize login macaroon: caveat ' '"declared foo a" not satisfied: got foo=null, ' 'expected "a"', exc.exception.args[0]) def test_discharge_two_need_declared(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() first_party = common.new_bakery('first', locator) third_party = common.new_bakery('third', locator) # first_party mints a macaroon with two third party caveats # with overlapping attributes. m = first_party.oven.macaroon( - macaroonbakery.LATEST_BAKERY_VERSION, + bakery.LATEST_VERSION, common.ages, [ checkers.need_declared_caveat( checkers.Caveat(location='third', condition='x'), @@ -306,18 +322,22 @@ class TestDischarge(unittest.TestCase): checkers.need_declared_caveat( checkers.Caveat(location='third', condition='y'), ['bar', 'baz']), - ], [macaroonbakery.LOGIN_OP]) + ], [bakery.LOGIN_OP]) # The client asks for a discharge macaroon for each third party caveat. # Since no declarations are added by the discharger, - def get_discharge(ctx, cav, payload): - return macaroonbakery.discharge( - ctx, cav.caveat_id_bytes, payload, third_party.oven.key, + def get_discharge(cav, payload): + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, common.ThirdPartyCaveatCheckerEmpty(), - third_party.oven.locator) + third_party.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) declared = checkers.infer_declared(d, first_party.checker.namespace()) self.assertEqual(declared, { 'foo': '', @@ -325,12 +345,12 @@ class TestDischarge(unittest.TestCase): 'baz': '', }) ctx = checkers.context_with_declared(common.test_context, declared) - first_party.checker.auth([d]).allow(ctx, [macaroonbakery.LOGIN_OP]) + first_party.checker.auth([d]).allow(ctx, [bakery.LOGIN_OP]) # If they return conflicting values, the discharge fails. # The client asks for a discharge macaroon for each third party caveat. # Since no declarations are added by the discharger, - class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker): + class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker): def check_third_party_caveat(self, ctx, cav_info): if cav_info.condition == b'x': return [checkers.declared_caveat('foo', 'fooval1')] @@ -341,62 +361,70 @@ class TestDischarge(unittest.TestCase): ] raise common.ThirdPartyCaveatCheckFailed('not matched') - def get_discharge(ctx, cav, payload): - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - third_party.oven.key, - ThirdPartyCaveatCheckerF(), - third_party.oven.locator) + def get_discharge(cav, payload): + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, + ThirdPartyCaveatCheckerF(), + third_party.oven.locator, + ) - d = macaroonbakery.discharge_all(common.test_context, m, get_discharge) + d = bakery.discharge_all(m, get_discharge) declared = checkers.infer_declared(d, first_party.checker.namespace()) self.assertEqual(declared, { 'bar': '', 'baz': 'bazval', }) - with self.assertRaises(macaroonbakery.AuthInitError) as exc: + with self.assertRaises(bakery.AuthInitError) as exc: first_party.checker.auth([d]).allow(common.test_context, - macaroonbakery.LOGIN_OP) + bakery.LOGIN_OP) self.assertEqual('cannot authorize login macaroon: caveat "declared ' 'foo fooval1" not satisfied: got foo=null, expected ' '"fooval1"', exc.exception.args[0]) def test_discharge_macaroon_cannot_be_used_as_normal_macaroon(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() first_party = common.new_bakery('first', locator) third_party = common.new_bakery('third', locator) # First party mints a macaroon with a 3rd party caveat. - m = first_party.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + m = first_party.oven.macaroon(bakery.LATEST_VERSION, common.ages, [ checkers.Caveat(location='third', condition='true')], - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) # Acquire the discharge macaroon, but don't bind it to the original. class M: unbound = None - def get_discharge(ctx, cav, payload): - m = macaroonbakery.discharge( - ctx, cav.caveat_id_bytes, payload, third_party.oven.key, + def get_discharge(cav, payload): + m = bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + third_party.oven.key, common.ThirdPartyStrcmpChecker('true'), - third_party.oven.locator) + third_party.oven.locator, + ) M.unbound = m.macaroon.copy() return m - macaroonbakery.discharge_all(common.test_context, m, get_discharge) + bakery.discharge_all(m, get_discharge) self.assertIsNotNone(M.unbound) # Make sure it cannot be used as a normal macaroon in the third party. - with self.assertRaises(macaroonbakery.AuthInitError) as exc: + with self.assertRaises(bakery.VerificationError) as exc: third_party.checker.auth([[M.unbound]]).allow( - common.test_context, [macaroonbakery.LOGIN_OP]) + common.test_context, [bakery.LOGIN_OP]) self.assertEqual('no operations found in macaroon', exc.exception.args[0]) def test_third_party_discharge_macaroon_ids_are_small(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bakeries = { 'ts-loc': common.new_bakery('ts-loc', locator), 'as1-loc': common.new_bakery('as1-loc', locator), @@ -404,14 +432,14 @@ class TestDischarge(unittest.TestCase): } ts = bakeries['ts-loc'] - ts_macaroon = ts.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, + ts_macaroon = ts.oven.macaroon(bakery.LATEST_VERSION, common.ages, - None, [macaroonbakery.LOGIN_OP]) + None, [bakery.LOGIN_OP]) ts_macaroon.add_caveat(checkers.Caveat(condition='something', location='as1-loc'), ts.oven.key, ts.oven.locator) - class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker): + class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker): def __init__(self, loc): self._loc = loc @@ -424,18 +452,20 @@ class TestDischarge(unittest.TestCase): raise common.ThirdPartyCaveatCheckFailed( 'unknown location {}'.format(self._loc)) - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): oven = bakeries[cav.location].oven - return macaroonbakery.discharge(ctx, cav.caveat_id_bytes, payload, - oven.key, - ThirdPartyCaveatCheckerF( - cav.location), - oven.locator) - - d = macaroonbakery.discharge_all(common.test_context, ts_macaroon, - get_discharge) + return bakery.discharge( + common.test_context, + cav.caveat_id_bytes, + payload, + oven.key, + ThirdPartyCaveatCheckerF(cav.location), + oven.locator, + ) + + d = bakery.discharge_all(ts_macaroon, get_discharge) ts.checker.auth([d]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) for i, m in enumerate(d): for j, cav in enumerate(m.caveats): diff --git a/macaroonbakery/tests/test_discharge_all.py b/macaroonbakery/tests/test_discharge_all.py index 8da8823..7999f5f 100644 --- a/macaroonbakery/tests/test_discharge_all.py +++ b/macaroonbakery/tests/test_discharge_all.py @@ -4,7 +4,7 @@ import unittest from pymacaroons.verifier import Verifier -import macaroonbakery +import macaroonbakery as bakery import macaroonbakery.checkers as checkers from macaroonbakery.tests import common @@ -16,12 +16,11 @@ def always_ok(predicate): class TestDischargeAll(unittest.TestCase): def test_discharge_all_no_discharges(self): root_key = b'root key' - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key=root_key, id=b'id0', location='loc0', - version=macaroonbakery.LATEST_BAKERY_VERSION, + version=bakery.LATEST_VERSION, namespace=common.test_checker().namespace()) - ms = macaroonbakery.discharge_all( - common.test_context, m, no_discharge(self)) + ms = bakery.discharge_all(m, no_discharge(self)) self.assertEqual(len(ms), 1) self.assertEqual(ms[0], m.macaroon) v = Verifier() @@ -30,9 +29,9 @@ class TestDischargeAll(unittest.TestCase): def test_discharge_all_many_discharges(self): root_key = b'root key' - m0 = macaroonbakery.Macaroon( + m0 = bakery.Macaroon( root_key=root_key, id=b'id0', location='loc0', - version=macaroonbakery.LATEST_BAKERY_VERSION) + version=bakery.LATEST_VERSION) class State(object): total_required = 40 @@ -52,19 +51,18 @@ class TestDischargeAll(unittest.TestCase): add_caveats(m0) - def get_discharge(_, cav, payload): + def get_discharge(cav, payload): self.assertEqual(payload, None) - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key='root key {}'.format( cav.caveat_id.decode('utf-8')).encode('utf-8'), id=cav.caveat_id, location='', - version=macaroonbakery.LATEST_BAKERY_VERSION) + version=bakery.LATEST_VERSION) add_caveats(m) return m - ms = macaroonbakery.discharge_all( - common.test_context, m0, get_discharge) + ms = bakery.discharge_all(m0, get_discharge) self.assertEqual(len(ms), 41) @@ -77,7 +75,7 @@ class TestDischargeAll(unittest.TestCase): # we're using actual third party caveats as added by # Macaroon.add_caveat and we use a larger number of caveats # so that caveat ids will need to get larger. - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bakeries = {} total_discharges_required = 40 @@ -106,9 +104,9 @@ class TestDischargeAll(unittest.TestCase): return caveats root_key = b'root key' - m0 = macaroonbakery.Macaroon( + m0 = bakery.Macaroon( root_key=root_key, id=b'id0', location='ts-loc', - version=macaroonbakery.LATEST_BAKERY_VERSION) + version=bakery.LATEST_VERSION) m0.add_caveat(checkers. Caveat(location=add_bakery(), condition='something'), @@ -117,18 +115,17 @@ class TestDischargeAll(unittest.TestCase): # We've added a caveat (the first) so one less caveat is required. M.still_required -= 1 - class ThirdPartyCaveatCheckerF(macaroonbakery.ThirdPartyCaveatChecker): + class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker): def check_third_party_caveat(self, ctx, info): return checker(ctx, info) - def get_discharge(ctx, cav, payload): - return macaroonbakery.discharge( - ctx, cav.caveat_id, payload, + def get_discharge(cav, payload): + return bakery.discharge( + common.test_context, cav.caveat_id, payload, bakeries[cav.location].oven.key, ThirdPartyCaveatCheckerF(), locator) - ms = macaroonbakery.discharge_all(common.test_context, m0, - get_discharge) + ms = bakery.discharge_all(m0, get_discharge) self.assertEqual(len(ms), total_discharges_required + 1) @@ -138,33 +135,31 @@ class TestDischargeAll(unittest.TestCase): def test_discharge_all_local_discharge(self): oc = common.new_bakery('ts', None) - client_key = macaroonbakery.generate_key() - m = oc.oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, common.ages, + client_key = bakery.generate_key() + m = oc.oven.macaroon(bakery.LATEST_VERSION, common.ages, [ - macaroonbakery.local_third_party_caveat( + bakery.local_third_party_caveat( client_key.public_key, - macaroonbakery.LATEST_BAKERY_VERSION) - ], [macaroonbakery.LOGIN_OP]) - ms = macaroonbakery.discharge_all( - common.test_context, m, no_discharge(self), client_key) + bakery.LATEST_VERSION) + ], [bakery.LOGIN_OP]) + ms = bakery.discharge_all(m, no_discharge(self), client_key) oc.checker.auth([ms]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) def test_discharge_all_local_discharge_version1(self): oc = common.new_bakery('ts', None) - client_key = macaroonbakery.generate_key() - m = oc.oven.macaroon(macaroonbakery.BAKERY_V1, common.ages, [ - macaroonbakery.local_third_party_caveat( - client_key.public_key, macaroonbakery.BAKERY_V1) - ], [macaroonbakery.LOGIN_OP]) - ms = macaroonbakery.discharge_all( - common.test_context, m, no_discharge(self), client_key) + client_key = bakery.generate_key() + m = oc.oven.macaroon(bakery.VERSION_1, common.ages, [ + bakery.local_third_party_caveat( + client_key.public_key, bakery.VERSION_1) + ], [bakery.LOGIN_OP]) + ms = bakery.discharge_all(m, no_discharge(self), client_key) oc.checker.auth([ms]).allow(common.test_context, - [macaroonbakery.LOGIN_OP]) + [bakery.LOGIN_OP]) def no_discharge(test): - def get_discharge(ctx, cav, payload): + def get_discharge(cav, payload): test.fail("get_discharge called unexpectedly") return get_discharge diff --git a/macaroonbakery/tests/test_keyring.py b/macaroonbakery/tests/test_keyring.py index 351b144..438ab1b 100644 --- a/macaroonbakery/tests/test_keyring.py +++ b/macaroonbakery/tests/test_keyring.py @@ -4,28 +4,28 @@ import unittest from httmock import urlmatch, HTTMock -import macaroonbakery -from macaroonbakery import httpbakery +import macaroonbakery as bakery +import macaroonbakery.httpbakery as httpbakery class TestKeyRing(unittest.TestCase): def test_cache_fetch(self): - key = macaroonbakery.generate_key() + key = bakery.generate_key() @urlmatch(path='.*/discharge/info') def discharge_info(url, request): return { 'status_code': 200, 'content': { - 'Version': macaroonbakery.LATEST_BAKERY_VERSION, + 'Version': bakery.LATEST_VERSION, 'PublicKey': key.public_key.encode().decode('utf-8') } } - expectInfo = macaroonbakery.ThirdPartyInfo( + expectInfo = bakery.ThirdPartyInfo( public_key=key.public_key, - version=macaroonbakery.LATEST_BAKERY_VERSION + version=bakery.LATEST_VERSION ) kr = httpbakery.ThirdPartyLocator(allow_insecure=True) with HTTMock(discharge_info): @@ -33,21 +33,21 @@ class TestKeyRing(unittest.TestCase): self.assertEqual(info, expectInfo) def test_cache_norefetch(self): - key = macaroonbakery.generate_key() + key = bakery.generate_key() @urlmatch(path='.*/discharge/info') def discharge_info(url, request): return { 'status_code': 200, 'content': { - 'Version': macaroonbakery.LATEST_BAKERY_VERSION, + 'Version': bakery.LATEST_VERSION, 'PublicKey': key.public_key.encode().decode('utf-8') } } - expectInfo = macaroonbakery.ThirdPartyInfo( + expectInfo = bakery.ThirdPartyInfo( public_key=key.public_key, - version=macaroonbakery.LATEST_BAKERY_VERSION + version=bakery.LATEST_VERSION ) kr = httpbakery.ThirdPartyLocator(allow_insecure=True) with HTTMock(discharge_info): @@ -57,7 +57,7 @@ class TestKeyRing(unittest.TestCase): self.assertEqual(info, expectInfo) def test_cache_fetch_no_version(self): - key = macaroonbakery.generate_key() + key = bakery.generate_key() @urlmatch(path='.*/discharge/info') def discharge_info(url, request): @@ -68,9 +68,9 @@ class TestKeyRing(unittest.TestCase): } } - expectInfo = macaroonbakery.ThirdPartyInfo( + expectInfo = bakery.ThirdPartyInfo( public_key=key.public_key, - version=macaroonbakery.BAKERY_V1 + version=bakery.VERSION_1 ) kr = httpbakery.ThirdPartyLocator(allow_insecure=True) with HTTMock(discharge_info): @@ -79,11 +79,11 @@ class TestKeyRing(unittest.TestCase): def test_allow_insecure(self): kr = httpbakery.ThirdPartyLocator() - with self.assertRaises(macaroonbakery.error.ThirdPartyInfoNotFound): + with self.assertRaises(bakery.error.ThirdPartyInfoNotFound): kr.third_party_info('http://0.1.2.3/') def test_fallback(self): - key = macaroonbakery.generate_key() + key = bakery.generate_key() @urlmatch(path='.*/discharge/info') def discharge_info(url, request): @@ -100,9 +100,9 @@ class TestKeyRing(unittest.TestCase): } } - expectInfo = macaroonbakery.ThirdPartyInfo( + expectInfo = bakery.ThirdPartyInfo( public_key=key.public_key, - version=macaroonbakery.BAKERY_V1 + version=bakery.VERSION_1 ) kr = httpbakery.ThirdPartyLocator(allow_insecure=True) with HTTMock(discharge_info): diff --git a/macaroonbakery/tests/test_macaroon.py b/macaroonbakery/tests/test_macaroon.py index 7e77e2b..93bbbb8 100644 --- a/macaroonbakery/tests/test_macaroon.py +++ b/macaroonbakery/tests/test_macaroon.py @@ -7,36 +7,35 @@ import six import pymacaroons from pymacaroons import serializers -import macaroonbakery +import macaroonbakery as bakery import macaroonbakery.checkers as checkers from macaroonbakery.tests import common class TestMacaroon(TestCase): def test_new_macaroon(self): - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( b'rootkey', b'some id', 'here', - macaroonbakery.LATEST_BAKERY_VERSION) + bakery.LATEST_VERSION) self.assertIsNotNone(m) self.assertEquals(m._macaroon.identifier, b'some id') self.assertEquals(m._macaroon.location, 'here') - self.assertEquals(m.version, macaroonbakery.LATEST_BAKERY_VERSION) + self.assertEquals(m.version, bakery.LATEST_VERSION) def test_add_first_party_caveat(self): - m = macaroonbakery.Macaroon('rootkey', 'some id', 'here', - macaroonbakery.LATEST_BAKERY_VERSION) + m = bakery.Macaroon('rootkey', 'some id', 'here', bakery.LATEST_VERSION) m.add_caveat(checkers.Caveat('test_condition')) caveats = m.first_party_caveats() self.assertEquals(len(caveats), 1) self.assertEquals(caveats[0].caveat_id, b'test_condition') def test_add_third_party_caveat(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) - lbv = six.int2byte(macaroonbakery.LATEST_BAKERY_VERSION) + lbv = six.int2byte(bakery.LATEST_VERSION) tests = [ ('no existing id', b'', [], lbv + six.int2byte(0)), ('several existing ids', b'', [ @@ -53,10 +52,10 @@ class TestMacaroon(TestCase): for test in tests: print('test ', test[0]) - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key=b'root key', id=b'id', location='location', - version=macaroonbakery.LATEST_BAKERY_VERSION) + version=bakery.LATEST_VERSION) for id in test[2]: m.macaroon.add_third_party_caveat(key=None, key_id=id, location='') @@ -68,21 +67,21 @@ class TestMacaroon(TestCase): test[3]) def test_marshal_json_latest_version(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) ns = checkers.Namespace({ 'testns': 'x', 'otherns': 'y', }) - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key=b'root key', id=b'id', location='location', - version=macaroonbakery.LATEST_BAKERY_VERSION, + version=bakery.LATEST_VERSION, namespace=ns) m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'), bs.oven.key, locator) data = m.serialize_json() - m1 = macaroonbakery.Macaroon.deserialize_json(data) + m1 = bakery.Macaroon.deserialize_json(data) # Just check the signature and version - we're not interested in fully # checking the macaroon marshaling here. self.assertEqual(m1.macaroon.signature, m.macaroon.signature) @@ -92,8 +91,8 @@ class TestMacaroon(TestCase): self.assertEqual(m1._caveat_data, m._caveat_data) # test with the encoder, decoder - data = json.dumps(m, cls=macaroonbakery.MacaroonJSONEncoder) - m1 = json.loads(data, cls=macaroonbakery.MacaroonJSONDecoder) + data = json.dumps(m, cls=bakery.MacaroonJSONEncoder) + m1 = json.loads(data, cls=bakery.MacaroonJSONDecoder) self.assertEqual(m1.macaroon.signature, m.macaroon.signature) self.assertEqual(m1.macaroon.version, m.macaroon.version) self.assertEqual(len(m1.macaroon.caveats), 1) @@ -101,20 +100,20 @@ class TestMacaroon(TestCase): self.assertEqual(m1._caveat_data, m._caveat_data) def test_json_version1(self): - self._test_json_with_version(macaroonbakery.BAKERY_V1) + self._test_json_with_version(bakery.VERSION_1) def test_json_version2(self): - self._test_json_with_version(macaroonbakery.BAKERY_V2) + self._test_json_with_version(bakery.VERSION_2) def _test_json_with_version(self, version): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery('bs-loc', locator) ns = checkers.Namespace({ 'testns': 'x', }) - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key=b'root key', id=b'id', location='location', version=version, namespace=ns) @@ -124,18 +123,18 @@ class TestMacaroon(TestCase): # Sanity check that no external caveat data has been added. self.assertEqual(len(m._caveat_data), 0) - data = json.dumps(m, cls=macaroonbakery.MacaroonJSONEncoder) - m1 = json.loads(data, cls=macaroonbakery.MacaroonJSONDecoder) + data = json.dumps(m, cls=bakery.MacaroonJSONEncoder) + m1 = json.loads(data, cls=bakery.MacaroonJSONDecoder) # Just check the signature and version - we're not interested in fully # checking the macaroon marshaling here. self.assertEqual(m1.macaroon.signature, m.macaroon.signature) self.assertEqual(m1.macaroon.version, - macaroonbakery.macaroon_version(version)) + bakery.macaroon_version(version)) self.assertEqual(len(m1.macaroon.caveats), 1) # Namespace information has been thrown away. - self.assertEqual(m1.namespace, macaroonbakery.legacy_namespace()) + self.assertEqual(m1.namespace, bakery.legacy_namespace()) self.assertEqual(len(m1._caveat_data), 0) @@ -144,9 +143,9 @@ class TestMacaroon(TestCase): with self.assertRaises(ValueError) as exc: json.loads(json.dumps({ 'm': m.serialize(serializer=serializers.JsonSerializer()), - 'v': macaroonbakery.LATEST_BAKERY_VERSION + 1 - }), cls=macaroonbakery.MacaroonJSONDecoder) - self.assertEqual('unknow bakery version 4', exc.exception.args[0]) + 'v': bakery.LATEST_VERSION + 1 + }), cls=bakery.MacaroonJSONDecoder) + self.assertEqual('unknown bakery version 4', exc.exception.args[0]) def test_json_inconsistent_version(self): m = pymacaroons.Macaroon(version=pymacaroons.MACAROON_V1) @@ -154,21 +153,21 @@ class TestMacaroon(TestCase): json.loads(json.dumps({ 'm': json.loads(m.serialize( serializer=serializers.JsonSerializer())), - 'v': macaroonbakery.LATEST_BAKERY_VERSION - }), cls=macaroonbakery.MacaroonJSONDecoder) + 'v': bakery.LATEST_VERSION + }), cls=bakery.MacaroonJSONDecoder) self.assertEqual('underlying macaroon has inconsistent version; ' 'got 1 want 2', exc.exception.args[0]) def test_clone(self): - locator = macaroonbakery.ThirdPartyStore() + locator = bakery.ThirdPartyStore() bs = common.new_bakery("bs-loc", locator) ns = checkers.Namespace({ "testns": "x", }) - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key=b'root key', id=b'id', location='location', - version=macaroonbakery.LATEST_BAKERY_VERSION, + version=bakery.LATEST_VERSION, namespace=ns) m.add_caveat(checkers.Caveat(location='bs-loc', condition='something'), bs.oven.key, locator) @@ -185,15 +184,15 @@ class TestMacaroon(TestCase): def test_json_deserialize_from_go(self): ns = checkers.Namespace() ns.register("someuri", "x") - m = macaroonbakery.Macaroon( + m = bakery.Macaroon( root_key=b'rootkey', id=b'some id', location='here', - version=macaroonbakery.LATEST_BAKERY_VERSION, namespace=ns) + version=bakery.LATEST_VERSION, namespace=ns) m.add_caveat(checkers.Caveat(condition='something', namespace='someuri')) data = '{"m":{"c":[{"i":"x:something"}],"l":"here","i":"some id",' \ '"s64":"c8edRIupArSrY-WZfa62pgZFD8VjDgqho9U2PlADe-E"},"v":3,' \ '"ns":"someuri:x"}' - m_go = macaroonbakery.Macaroon.deserialize_json(data) + m_go = bakery.Macaroon.deserialize_json(data) self.assertEqual(m.macaroon.signature_bytes, m_go.macaroon.signature_bytes) diff --git a/macaroonbakery/tests/test_namespace.py b/macaroonbakery/tests/test_namespace.py index 2f04bb3..8a821e5 100644 --- a/macaroonbakery/tests/test_namespace.py +++ b/macaroonbakery/tests/test_namespace.py @@ -31,6 +31,8 @@ class TestNamespace(TestCase): ns1 = checkers.deserialize_namespace(data) self.assertEquals(ns1, ns) + # TODO(rogpeppe) add resolve tests + def test_register(self): ns = checkers.Namespace(None) ns.register('testns', 't') diff --git a/macaroonbakery/tests/test_oven.py b/macaroonbakery/tests/test_oven.py index 2976e94..ae235de 100644 --- a/macaroonbakery/tests/test_oven.py +++ b/macaroonbakery/tests/test_oven.py @@ -5,7 +5,7 @@ from unittest import TestCase import copy from datetime import datetime, timedelta -import macaroonbakery +import macaroonbakery as bakery EPOCH = datetime(1900, 11, 17, 19, 00, 13, 0, None) AGES = EPOCH + timedelta(days=10) @@ -15,111 +15,110 @@ class TestOven(TestCase): def test_canonical_ops(self): canonical_ops_tests = ( ('empty array', [], []), - ('one element', [macaroonbakery.Op('a', 'a')], - [macaroonbakery.Op('a', 'a')]), + ('one element', [bakery.Op('a', 'a')], + [bakery.Op('a', 'a')]), ('all in order', - [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'), - macaroonbakery.Op('c', 'c')], - [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'), - macaroonbakery.Op('c', 'c')]), + [bakery.Op('a', 'a'), bakery.Op('a', 'b'), + bakery.Op('c', 'c')], + [bakery.Op('a', 'a'), bakery.Op('a', 'b'), + bakery.Op('c', 'c')]), ('out of order', - [macaroonbakery.Op('c', 'c'), macaroonbakery.Op('a', 'b'), - macaroonbakery.Op('a', 'a')], - [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'), - macaroonbakery.Op('c', 'c')]), + [bakery.Op('c', 'c'), bakery.Op('a', 'b'), + bakery.Op('a', 'a')], + [bakery.Op('a', 'a'), bakery.Op('a', 'b'), + bakery.Op('c', 'c')]), ('with duplicates', - [macaroonbakery.Op('c', 'c'), macaroonbakery.Op('a', 'b'), - macaroonbakery.Op('a', 'a'), macaroonbakery.Op('c', 'a'), - macaroonbakery.Op('c', 'b'), macaroonbakery.Op('c', 'c'), - macaroonbakery.Op('a', 'a')], - [macaroonbakery.Op('a', 'a'), macaroonbakery.Op('a', 'b'), - macaroonbakery.Op('c', 'a'), macaroonbakery.Op('c', 'b'), - macaroonbakery.Op('c', 'c')]), + [bakery.Op('c', 'c'), bakery.Op('a', 'b'), + bakery.Op('a', 'a'), bakery.Op('c', 'a'), + bakery.Op('c', 'b'), bakery.Op('c', 'c'), + bakery.Op('a', 'a')], + [bakery.Op('a', 'a'), bakery.Op('a', 'b'), + bakery.Op('c', 'a'), bakery.Op('c', 'b'), + bakery.Op('c', 'c')]), ('make sure we\'ve got the fields right', - [macaroonbakery.Op(entity='read', action='two'), - macaroonbakery.Op(entity='read', action='one'), - macaroonbakery.Op(entity='write', action='one')], - [macaroonbakery.Op(entity='read', action='one'), - macaroonbakery.Op(entity='read', action='two'), - macaroonbakery.Op(entity='write', action='one')]) + [bakery.Op(entity='read', action='two'), + bakery.Op(entity='read', action='one'), + bakery.Op(entity='write', action='one')], + [bakery.Op(entity='read', action='one'), + bakery.Op(entity='read', action='two'), + bakery.Op(entity='write', action='one')]) ) for about, ops, expected in canonical_ops_tests: new_ops = copy.copy(ops) - canonical_ops = macaroonbakery.canonical_ops(new_ops) + canonical_ops = bakery.canonical_ops(new_ops) self.assertEquals(canonical_ops, expected) # Verify that the original array isn't changed. self.assertEquals(new_ops, ops) def test_multiple_ops(self): - test_oven = macaroonbakery.Oven( - ops_store=macaroonbakery.MemoryOpsStore()) - ops = [macaroonbakery.Op('one', 'read'), - macaroonbakery.Op('one', 'write'), - macaroonbakery.Op('two', 'read')] - m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, + test_oven = bakery.Oven( + ops_store=bakery.MemoryOpsStore()) + ops = [bakery.Op('one', 'read'), + bakery.Op('one', 'write'), + bakery.Op('two', 'read')] + m = test_oven.macaroon(bakery.LATEST_VERSION, AGES, None, ops) got_ops, conds = test_oven.macaroon_ops([m.macaroon]) self.assertEquals(len(conds), 1) # time-before caveat. - self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops) + self.assertEquals(bakery.canonical_ops(got_ops), ops) def test_multiple_ops_in_id(self): - test_oven = macaroonbakery.Oven() - ops = [macaroonbakery.Op('one', 'read'), - macaroonbakery.Op('one', 'write'), - macaroonbakery.Op('two', 'read')] - m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, + test_oven = bakery.Oven() + ops = [bakery.Op('one', 'read'), + bakery.Op('one', 'write'), + bakery.Op('two', 'read')] + m = test_oven.macaroon(bakery.LATEST_VERSION, AGES, None, ops) got_ops, conds = test_oven.macaroon_ops([m.macaroon]) self.assertEquals(len(conds), 1) # time-before caveat. - self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops) + self.assertEquals(bakery.canonical_ops(got_ops), ops) def test_multiple_ops_in_id_with_version1(self): - test_oven = macaroonbakery.Oven() - ops = [macaroonbakery.Op('one', 'read'), - macaroonbakery.Op('one', 'write'), - macaroonbakery.Op('two', 'read')] - m = test_oven.macaroon(macaroonbakery.BAKERY_V1, AGES, None, ops) + test_oven = bakery.Oven() + ops = [bakery.Op('one', 'read'), + bakery.Op('one', 'write'), + bakery.Op('two', 'read')] + m = test_oven.macaroon(bakery.VERSION_1, AGES, None, ops) got_ops, conds = test_oven.macaroon_ops([m.macaroon]) self.assertEquals(len(conds), 1) # time-before caveat. - self.assertEquals(macaroonbakery.canonical_ops(got_ops), ops) + self.assertEquals(bakery.canonical_ops(got_ops), ops) def test_huge_number_of_ops_gives_small_macaroon(self): - test_oven = macaroonbakery.Oven( - ops_store=macaroonbakery.MemoryOpsStore()) + test_oven = bakery.Oven( + ops_store=bakery.MemoryOpsStore()) ops = [] for i in range(30000): - ops.append(macaroonbakery.Op(entity='entity{}'.format(i), - action='action{}'.format(i))) + ops.append(bakery.Op(entity='entity' + str(i), action='action' + str(i))) - m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, + m = test_oven.macaroon(bakery.LATEST_VERSION, AGES, None, ops) got_ops, conds = test_oven.macaroon_ops([m.macaroon]) self.assertEquals(len(conds), 1) # time-before caveat. - self.assertEquals(macaroonbakery.canonical_ops(got_ops), - macaroonbakery.canonical_ops(ops)) + self.assertEquals(bakery.canonical_ops(got_ops), + bakery.canonical_ops(ops)) data = m.serialize_json() self.assertLess(len(data), 300) def test_ops_stored_only_once(self): - st = macaroonbakery.MemoryOpsStore() - test_oven = macaroonbakery.Oven(ops_store=st) + st = bakery.MemoryOpsStore() + test_oven = bakery.Oven(ops_store=st) - ops = [macaroonbakery.Op('one', 'read'), - macaroonbakery.Op('one', 'write'), - macaroonbakery.Op('two', 'read')] + ops = [bakery.Op('one', 'read'), + bakery.Op('one', 'write'), + bakery.Op('two', 'read')] - m = test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, + m = test_oven.macaroon(bakery.LATEST_VERSION, AGES, None, ops) got_ops, conds = test_oven.macaroon_ops([m.macaroon]) - self.assertEquals(macaroonbakery.canonical_ops(got_ops), - macaroonbakery.canonical_ops(ops)) + self.assertEquals(bakery.canonical_ops(got_ops), + bakery.canonical_ops(ops)) # Make another macaroon containing the same ops in a different order. - ops = [macaroonbakery.Op('one', 'write'), - macaroonbakery.Op('one', 'read'), - macaroonbakery.Op('one', 'read'), - macaroonbakery.Op('two', 'read')] - test_oven.macaroon(macaroonbakery.LATEST_BAKERY_VERSION, AGES, None, + ops = [bakery.Op('one', 'write'), + bakery.Op('one', 'read'), + bakery.Op('one', 'read'), + bakery.Op('two', 'read')] + test_oven.macaroon(bakery.LATEST_VERSION, AGES, None, ops) self.assertEquals(len(st._store), 1) diff --git a/macaroonbakery/tests/test_store.py b/macaroonbakery/tests/test_store.py index 7bcc4c2..5afa7be 100644 --- a/macaroonbakery/tests/test_store.py +++ b/macaroonbakery/tests/test_store.py @@ -2,12 +2,12 @@ # Licensed under the LGPLv3, see LICENCE file for details. from unittest import TestCase -import macaroonbakery +import macaroonbakery as bakery class TestOven(TestCase): def test_mem_store(self): - st = macaroonbakery.MemoryKeyStore() + st = bakery.MemoryKeyStore() key, id = st.root_key() self.assertEqual(len(key), 24) diff --git a/macaroonbakery/tests/test_time.py b/macaroonbakery/tests/test_time.py new file mode 100644 index 0000000..38826e1 --- /dev/null +++ b/macaroonbakery/tests/test_time.py @@ -0,0 +1,129 @@ +# Copyright 2017 Canonical Ltd. +# Licensed under the LGPLv3, see LICENCE file for details. +from datetime import timedelta +from unittest import TestCase +from collections import namedtuple + +import pyrfc3339 +import pymacaroons +from pymacaroons import Macaroon + +import macaroonbakery.checkers as checkers + +t1 = pyrfc3339.parse('2017-10-26T16:19:47.441402074Z') +t2 = t1 + timedelta(hours=1) +t3 = t2 + timedelta(hours=1) + + +def fpcaveat(s): + return pymacaroons.Caveat(caveat_id=s.encode('utf-8')) + + +class TestExpireTime(TestCase): + def test_expire_time(self): + ExpireTest = namedtuple('ExpireTest', 'about caveats expectTime') + tests = [ + ExpireTest( + about='no caveats', + caveats=[], + expectTime=None, + ), + ExpireTest( + about='single time-before caveat', + caveats=[ + fpcaveat(checkers.time_before_caveat(t1).condition), + ], + expectTime=t1, + ), + ExpireTest( + about='multiple time-before caveat', + caveats=[ + fpcaveat(checkers.time_before_caveat(t2).condition), + fpcaveat(checkers.time_before_caveat(t1).condition), + ], + expectTime=t1, + ), + ExpireTest( + about='mixed caveats', + caveats=[ + fpcaveat(checkers.time_before_caveat(t1).condition), + fpcaveat('allow bar'), + fpcaveat(checkers.time_before_caveat(t2).condition), + fpcaveat('deny foo'), + ], + expectTime=t1, + ), + ExpireTest( + about='mixed caveats', + caveats=[ + fpcaveat(checkers.COND_TIME_BEFORE + ' tomorrow'), + ], + expectTime=None, + ), + ] + for test in tests: + print('test ', test.about) + t = checkers.expiry_time(checkers.Namespace(), test.caveats) + self.assertEqual(t, test.expectTime) + + def test_macaroons_expire_time(self): + ExpireTest = namedtuple('ExpireTest', 'about macaroons expectTime') + tests = [ + ExpireTest( + about='no macaroons', + macaroons=[newMacaroon()], + expectTime=None, + ), + ExpireTest( + about='single macaroon without caveats', + macaroons=[newMacaroon()], + expectTime=None, + ), + ExpireTest( + about='multiple macaroon without caveats', + macaroons=[newMacaroon()], + expectTime=None, + ), + ExpireTest( + about='single macaroon with time-before caveat', + macaroons=[ + newMacaroon([checkers.time_before_caveat(t1).condition]), + ], + expectTime=t1, + ), + ExpireTest( + about='single macaroon with multiple time-before caveats', + macaroons=[ + newMacaroon([ + checkers.time_before_caveat(t2).condition, + checkers.time_before_caveat(t1).condition, + ]), + ], + expectTime=t1, + ), + ExpireTest( + about='multiple macaroons with multiple time-before caveats', + macaroons=[ + newMacaroon([ + checkers.time_before_caveat(t3).condition, + checkers.time_before_caveat(t1).condition, + ]), + newMacaroon([ + checkers.time_before_caveat(t3).condition, + checkers.time_before_caveat(t1).condition, + ]), + ], + expectTime=t1, + ), + ] + for test in tests: + print('test ', test.about) + t = checkers.macaroons_expiry_time(checkers.Namespace(), test.macaroons) + self.assertEqual(t, test.expectTime) + + +def newMacaroon(conds=[]): + m = Macaroon(key='key', version=2) + for cond in conds: + m.add_first_party_caveat(cond) + return m |