# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import copy
import json
import logging
from collections import namedtuple

import macaroonbakery.bakery as bakery
import macaroonbakery.httpbakery as httpbakery
import macaroonbakery._utils as utils
import requests.cookies
from six.moves.urllib.parse import urljoin

log = logging.getLogger(__name__)


class AgentFileFormatError(Exception):
    ''' AgentFileFormatError is the exception raised when an agent file has a
    bad structure.
    '''
    pass


def load_auth_info(filename):
    '''Loads agent authentication information from the specified file.
    The returned information is suitable for passing as an argument
    to the AgentInteractor constructor.
    @param filename The name of the file to open (str)
    @return AuthInfo The authentication information
    @raises AgentFileFormatError when the file format is bad.
    '''
    with open(filename) as f:
        return read_auth_info(f.read())


def read_auth_info(agent_file_content):
    '''Loads agent authentication information from the
    specified content string, as read from an agents file.
    The returned information is suitable for passing as an argument
    to the AgentInteractor constructor.

    The content is parsed as JSON. The private key is read from
    data['key']['private']; the optional 'agents' entry is a sequence of
    objects that each hold a 'url' and a 'username'.

    @param agent_file_content The agent file content (str)
    @return AuthInfo The authentication information
    @raises AgentFileFormatError when the file format is bad.
    '''
    try:
        data = json.loads(agent_file_content)
        return AuthInfo(
            key=bakery.PrivateKey.deserialize(data['key']['private']),
            agents=[
                Agent(url=a['url'], username=a['username'])
                for a in data.get('agents', [])
            ],
        )
    except (
        KeyError,
        ValueError,
        TypeError,
    ) as e:
        # The underlying error is passed as the second exception argument
        # so that callers can inspect what was wrong with the file.
        raise AgentFileFormatError('invalid agent file', e)


class InteractionInfo(object):
    '''Holds the information expected in the agent interaction entry in an
    interaction-required error.
    '''
    def __init__(self, login_url):
        self._login_url = login_url

    @property
    def login_url(self):
        ''' Return the URL from which to acquire a macaroon that can be used
        to complete the agent login. To acquire the macaroon, make a POST
        request to the URL with user and public-key parameters.
        :return string
        '''
        return self._login_url

    @classmethod
    def from_dict(cls, json_dict):
        '''Return an InteractionInfo obtained from the given dictionary as
        deserialized from JSON.
        @param json_dict The deserialized JSON object.
        '''
        # Instantiate through cls so that subclasses get an instance of
        # their own type rather than always the base class.
        return cls(json_dict.get('login-url'))


class AgentInteractor(httpbakery.Interactor, httpbakery.LegacyInteractor):
    ''' Interactor that performs interaction using the agent login protocol.
    '''
    def __init__(self, auth_info):
        self._auth_info = auth_info

    def kind(self):
        '''Implement Interactor.kind by returning the agent kind'''
        return 'agent'

    def interact(self, client, location, interaction_required_err):
        '''Implement Interactor.interact by obtaining a macaroon from the
        discharger and discharging it with the local private key; the
        discharged macaroons are returned as a discharge token.

        @param client The client whose auth() is used for the request.
        @param location The discharger location (str).
        @param interaction_required_err The interaction-required error
        holding the agent interaction method.
        @return httpbakery.DischargeToken of kind 'agent'.
        @raises httpbakery.InteractionError when there is no login URL,
        the request does not return status 200, or the response holds
        no macaroon.
        @raises httpbakery.InteractionMethodNotFound when no agent is
        known for the location.
        '''
        p = interaction_required_err.interaction_method(
            'agent', InteractionInfo)
        if p.login_url is None or p.login_url == '':
            raise httpbakery.InteractionError(
                'no login-url field found in agent interaction method')
        agent = self._find_agent(location)
        # Ensure a trailing slash so that urljoin resolves a relative
        # login URL beneath the location rather than replacing its last
        # path segment.
        if not location.endswith('/'):
            location += '/'
        login_url = urljoin(location, p.login_url)
        resp = requests.get(
            login_url,
            params={
                'username': agent.username,
                'public-key': str(self._auth_info.key.public_key),
            },
            auth=client.auth())
        if resp.status_code != 200:
            raise httpbakery.InteractionError(
                'cannot acquire agent macaroon: {} {}'.format(
                    resp.status_code, resp.text)
            )
        # A 200 reply whose body is not a JSON object cannot contain a
        # macaroon; report that as an InteractionError like every other
        # failure instead of leaking a ValueError/AttributeError.
        try:
            body = resp.json()
        except ValueError:
            body = None
        m = body.get('macaroon') if isinstance(body, dict) else None
        if m is None:
            raise httpbakery.InteractionError('no macaroon in response')
        m = bakery.Macaroon.from_dict(m)
        ms = bakery.discharge_all(m, None, self._auth_info.key)
        # The token value is the concatenation of the base64-decoded
        # serialized form of each macaroon returned by discharge_all.
        b = bytearray()
        for discharged in ms:
            b.extend(utils.b64decode(discharged.serialize()))
        return httpbakery.DischargeToken(kind='agent', value=bytes(b))

    def _find_agent(self, location):
        ''' Finds an appropriate agent entry for the given location.
        :return Agent
        @raises httpbakery.InteractionMethodNotFound when no agent URL
        matches the location.
        '''
        for a in self._auth_info.agents:
            # Don't worry about trailing slashes
            if a.url.rstrip('/') == location.rstrip('/'):
                return a
        raise httpbakery.InteractionMethodNotFound(
            'cannot find username for discharge location {}'.format(location))

    def legacy_interact(self, client, location, visit_url):
        '''Implement LegacyInteractor.legacy_interact by obtaining
        the discharge macaroon using the client's private key

        @param client The client used to make the login request.
        @param location The discharger location (str).
        @param visit_url The URL to POST the agent login request to.
        @raises httpbakery.InteractionError when the request does not
        return status 200 or the response does not confirm the login.
        @raises httpbakery.InteractionMethodNotFound when no agent is
        known for the location.
        '''
        agent = self._find_agent(location)
        # Shallow-copy the client so that we don't unexpectedly side-effect
        # it by changing the key. Another possibility might be to
        # set up agent authentication differently, in such a way that
        # we're sure that client.key is the same as self._auth_info.key.
        client = copy.copy(client)
        client.key = self._auth_info.key
        resp = client.request(
            method='POST',
            url=visit_url,
            json={
                'username': agent.username,
                'public_key': str(self._auth_info.key.public_key),
            },
        )
        if resp.status_code != 200:
            raise httpbakery.InteractionError(
                'cannot acquire agent macaroon from {}: {} (response body: {!r})'.format(visit_url, resp.status_code, resp.text))
        # A body that is not a JSON object cannot confirm the login, so it
        # is treated the same as a missing or false 'agent_login' entry.
        try:
            body = resp.json()
        except ValueError:
            body = None
        if not isinstance(body, dict) or not body.get('agent_login', False):
            raise httpbakery.InteractionError('agent login failed')


class Agent(namedtuple('Agent', 'url, username')):
    ''' Represents an agent that can be used for agent authentication.
    @param url(string) holds the URL of the discharger that knows about
    the agent.
    @param username holds the username agent (string).
    '''


class AuthInfo(namedtuple('AuthInfo', 'key, agents')):
    ''' Holds the agent information required to set up agent authentication
    information.

    It holds the agent's private key and information about the username
    associated with each known agent-authentication server.
    @param key the agent's private key (bakery.PrivateKey).
    @param agents information about the known agents (list of Agent).
    '''