summaryrefslogtreecommitdiff
path: root/macaroonbakery/checkers/_checkers.py
blob: 11a41b9959233d89b46627c4d131cd82729fb21c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# Copyright 2017 Canonical Ltd.
# Licensed under the LGPLv3, see LICENCE file for details.
import abc
from collections import namedtuple
from datetime import datetime

import pyrfc3339
from ._caveat import parse_caveat
from ._conditions import (
    COND_ALLOW,
    COND_DECLARED,
    COND_DENY,
    COND_ERROR,
    COND_TIME_BEFORE,
    STD_NAMESPACE,
)
from ._declared import DECLARED_KEY
from ._namespace import Namespace
from ._operation import OP_KEY
from ._time import TIME_KEY
from ._utils import condition_with_prefix


class RegisterError(Exception):
    '''Signals that a caveat condition could not be registered with a
    Checker.
    '''


class FirstPartyCaveatChecker(object):
    '''Interface for objects that decide whether a first party caveat
    holds, given the information carried by a context.

    An implementation that does not recognise the kind of a caveat
    should report that (the contract names this case
    ErrCaveatNotRecognized).
    '''
    # Python 2 style metaclass declaration; kept exactly as is so the
    # class behaves the same for existing subclasses.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def check_first_party_caveat(self, ctx, caveat):
        '''Checks that the given caveat condition is valid with respect
        to the given context information.

        This base version always raises NotImplementedError.
        :param ctx: an Auth context
        :param caveat: a string
        '''
        reason = ('check_first_party_caveat method must be '
                  'defined in subclass')
        raise NotImplementedError(reason)

    def namespace(self):
        '''Returns the namespace associated with the caveat checker.

        This base version always raises NotImplementedError.
        '''
        reason = 'namespace method must be defined in subclass'
        raise NotImplementedError(reason)


class Checker(FirstPartyCaveatChecker):
    ''' A registry of first party caveat checkers, keyed by the full
    (prefixed) caveat condition.
    '''

    def __init__(self, namespace=None, include_std_checkers=True):
        ''' Builds an empty registry.
        :param namespace: the namespace used to resolve schema URIs to
        prefixes; a new Namespace() is created when None is given.
        :param include_std_checkers: when true, register_std() is called
        so that the standard conditions are registered immediately.
        '''
        self._namespace = Namespace() if namespace is None else namespace
        self._checkers = {}
        if include_std_checkers:
            self.register_std()

    def check_first_party_caveat(self, ctx, cav):
        ''' Runs the checker registered for the caveat's condition.
        :param ctx: an Auth context, handed through to the check function
        :param cav: the caveat as a string
        :return: an error message string when the caveat cannot be
        parsed, is not recognised or is not satisfied; otherwise None
        '''
        try:
            cond, arg = parse_caveat(cav)
        except ValueError as parse_err:
            # An unparseable caveat may simply be in some other format;
            # it is reported as a message rather than raised.
            return 'cannot parse caveat "{}": {}'.format(
                cav, parse_err.args[0])

        entry = self._checkers.get(cond)
        if entry is None:
            return 'caveat "{}" not satisfied: caveat not recognized'.format(
                cav)

        failure = entry.check(ctx, cond, arg)
        if failure is None:
            return None
        return 'caveat "{}" not satisfied: {}'.format(cav, failure)

    def namespace(self):
        ''' Returns the namespace this Checker resolves prefixes with.
        '''
        return self._namespace

    def info(self):
        ''' Returns information on every registered checker.

        The result is ordered by namespace first and name second.
        :returns a list of CheckerInfo
        '''
        entries = list(self._checkers.values())
        entries.sort(key=lambda entry: (entry.ns, entry.name))
        return entries

    def register(self, cond, uri, check):
        ''' Registers the condition cond (string) of the namespace uri
        (string) so that it is checked by the function check.

        The check function is called with an auth context, a cond
        parameter (string) holding the caveat condition including any
        namespace prefix, and an arg parameter (string) holding any
        additional caveat argument text. It returns an error as a string,
        or None when the caveat is satisfied.

        A RegisterError is raised when check is None, when no prefix is
        registered for uri, when the prefix is empty but cond contains a
        colon, or when the resulting condition is already registered.
        '''
        if check is None:
            raise RegisterError(
                'no check function registered for namespace {} when '
                'registering condition {}'.format(uri, cond))

        prefix = self._namespace.resolve(uri)
        if prefix is None:
            raise RegisterError('no prefix registered for namespace {} when '
                                'registering condition {}'.format(uri, cond))

        if prefix == '' and ':' in cond:
            raise RegisterError(
                'caveat condition {} in namespace {} contains a colon but its'
                ' prefix is empty'.format(cond, uri))

        full_cond = condition_with_prefix(prefix, cond)
        existing = self._checkers.get(full_cond)
        if existing is not None:
            raise RegisterError(
                'checker for {} (namespace {}) already registered in '
                'namespace {}'.format(full_cond, uri, existing.ns))

        self._checkers[full_cond] = CheckerInfo(
            prefix=prefix,
            name=cond,
            ns=uri,
            check=check)

    def register_std(self):
        ''' Registers all the standard checkers with this checker.

        The standard checkers schema (STD_NAMESPACE) is first registered
        in the checker's namespace with an empty prefix.
        '''
        self._namespace.register(STD_NAMESPACE, '')
        for cond, check in _ALL_CHECKERS.items():
            self.register(cond, STD_NAMESPACE, check)


class CheckerInfo(namedtuple('CheckInfo',
                             ['prefix', 'name', 'ns', 'check'])):
    '''Immutable record describing one registered checker.
    '''
    __slots__ = ()

    def __new__(cls, prefix, name, ns, check=None):
        '''
        :param prefix holds the prefix for the checker condition as string.
        :param name holds the name of the checker condition as string.
        :param ns holds the namespace URI for the checker's schema
        (Checker.register stores the uri it was given here).
        :param check holds the actual checker function, which takes an
        auth context, a condition and an arg string as arguments;
        defaults to None.
        '''
        return super(CheckerInfo, cls).__new__(
            cls, prefix=prefix, name=name, ns=ns, check=check)


def _check_time_before(ctx, cond, arg):
    ''' Checks a caveat whose argument is an RFC 3339 expiry time.

    The current time comes from the utcnow() method of the object stored
    in the context under TIME_KEY or, when there is none, from
    datetime.utcnow().
    :return: an error message string when arg cannot be parsed or the
    current time has reached the expiry time; otherwise None
    '''
    clock = ctx.get(TIME_KEY)
    now = datetime.utcnow() if clock is None else clock.utcnow()

    try:
        # pyrfc3339 hands back a timezone-aware datetime; the tzinfo is
        # stripped so it can be compared with the naive value produced
        # by datetime.utcnow.
        deadline = pyrfc3339.parse(arg, utc=True).replace(tzinfo=None)
        expired = now >= deadline
    except ValueError:
        return 'cannot parse "{}" as RFC 3339'.format(arg)
    return 'macaroon has expired' if expired else None


def _check_declared(ctx, cond, arg):
    ''' Checks a declared caveat of the form "<key> <value>".

    The value is compared with the entry for key in the mapping stored
    in the context under DECLARED_KEY (an empty mapping when absent).
    :return: an error message string when arg has no value part or the
    declared value is missing or different; otherwise None
    '''
    key, separator, expected = arg.partition(' ')
    if separator == '':
        # No space at all, so there is a key but nothing to compare with.
        return 'declared caveat has no value'

    declared = ctx.get(DECLARED_KEY, {})
    actual = declared.get(key)
    if actual is None:
        return 'got {}=null, expected "{}"'.format(key, expected)
    if actual != expected:
        return 'got {}="{}", expected "{}"'.format(key, actual, expected)
    return None


def _check_error(ctx, cond, arg):
    return 'bad caveat'


def _check_allow(ctx, cond, arg):
    ''' Checks an allow caveat by delegating to _check_operations with
    need_ops set to True. The cond argument is not used.
    '''
    return _check_operations(ctx, need_ops=True, arg=arg)


def _check_deny(ctx, cond, arg):
    ''' Checks a deny caveat by delegating to _check_operations with
    need_ops set to False. The cond argument is not used.
    '''
    return _check_operations(ctx, need_ops=False, arg=arg)


def _check_operations(ctx, need_ops, arg):
    ''' Checks an allow or a deny caveat.

    :param ctx: context; the operations being performed are read from it
    under OP_KEY (an empty list when absent)
    :param need_ops: whether every operation in the context is required
    to be listed in the caveat (True) or required not to be (False)
    :param arg: whitespace separated operation names from the caveat
    :return: an error message string for the first failure, else None
    '''
    ctx_ops = ctx.get(OP_KEY, [])
    if len(ctx_ops) > 0:
        listed = arg.split()
        for ctx_op in ctx_ops:
            failure = _check_op(ctx_op, need_ops, listed)
            if failure is not None:
                return failure
        return None

    # No operation in the context: that can only fail a caveat that
    # requires operations to be present.
    if not need_ops:
        return None
    listed = arg.split()
    if len(listed) == 0:
        return 'no operations allowed'
    return '{} not allowed'.format(listed[0])


def _check_op(ctx_op, need_op, fields):
    found = False
    for op in fields:
        if op == ctx_op:
            found = True
            break
    if found != need_op:
        return '{} not allowed'.format(ctx_op)
    return None


# Dispatch table of the standard checkers: each standard condition name
# maps to the function that checks it. Checker.register_std registers
# every entry of this table under STD_NAMESPACE.
_ALL_CHECKERS = {
    COND_TIME_BEFORE: _check_time_before,
    COND_DECLARED: _check_declared,
    COND_ERROR: _check_error,
    COND_ALLOW: _check_allow,
    COND_DENY: _check_deny,
}