summaryrefslogtreecommitdiff
path: root/macaroonbakery/tests/test_agent.py
diff options
context:
space:
mode:
authorColin Watson <cjwatson@debian.org>2017-11-06 10:04:48 +0000
committerColin Watson <cjwatson@debian.org>2017-11-06 10:04:48 +0000
commit37d61d0415f6cc96a7a9abe057e1ae0f89fd977e (patch)
tree4ca3c2560d2ba062adb7de86d047d67db8984940 /macaroonbakery/tests/test_agent.py
parent3d9eaeb5dacee168a93da090e2c0d46eedbe51a2 (diff)
Import py-macaroon-bakery_0.0.5.orig.tar.gz
Diffstat (limited to 'macaroonbakery/tests/test_agent.py')
-rw-r--r--macaroonbakery/tests/test_agent.py331
1 files changed, 323 insertions, 8 deletions
diff --git a/macaroonbakery/tests/test_agent.py b/macaroonbakery/tests/test_agent.py
index 49134f5..67f5b84 100644
--- a/macaroonbakery/tests/test_agent.py
+++ b/macaroonbakery/tests/test_agent.py
@@ -1,6 +1,7 @@
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import base64
+from datetime import datetime, timedelta
import json
import os
import tempfile
@@ -9,7 +10,17 @@ from unittest import TestCase
import nacl.encoding
import requests.cookies
import six
+from six.moves.urllib.parse import parse_qs
+from six.moves.http_cookies import SimpleCookie
+from httmock import (
+ HTTMock,
+ urlmatch,
+ response
+)
+import macaroonbakery as bakery
+import macaroonbakery.httpbakery as httpbakery
+import macaroonbakery.checkers as checkers
import macaroonbakery.httpbakery.agent as agent
@@ -62,13 +73,19 @@ class TestAgents(TestCase):
def test_load_agents_into_cookies(self):
cookies = requests.cookies.RequestsCookieJar()
- c1, key = agent.load_agent_file(self.agent_filename, cookies=cookies)
+ c1, key = agent.load_agent_file(
+ self.agent_filename,
+ cookies=cookies,
+ )
self.assertEqual(c1, cookies)
- self.assertEqual(key.encode(nacl.encoding.Base64Encoder),
- b'CqoSgj06Zcgb4/S6RT4DpTjLAfKoznEY3JsShSjKJEU=')
+ self.assertEqual(
+ key.encode(nacl.encoding.Base64Encoder),
+ b'CqoSgj06Zcgb4/S6RT4DpTjLAfKoznEY3JsShSjKJEU=',
+ )
self.assertEqual(
key.public_key.encode(nacl.encoding.Base64Encoder),
- b'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
+ b'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=',
+ )
value = cookies.get('agent-login', domain='1.example.com')
jv = base64.b64decode(value)
@@ -76,8 +93,7 @@ class TestAgents(TestCase):
jv = jv.decode('utf-8')
data = json.loads(jv)
self.assertEqual(data['username'], 'user-1')
- self.assertEqual(data['public_key'],
- 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
+ self.assertEqual(data['public_key'], 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
value = cookies.get('agent-login', domain='2.example.com',
path='/discharger')
@@ -86,8 +102,7 @@ class TestAgents(TestCase):
jv = jv.decode('utf-8')
data = json.loads(jv)
self.assertEqual(data['username'], 'user-2')
- self.assertEqual(data['public_key'],
- 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
+ self.assertEqual(data['public_key'], 'YAhRSsth3a36mRYqQGQaLiS4QJax0p356nd+B8x7UQE=')
def test_load_agents_with_bad_key(self):
with self.assertRaises(agent.AgentFileFormatError):
@@ -97,6 +112,295 @@ class TestAgents(TestCase):
with self.assertRaises(agent.AgentFileFormatError):
agent.load_agent_file(self.no_username_agent_filename)
+ def test_agent_login(self):
+ discharge_key = bakery.generate_key()
+
+ class _DischargerLocator(bakery.ThirdPartyLocator):
+ def third_party_info(self, loc):
+ if loc == 'http://0.3.2.1':
+ return bakery.ThirdPartyInfo(
+ public_key=discharge_key.public_key,
+ version=bakery.LATEST_VERSION,
+ )
+ d = _DischargerLocator()
+ server_key = bakery.generate_key()
+ server_bakery = bakery.Bakery(key=server_key, locator=d)
+
+ @urlmatch(path='.*/here')
+ def server_get(url, request):
+ ctx = checkers.AuthContext()
+ test_ops = [bakery.Op(entity='test-op', action='read')]
+ auth_checker = server_bakery.checker.auth(
+ httpbakery.extract_macaroons(request.headers))
+ try:
+ auth_checker.allow(ctx, test_ops)
+ resp = response(status_code=200,
+ content='done')
+ except bakery.PermissionDenied:
+ caveats = [
+ checkers.Caveat(location='http://0.3.2.1', condition='is-ok')
+ ]
+ m = server_bakery.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=datetime.utcnow() + timedelta(days=1),
+ caveats=caveats, ops=test_ops)
+ content, headers = httpbakery.discharge_required_response(
+ m, '/',
+ 'test',
+ 'message')
+ resp = response(status_code=401,
+ content=content,
+ headers=headers)
+ return request.hooks['response'][0](resp)
+
+ @urlmatch(path='.*/discharge')
+ def discharge(url, request):
+ qs = parse_qs(request.body)
+ if qs.get('token64') is None:
+ return response(
+ status_code=401,
+ content={
+ 'Code': httpbakery.ERR_INTERACTION_REQUIRED,
+ 'Message': 'interaction required',
+ 'Info': {
+ 'InteractionMethods': {
+ 'agent': {'login-url': '/login'},
+ },
+ },
+ },
+ headers={'Content-Type': 'application/json'})
+ else:
+ qs = parse_qs(request.body)
+ content = {q: qs[q][0] for q in qs}
+ m = httpbakery.discharge(checkers.AuthContext(), content,
+ discharge_key, None, alwaysOK3rd)
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Macaroon': m.serialize_json()
+ }
+ }
+
+ key = bakery.generate_key()
+
+ @urlmatch(path='.*/login')
+ def login(url, request):
+ b = bakery.Bakery(key=discharge_key)
+ m = b.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=datetime.utcnow() + timedelta(days=1),
+ caveats=[bakery.local_third_party_caveat(
+ key.public_key,
+ version=httpbakery.request_version(request.headers))],
+ ops=[bakery.Op(entity='agent', action='login')])
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'macaroon': m.to_dict()
+ }
+ }
+
+ with HTTMock(server_get), \
+ HTTMock(discharge), \
+ HTTMock(login):
+ client = httpbakery.Client(interaction_methods=[
+ agent.AgentInteractor(
+ agent.AuthInfo(
+ key=key,
+ agents=[
+ agent.Agent(
+ username='test-user',
+ url=u'http://0.3.2.1'
+ )
+ ],
+ ),
+ ),
+ ])
+ resp = requests.get(
+ 'http://0.1.2.3/here',
+ cookies=client.cookies,
+ auth=client.auth())
+ self.assertEquals(resp.content, b'done')
+
+ def test_agent_legacy(self):
+ discharge_key = bakery.generate_key()
+
+ class _DischargerLocator(bakery.ThirdPartyLocator):
+ def third_party_info(self, loc):
+ if loc == 'http://0.3.2.1':
+ return bakery.ThirdPartyInfo(
+ public_key=discharge_key.public_key,
+ version=bakery.LATEST_VERSION,
+ )
+ d = _DischargerLocator()
+ server_key = bakery.generate_key()
+ server_bakery = bakery.Bakery(key=server_key, locator=d)
+
+ @urlmatch(path='.*/here')
+ def server_get(url, request):
+ ctx = checkers.AuthContext()
+ test_ops = [bakery.Op(entity='test-op', action='read')]
+ auth_checker = server_bakery.checker.auth(
+ httpbakery.extract_macaroons(request.headers))
+ try:
+ auth_checker.allow(ctx, test_ops)
+ resp = response(status_code=200,
+ content='done')
+ except bakery.PermissionDenied:
+ caveats = [
+ checkers.Caveat(location='http://0.3.2.1',
+ condition='is-ok')
+ ]
+ m = server_bakery.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=datetime.utcnow() + timedelta(days=1),
+ caveats=caveats, ops=test_ops)
+ content, headers = httpbakery.discharge_required_response(
+ m, '/',
+ 'test',
+ 'message')
+ resp = response(
+ status_code=401,
+ content=content,
+ headers=headers,
+ )
+ return request.hooks['response'][0](resp)
+
+ class InfoStorage:
+ info = None
+
+ @urlmatch(path='.*/discharge')
+ def discharge(url, request):
+ qs = parse_qs(request.body)
+ if qs.get('caveat64') is not None:
+ content = {q: qs[q][0] for q in qs}
+
+ class InteractionRequiredError(Exception):
+ def __init__(self, error):
+ self.error = error
+
+ class CheckerInError(bakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(self, ctx, info):
+ InfoStorage.info = info
+ raise InteractionRequiredError(
+ httpbakery.Error(
+ code=httpbakery.ERR_INTERACTION_REQUIRED,
+ version=httpbakery.request_version(
+ request.headers),
+ message='interaction required',
+ info=httpbakery.ErrorInfo(
+ wait_url='http://0.3.2.1/wait?'
+ 'dischargeid=1',
+ visit_url='http://0.3.2.1/visit?'
+ 'dischargeid=1'
+ ),
+ ),
+ )
+ try:
+ httpbakery.discharge(
+ checkers.AuthContext(), content,
+ discharge_key, None, CheckerInError())
+ except InteractionRequiredError as exc:
+ return response(
+ status_code=401,
+ content={
+ 'Code': exc.error.code,
+ 'Message': exc.error.message,
+ 'Info': {
+ 'WaitURL': exc.error.info.wait_url,
+ 'VisitURL': exc.error.info.visit_url,
+ },
+ },
+ headers={'Content-Type': 'application/json'})
+
+ key = bakery.generate_key()
+
+ @urlmatch(path='.*/visit?$')
+ def visit(url, request):
+ if request.headers.get('Accept') == 'application/json':
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'agent': request.url
+ }
+ }
+ cs = SimpleCookie()
+ cookies = request.headers.get('Cookie')
+ if cookies is not None:
+ cs.load(str(cookies))
+ public_key = None
+ for c in cs:
+ if c == 'agent-login':
+ json_cookie = json.loads(
+ base64.b64decode(cs[c].value).decode('utf-8'))
+ public_key = bakery.PublicKey.deserialize(json_cookie.get('public_key'))
+ ms = httpbakery.extract_macaroons(request.headers)
+ if len(ms) == 0:
+ b = bakery.Bakery(key=discharge_key)
+ m = b.oven.macaroon(
+ version=bakery.LATEST_VERSION,
+ expiry=datetime.utcnow() + timedelta(days=1),
+ caveats=[bakery.local_third_party_caveat(
+ public_key,
+ version=httpbakery.request_version(request.headers))],
+ ops=[bakery.Op(entity='agent', action='login')])
+ content, headers = httpbakery.discharge_required_response(
+ m, '/',
+ 'test',
+ 'message')
+ resp = response(status_code=401,
+ content=content,
+ headers=headers)
+ return request.hooks['response'][0](resp)
+
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'agent-login': True
+ }
+ }
+
+ @urlmatch(path='.*/wait?$')
+ def wait(url, request):
+ class EmptyChecker(bakery.ThirdPartyCaveatChecker):
+ def check_third_party_caveat(self, ctx, info):
+ return []
+ if InfoStorage.info is None:
+ self.fail('visit url has not been visited')
+ m = bakery.discharge(
+ checkers.AuthContext(),
+ InfoStorage.info.id,
+ InfoStorage.info.caveat,
+ discharge_key,
+ EmptyChecker(),
+ _DischargerLocator(),
+ )
+ return {
+ 'status_code': 200,
+ 'content': {
+ 'Macaroon': m.to_dict()
+ }
+ }
+
+ with HTTMock(server_get), \
+ HTTMock(discharge), \
+ HTTMock(visit), \
+ HTTMock(wait):
+ client = httpbakery.Client(interaction_methods=[
+ agent.AgentInteractor(
+ agent.AuthInfo(
+ key=key,
+ agents=[agent.Agent(username='test-user', url=u'http://0.3.2.1')],
+ ),
+ ),
+ ])
+ resp = requests.get(
+ 'http://0.1.2.3/here',
+ cookies=client.cookies,
+ auth=client.auth(),
+ )
+ self.assertEquals(resp.content, b'done')
+
agent_file = '''
{
@@ -146,3 +450,14 @@ no_username_agent_file = '''
}]
}
'''
+
+
+class ThirdPartyCaveatCheckerF(bakery.ThirdPartyCaveatChecker):
+ def __init__(self, check):
+ self._check = check
+
+ def check_third_party_caveat(self, ctx, info):
+ cond, arg = checkers.parse_caveat(info.condition)
+ return self._check(cond, arg)
+
+alwaysOK3rd = ThirdPartyCaveatCheckerF(lambda cond, arg: [])