diff options
author | Colin Watson <cjwatson@debian.org> | 2017-11-06 10:04:48 +0000 |
---|---|---|
committer | Colin Watson <cjwatson@debian.org> | 2017-11-06 10:04:48 +0000 |
commit | 37d61d0415f6cc96a7a9abe057e1ae0f89fd977e (patch) | |
tree | 4ca3c2560d2ba062adb7de86d047d67db8984940 /macaroonbakery/tests/test_agent.py | |
parent | 3d9eaeb5dacee168a93da090e2c0d46eedbe51a2 (diff) |
Import py-macaroon-bakery_0.0.5.orig.tar.gz
Diffstat (limited to 'macaroonbakery/tests/test_agent.py')
-rw-r--r-- | macaroonbakery/tests/test_agent.py | 331 |
1 files changed, 323 insertions, 8 deletions
diff --git a/macaroonbakery/tests/test_agent.py b/macaroonbakery/tests/test_agent.py index 49134f5..67f5b84 100644 --- a/macaroonbakery/tests/test_agent.py +++ b/macaroonbakery/tests/test_agent.py @@ -1,6 +1,7 @@ # Copyright 2017 Canonical Ltd. # Licensed under the LGPLv3, see LICENCE file for details. import base64 +from datetime import datetime, timedelta import json import os import tempfile @@ -9,7 +10,17 @@ from unittest import TestCase import nacl.encoding import requests.cookies import six +from six.moves.urllib.parse import parse_qs +from six.moves.http_cookies import SimpleCookie +from httmock import ( + HTTMock, + urlmatch, + response +) +import macaroonbakery as bakery +import macaroonbakery.httpbakery as httpbakery +import macaroonbakery.checkers as checkers import macaroonbakery.httpbakery.agent as agent @@ -62,13 +73,19 @@ class TestAgents(TestCase): def test_load_agents_into_cookies(self): cookies = requests.cookies.RequestsCookieJar() - c1, key = agent.load_agent_file(self.agent_filename, cookies=cookies) + c1, key = agent.load_agent_file( + self.agent_filename, + cookies=cookies, + ) self.assertEqual(c1, cookies) - self.assertEqual(key.encode(nacl.encoding.Base64Encoder), - b'CqoSgj06Zcgb4/S6RT4DpTjLAfKoznEY3JsShSjKJEU=') + self.assertEqual( + key.encode(nacl.encoding.Base64Encoder), + b'CqoSgj06Zcgb4/S6RT4DpTjLAfKoznEY3JsShSjKJEU=', + ) self.assertEqual( key.public_key.encode(nacl.encoding.Base64Encoder), - b'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') + b'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=', + ) value = cookies.get('agent-login', domain='1.example.com') jv = base64.b64decode(value) @@ -76,8 +93,7 @@ class TestAgents(TestCase): jv = jv.decode('utf-8') data = json.loads(jv) self.assertEqual(data['username'], 'user-1') - self.assertEqual(data['public_key'], - 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') + self.assertEqual(data['public_key'], 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') value = cookies.get('agent-login', domain='2.example.com', path='/discharger') @@ -86,8 +102,7 @@ class TestAgents(TestCase): jv = jv.decode('utf-8') data = json.loads(jv) self.assertEqual(data['username'], 'user-2') - self.assertEqual(data['public_key'], - 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') + self.assertEqual(data['public_key'], 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=') def test_load_agents_with_bad_key(self): with self.assertRaises(agent.AgentFileFormatError): @@ -97,6 +112,295 @@ class TestAgents(TestCase): with self.assertRaises(agent.AgentFileFormatError): agent.load_agent_file(self.no_username_agent_filename) + def test_agent_login(self): + discharge_key = bakery.generate_key() + + class _DischargerLocator(bakery.ThirdPartyLocator): + def third_party_info(self, loc): + if loc == 'http://0.3.2.1': + return bakery.ThirdPartyInfo( + public_key=discharge_key.public_key, + version=bakery.LATEST_VERSION, + ) + d = _DischargerLocator() + server_key = bakery.generate_key() + server_bakery = bakery.Bakery(key=server_key, locator=d) + + @urlmatch(path='.*/here') + def server_get(url, request): + ctx = checkers.AuthContext() + test_ops = [bakery.Op(entity='test-op', action='read')] + auth_checker = server_bakery.checker.auth( + httpbakery.extract_macaroons(request.headers)) + try: + auth_checker.allow(ctx, test_ops) + resp = response(status_code=200, + content='done') + except bakery.PermissionDenied: + caveats = [ + checkers.Caveat(location='http://0.3.2.1', condition='is-ok') + ] + m = server_bakery.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=datetime.utcnow() + timedelta(days=1), + caveats=caveats, ops=test_ops) + content, headers = httpbakery.discharge_required_response( + m, '/', + 'test', + 'message') + resp = response(status_code=401, + content=content, + headers=headers) + return request.hooks['response'][0](resp) + + @urlmatch(path='.*/discharge') + def discharge(url, request): + qs = parse_qs(request.body) + if qs.get('token64') is None: + return response( + status_code=401, + content={ + 'Code': httpbakery.ERR_INTERACTION_REQUIRED, + 'Message': 'interaction required', + 'Info': { + 'InteractionMethods': { + 'agent': {'login-url': '/login'}, + }, + }, + }, + headers={'Content-Type': 'application/json'}) + else: + qs = parse_qs(request.body) + content = {q: qs[q][0] for q in qs} + m = httpbakery.discharge(checkers.AuthContext(), content, + discharge_key, None, alwaysOK3rd) + return { + 'status_code': 200, + 'content': { + 'Macaroon': m.serialize_json() + } + } + + key = bakery.generate_key() + + @urlmatch(path='.*/login') + def login(url, request): + b = bakery.Bakery(key=discharge_key) + m = b.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=datetime.utcnow() + timedelta(days=1), + caveats=[bakery.local_third_party_caveat( + key.public_key, + version=httpbakery.request_version(request.headers))], + ops=[bakery.Op(entity='agent', action='login')]) + return { + 'status_code': 200, + 'content': { + 'macaroon': m.to_dict() + } + } + + with HTTMock(server_get), \ + HTTMock(discharge), \ + HTTMock(login): + client = httpbakery.Client(interaction_methods=[ + agent.AgentInteractor( + agent.AuthInfo( + key=key, + agents=[ + agent.Agent( + username='test-user', + url=u'http://0.3.2.1' + ) + ], + ), + ), + ]) + resp = requests.get( + 'http://0.1.2.3/here', + cookies=client.cookies, + auth=client.auth()) + self.assertEquals(resp.content, b'done') + + def test_agent_legacy(self): + discharge_key = bakery.generate_key() + + class _DischargerLocator(bakery.ThirdPartyLocator): + def third_party_info(self, loc): + if loc == 'http://0.3.2.1': + return bakery.ThirdPartyInfo( + public_key=discharge_key.public_key, + version=bakery.LATEST_VERSION, + ) + d = _DischargerLocator() + server_key = bakery.generate_key() + server_bakery = bakery.Bakery(key=server_key, locator=d) + + @urlmatch(path='.*/here') + def server_get(url, request): + ctx = checkers.AuthContext() + test_ops = [bakery.Op(entity='test-op', action='read')] + auth_checker = server_bakery.checker.auth( + httpbakery.extract_macaroons(request.headers)) + try: + auth_checker.allow(ctx, test_ops) + resp = response(status_code=200, + content='done') + except bakery.PermissionDenied: + caveats = [ + checkers.Caveat(location='http://0.3.2.1', + condition='is-ok') + ] + m = server_bakery.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=datetime.utcnow() + timedelta(days=1), + caveats=caveats, ops=test_ops) + content, headers = httpbakery.discharge_required_response( + m, '/', + 'test', + 'message') + resp = response( + status_code=401, + content=content, + headers=headers, + ) + return request.hooks['response'][0](resp) + + class InfoStorage: + info = None + + @urlmatch(path='.*/discharge') + def discharge(url, request): + qs = parse_qs(request.body) + if qs.get('caveat64') is not None: + content = {q: qs[q][0] for q in qs} + + class InteractionRequiredError(Exception): + def __init__(self, error): + self.error = error + + class CheckerInError(bakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(self, ctx, info): + InfoStorage.info = info + raise InteractionRequiredError( + httpbakery.Error( + code=httpbakery.ERR_INTERACTION_REQUIRED, + version=httpbakery.request_version( + request.headers), + message='interaction required', + info=httpbakery.ErrorInfo( + wait_url='http://0.3.2.1/wait?' + 'dischargeid=1', + visit_url='http://0.3.2.1/visit?' + 'dischargeid=1' + ), + ), + ) + try: + httpbakery.discharge( + checkers.AuthContext(), content, + discharge_key, None, CheckerInError()) + except InteractionRequiredError as exc: + return response( + status_code=401, + content={ + 'Code': exc.error.code, + 'Message': exc.error.message, + 'Info': { + 'WaitURL': exc.error.info.wait_url, + 'VisitURL': exc.error.info.visit_url, + }, + }, + headers={'Content-Type': 'application/json'}) + + key = bakery.generate_key() + + @urlmatch(path='.*/visit?$') + def visit(url, request): + if request.headers.get('Accept') == 'application/json': + return { + 'status_code': 200, + 'content': { + 'agent': request.url + } + } + cs = SimpleCookie() + cookies = request.headers.get('Cookie') + if cookies is not None: + cs.load(str(cookies)) + public_key = None + for c in cs: + if c == 'agent-login': + json_cookie = json.loads( + base64.b64decode(cs[c].value).decode('utf-8')) + public_key = bakery.PublicKey.deserialize(json_cookie.get('public_key')) + ms = httpbakery.extract_macaroons(request.headers) + if len(ms) == 0: + b = bakery.Bakery(key=discharge_key) + m = b.oven.macaroon( + version=bakery.LATEST_VERSION, + expiry=datetime.utcnow() + timedelta(days=1), + caveats=[bakery.local_third_party_caveat( + public_key, + version=httpbakery.request_version(request.headers))], + ops=[bakery.Op(entity='agent', action='login')]) + content, headers = httpbakery.discharge_required_response( + m, '/', + 'test', + 'message') + resp = response(status_code=401, + content=content, + headers=headers) + return request.hooks['response'][0](resp) + + return { + 'status_code': 200, + 'content': { + 'agent-login': True + } + } + + @urlmatch(path='.*/wait?$') + def wait(url, request): + class EmptyChecker(bakery.ThirdPartyCaveatChecker): + def check_third_party_caveat(self, ctx, info): + return [] + if InfoStorage.info is None: + self.fail('visit url has not been visited') + m = bakery.discharge( + checkers.AuthContext(), + InfoStorage.info.id, + InfoStorage.info.caveat, + discharge_key, + EmptyChecker(), + _DischargerLocator(), + ) + return { + 'status_code': 200, + 'content': { + 'Macaroon': m.to_dict() + } + } + + with HTTMock(server_get), \ + HTTMock(discharge), \ + HTTMock(visit), \ + HTTMock(wait): + client = httpbakery.Client(interaction_methods=[ + agent.AgentInteractor( + agent.AuthInfo( + key=key, + agents=[agent.Agent(username='test-user', url=u'http://0.3.2.1')], + ), + ), + ]) + resp = requests.get( + 'http://0.1.2.3/here', + cookies=client.cookies, + auth=client.auth(), + ) + self.assertEquals(resp.content, b'done') + agent_file = ''' { @@ -146,3 +450,14 @@ no_username_agent_file = ''' }] } ''' + + +class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker): + def __init__(self, check): + self._check = check + + def check_third_party_caveat(self, ctx, info): + cond, arg = checkers.parse_caveat(info.condition) + return self._check(cond, arg) + +alwaysOK3rd = ThirdPartyCaveatCheckerF(lambda cond, arg: []) |