# Copyright 2017 Canonical Ltd. # Licensed under the LGPLv3, see LICENCE file for details. from unittest import TestCase import macaroonbakery.httpbakery as httpbakery import requests from mock import patch from httmock import HTTMock, response, urlmatch ID_PATH = 'http://example.com/someprotecteurl' json_macaroon = { u'identifier': u'macaroon-identifier', u'caveats': [ { u'cl': u'http://example.com/identity/v1/discharger', u'vid': u'zgtQa88oS9UF45DlJniRaAUT4qqHhLxQzCeUU9N2O1Uu-' u'yhFulgGbSA0zDGdkrq8YNQAxGiARA_-AGxyoh25kiTycb8u47pD', u'cid': u'eyJUaGlyZFBhcnR5UHV' }, { u'cid': u'allow read-no-terms write' }, { u'cid': u'time-before 2158-07-19T14:29:14.312669464Z' }], u'location': u'charmstore', u'signature': u'52d17cb11f5c84d58441bc0ffd7cc396' u'5115374ce2fa473ecf06265b5d4d9e81' } discharge_token = [{ u'identifier': u'token-identifier===', u'caveats': [{ u'cid': u'declared username someone' }, { u'cid': u'time-before 2158-08-15T15:55:52.428319076Z' }, { u'cid': u'origin ' }], u'location': u'https://example.com/identity', u'signature': u'5ae0e7a2abf806bdd92f510fcd3' u'198f520691259abe76ffae5623dae048769ef' }] discharged_macaroon = { u'identifier': u'discharged-identifier=', u'caveats': [{ u'cid': u'declared uuid a1130b10-3deb-59b7-baf0-c2a3f83e7382' }, { u'cid': u'declared username someone' }, { u'cid': u'time-before 2158-07-19T15:55:52.432439055Z' }], u'location': u'', u'signature': u'3513db5503ab17f9576760cd28' u'ce658ce8bf6b43038255969fc3c1cd8b172345' } @urlmatch(path='.*/someprotecteurl') def first_407_then_200(url, request): if request.headers.get('cookie', '').startswith('macaroon-'): return { 'status_code': 200, 'content': { 'Value': 'some value' } } else: resp = response(status_code=407, content={ 'Info': { 'Macaroon': json_macaroon, 'MacaroonPath': '/', 'CookieNameSuffix': 'test' }, 'Message': 'verification failed: no macaroon ' 'cookies in request', 'Code': 'macaroon discharge required' }, headers={'Content-Type': 'application/json'}) return request.hooks['response'][0](resp) @urlmatch(netloc='example.com:8000', path='.*/someprotecteurl') def first_407_then_200_with_port(url, request): if request.headers.get('cookie', '').startswith('macaroon-'): return { 'status_code': 200, 'content': { 'Value': 'some value' } } else: resp = response(status_code=407, content={ 'Info': { 'Macaroon': json_macaroon, 'MacaroonPath': '/', 'CookieNameSuffix': 'test' }, 'Message': 'verification failed: no macaroon ' 'cookies in request', 'Code': 'macaroon discharge required' }, headers={'Content-Type': 'application/json'}, request=request) return request.hooks['response'][0](resp) @urlmatch(path='.*/someprotecteurl') def valid_200(url, request): return { 'status_code': 200, 'content': { 'Value': 'some value' } } @urlmatch(path='.*/discharge') def discharge_200(url, request): return { 'status_code': 200, 'content': { 'Macaroon': discharged_macaroon } } @urlmatch(path='.*/discharge') def discharge_401(url, request): return { 'status_code': 401, 'content': { 'Code': 'interaction required', 'Info': { 'VisitURL': 'http://example.com/visit', 'WaitURL': 'http://example.com/wait' } }, 'headers': { 'WWW-Authenticate': 'Macaroon' } } @urlmatch(path='.*/wait') def wait_after_401(url, request): if request.url != 'http://example.com/wait': return {'status_code': 500} return { 'status_code': 200, 'content': { 'DischargeToken': discharge_token, 'Macaroon': discharged_macaroon } } @urlmatch(path='.*/wait') def wait_on_error(url, request): return { 'status_code': 500, 'content': { 'DischargeToken': discharge_token, 'Macaroon': discharged_macaroon } } class TestBakery(TestCase): def assert_cookie_security(self, cookies, name, secure): for cookie in cookies: if cookie.name == name: assert cookie.secure == secure break else: assert False, 'no cookie named {} found in jar'.format(name) def test_discharge(self): client = httpbakery.Client() with HTTMock(first_407_then_200), HTTMock(discharge_200): resp = requests.get(ID_PATH, cookies=client.cookies, auth=client.auth()) resp.raise_for_status() assert 'macaroon-test' in client.cookies.keys() self.assert_cookie_security(client.cookies, 'macaroon-test', secure=False) @patch('webbrowser.open') def test_407_then_401_on_discharge(self, mock_open): client = httpbakery.Client() with HTTMock(first_407_then_200), HTTMock(discharge_401), \ HTTMock(wait_after_401): resp = requests.get( ID_PATH, cookies=client.cookies, auth=client.auth(), ) resp.raise_for_status() mock_open.assert_called_once_with(u'http://example.com/visit', new=1) assert 'macaroon-test' in client.cookies.keys() @patch('webbrowser.open') def test_407_then_error_on_wait(self, mock_open): client = httpbakery.Client() with HTTMock(first_407_then_200), HTTMock(discharge_401),\ HTTMock(wait_on_error): with self.assertRaises(httpbakery.InteractionError) as exc: requests.get( ID_PATH, cookies=client.cookies, auth=client.auth(), ) self.assertEqual(str(exc.exception), 'cannot start interactive session: cannot get ' 'http://example.com/wait') mock_open.assert_called_once_with(u'http://example.com/visit', new=1) def test_407_then_no_interaction_methods(self): client = httpbakery.Client(interaction_methods=[]) with HTTMock(first_407_then_200), HTTMock(discharge_401): with self.assertRaises(httpbakery.InteractionError) as exc: requests.get( ID_PATH, cookies=client.cookies, auth=client.auth(), ) self.assertEqual(str(exc.exception), 'cannot start interactive session: interaction ' 'required but not possible') def test_407_then_unknown_interaction_methods(self): class UnknownInteractor(httpbakery.Interactor): def kind(self): return 'unknown' client = httpbakery.Client(interaction_methods=[UnknownInteractor()]) with HTTMock(first_407_then_200), HTTMock(discharge_401): with self.assertRaises(httpbakery.InteractionError) as exc: requests.get( ID_PATH, cookies=client.cookies, auth=client.auth(), ) self.assertEqual( str(exc.exception), 'cannot start interactive session: no methods supported; ' 'supported [unknown]; provided [interactive]' ) def test_cookie_with_port(self): client = httpbakery.Client() with HTTMock(first_407_then_200_with_port): with HTTMock(discharge_200): resp = requests.get('http://example.com:8000/someprotecteurl', cookies=client.cookies, auth=client.auth()) resp.raise_for_status() assert 'macaroon-test' in client.cookies.keys() def test_secure_cookie_for_https(self): client = httpbakery.Client() with HTTMock(first_407_then_200_with_port), HTTMock(discharge_200): resp = requests.get( 'https://example.com:8000/someprotecteurl', cookies=client.cookies, auth=client.auth()) resp.raise_for_status() assert 'macaroon-test' in client.cookies.keys() self.assert_cookie_security(client.cookies, 'macaroon-test', secure=True)