diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/priority.egg-info/PKG-INFO | 214 | ||||
-rw-r--r-- | src/priority.egg-info/SOURCES.txt | 14 | ||||
-rw-r--r-- | src/priority.egg-info/dependency_links.txt | 1 | ||||
-rw-r--r-- | src/priority.egg-info/top_level.txt | 1 | ||||
-rw-r--r-- | src/priority/__init__.py | 8 | ||||
-rw-r--r-- | src/priority/priority.py | 481 |
6 files changed, 719 insertions, 0 deletions
diff --git a/src/priority.egg-info/PKG-INFO b/src/priority.egg-info/PKG-INFO new file mode 100644 index 0000000..fff2632 --- /dev/null +++ b/src/priority.egg-info/PKG-INFO @@ -0,0 +1,214 @@ +Metadata-Version: 1.1 +Name: priority +Version: 1.3.0 +Summary: A pure-Python implementation of the HTTP/2 priority tree +Home-page: http://python-hyper.org/priority/ +Author: Cory Benfield +Author-email: cory@lukasa.co.uk +License: MIT License +Description: Priority: A HTTP/2 Priority Implementation + ========================================== + + Priority is a pure-Python implementation of the priority logic for HTTP/2, set + out in `RFC 7540 Section 5.3 (Stream Priority)`_. This logic allows for clients + to express a preference for how the server allocates its (limited) resources to + the many outstanding HTTP requests that may be running over a single HTTP/2 + connection. + + Specifically, this Python implementation uses a variant of the implementation + used in the excellent `H2O`_ project. This original implementation is also the + inspiration for `nghttp2's`_ priority implementation, and generally produces a + very clean and even priority stream. The only notable changes from H2O's + implementation are small modifications to allow the priority implementation to + work cleanly as a separate implementation, rather than being embedded in a + HTTP/2 stack directly. + + While priority information in HTTP/2 is only a suggestion, rather than an + enforceable constraint, where possible servers should respect the priority + requests of their clients. + + Using Priority + -------------- + + Priority has a simple API. Streams are inserted into the tree: when they are + inserted, they may optionally have a weight, depend on another stream, or + become an exclusive dependent of another stream. + + .. 
code-block:: python + + >>> p = priority.PriorityTree() + >>> p.insert_stream(stream_id=1) + >>> p.insert_stream(stream_id=3) + >>> p.insert_stream(stream_id=5, depends_on=1) + >>> p.insert_stream(stream_id=7, weight=32) + >>> p.insert_stream(stream_id=9, depends_on=7, weight=8) + >>> p.insert_stream(stream_id=11, depends_on=7, exclusive=True) + + Once streams are inserted, the stream priorities can be requested. This allows + the server to make decisions about how to allocate resources. + + Iterating The Tree + ~~~~~~~~~~~~~~~~~~ + + The tree in this algorithm acts as a gate. Its goal is to allow one stream + "through" at a time, in such a manner that all the active streams are served as + evenly as possible in proportion to their weights. + + This is handled in Priority by iterating over the tree. The tree itself is an + iterator, and each time it is advanced it will yield a stream ID. This is the + ID of the stream that should next send data. + + This looks like this: + + .. code-block:: python + + >>> for stream_id in p: + ... send_data(stream_id) + + If each stream only sends when it is 'ungated' by this mechanism, the server + will automatically be emitting stream data in conformance to RFC 7540. + + Updating The Tree + ~~~~~~~~~~~~~~~~~ + + If for any reason a stream is unable to proceed (for example, it is blocked on + HTTP/2 flow control, or it is waiting for more data from another service), that + stream is *blocked*. The ``PriorityTree`` should be informed that the stream is + blocked so that other dependent streams get a chance to proceed. This can be + done by calling the ``block`` method of the tree with the stream ID that is + currently unable to proceed. This will automatically update the tree, and it + will adjust on the fly to correctly allow any streams that were dependent on + the blocked one to progress. + + For example: + + .. code-block:: python + + >>> for stream_id in p: + ... send_data(stream_id) + ... if blocked(stream_id): + ... 
p.block(stream_id) + + When a stream goes from being blocked to being unblocked, call the ``unblock`` + method to place it back into the sequence. Both the ``block`` and ``unblock`` + methods are idempotent and safe to call repeatedly. + + Additionally, the priority of a stream may change. When it does, the + ``reprioritize`` method can be used to update the tree in the wake of that + change. ``reprioritize`` has the same signature as ``insert_stream``, but + applies only to streams already in the tree. + + Removing Streams + ~~~~~~~~~~~~~~~~ + + A stream can be entirely removed from the tree by calling ``remove_stream``. + Note that this is not idempotent. Further, calling ``remove_stream`` and then + re-adding it *may* cause a substantial change in the shape of the priority + tree, and *will* cause the iteration order to change. + + License + ------- + + Priority is made available under the MIT License. For more details, see the + LICENSE file in the repository. + + Authors + ------- + + Priority is maintained by Cory Benfield, with contributions from others. For + more details about the contributors, please see CONTRIBUTORS.rst in the + repository. + + + .. _RFC 7540 Section 5.3 (Stream Priority): https://tools.ietf.org/html/rfc7540#section-5.3 + .. _nghttp2's: https://nghttp2.org/blog/2015/11/11/stream-scheduling-utilizing-http2-priority/ + .. _H2O: https://h2o.examp1e.net/ + + + Changelog + ========= + + 1.3.0 (2017-01-27) + ------------------ + + **API Changes** + + - Throw ``PriorityLoop`` when inserting or reprioritising a stream that + depends on itself. + - Throw ``BadWeightError`` when creating or reprioritising a stream with a + weight that is not an integer between 1 and 256, inclusive. + - Throw ``PseudoStreamError`` when trying to reprioritise, remove, block or + unblock stream 0. + - Add a new ``PriorityError`` parent class for the exceptions that can be + thrown by priority. 
+ + 1.2.2 (2016-11-11) + ------------------ + + **Bugfixes** + + - Allow ``insert_stream`` to be called with ``exclusive=True`` but no explicit + ``depends_on`` value. + + 1.2.1 (2016-10-26) + ------------------ + + **Bugfixes** + + - Allow insertion of streams that have parents in the idle or closed states. + This would previously raise a KeyError. + + 1.2.0 (2016-08-04) + ------------------ + + **Security Fixes** + + - CVE-2016-6580: All versions of this library prior to 1.2.0 are vulnerable to + a denial of service attack whereby a remote peer can cause a user to insert + an unbounded number of streams into the priority tree, eventually consuming + all available memory. + + This version adds a ``TooManyStreamsError`` exception that is raised when + too many streams are inserted into the priority tree. It also adds a keyword + argument to the priority tree, ``maximum_streams``, which limits how many + streams may be inserted. By default, this number is set to 1000. + Implementations should strongly consider whether they can set this value + lower. + + 1.1.1 (2016-05-28) + ------------------ + + **Bugfixes** + + - 2.5x performance improvement by swapping from ``queue.PriorityQueue`` to + ``heapq``. + + 1.1.0 (2016-01-08) + ------------------ + + **API Changes** + + - Throw ``DuplicateStreamError`` when inserting a stream that is already in the + tree. + - Throw ``MissingStreamError`` when reprioritising a stream that is not in the + tree. + + 1.0.0 (2015-12-07) + ------------------ + + - Initial release. 
+ +Platform: UNKNOWN +Classifier: Development Status :: 5 - Production/Stable +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: MIT License +Classifier: Programming Language :: Python +Classifier: Programming Language :: Python :: 2 +Classifier: Programming Language :: Python :: 2.7 +Classifier: Programming Language :: Python :: 3 +Classifier: Programming Language :: Python :: 3.3 +Classifier: Programming Language :: Python :: 3.4 +Classifier: Programming Language :: Python :: 3.5 +Classifier: Programming Language :: Python :: 3.6 +Classifier: Programming Language :: Python :: Implementation :: CPython +Classifier: Programming Language :: Python :: Implementation :: PyPy diff --git a/src/priority.egg-info/SOURCES.txt b/src/priority.egg-info/SOURCES.txt new file mode 100644 index 0000000..72e33bf --- /dev/null +++ b/src/priority.egg-info/SOURCES.txt @@ -0,0 +1,14 @@ +CONTRIBUTORS.rst +HISTORY.rst +LICENSE +MANIFEST.in +README.rst +setup.cfg +setup.py +src/priority/__init__.py +src/priority/priority.py +src/priority.egg-info/PKG-INFO +src/priority.egg-info/SOURCES.txt +src/priority.egg-info/dependency_links.txt +src/priority.egg-info/top_level.txt +test/test_priority.py
\ No newline at end of file diff --git a/src/priority.egg-info/dependency_links.txt b/src/priority.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/priority.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/src/priority.egg-info/top_level.txt b/src/priority.egg-info/top_level.txt new file mode 100644 index 0000000..4d8014f --- /dev/null +++ b/src/priority.egg-info/top_level.txt @@ -0,0 +1 @@ +priority diff --git a/src/priority/__init__.py b/src/priority/__init__.py new file mode 100644 index 0000000..6cf1005 --- /dev/null +++ b/src/priority/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +""" +priority: HTTP/2 priority implementation for Python +""" +from .priority import ( # noqa + Stream, PriorityTree, DeadlockError, PriorityLoop, DuplicateStreamError, + MissingStreamError, TooManyStreamsError, BadWeightError, PseudoStreamError +) diff --git a/src/priority/priority.py b/src/priority/priority.py new file mode 100644 index 0000000..0ada477 --- /dev/null +++ b/src/priority/priority.py @@ -0,0 +1,481 @@ +# -*- coding: utf-8 -*- +""" +priority/tree +~~~~~~~~~~~~~ + +Implementation of the Priority tree data structure. +""" +from __future__ import division + +import heapq +import sys + + +PY3 = sys.version_info[0] == 3 + + +class PriorityError(Exception): + """ + The base class for all ``priority`` exceptions. + """ + + +class DeadlockError(PriorityError): + """ + Raised when there are no streams that can make progress: all streams are + blocked. + """ + pass + + +class PriorityLoop(PriorityError): + """ + An unexpected priority loop has been detected. The tree is invalid. + """ + pass + + +class DuplicateStreamError(PriorityError): + """ + An attempt was made to insert a stream that already exists. + """ + pass + + +class MissingStreamError(KeyError, PriorityError): + """ + An operation was attempted on a stream that is not present in the tree. 
+ """ + pass + + +class TooManyStreamsError(PriorityError): + """ + An attempt was made to insert a dangerous number of streams into the + priority tree at the same time. + + .. versionadded:: 1.2.0 + """ + pass + + +class BadWeightError(PriorityError): + """ + An attempt was made to create a stream with an invalid weight. + + .. versionadded:: 1.3.0 + """ + pass + + +class PseudoStreamError(PriorityError): + """ + An operation was attempted on stream 0. + + .. versionadded:: 1.3.0 + """ + pass + + +class Stream(object): + """ + Priority information for a given stream. + + :param stream_id: The stream ID for the new stream. + :param weight: (optional) The stream weight. Defaults to 16. + """ + def __init__(self, stream_id, weight=16): + self.stream_id = stream_id + self.weight = weight + self.children = [] + self.parent = None + self.child_queue = [] + self.active = True + self.last_weight = 0 + self._deficit = 0 + + @property + def weight(self): + return self._weight + + @weight.setter + def weight(self, value): + # RFC 7540 § 5.3.2: "All dependent streams are allocated an integer + # weight between 1 and 256 (inclusive)." + if not isinstance(value, int): + raise BadWeightError("Stream weight should be an integer") + elif not (1 <= value <= 256): + raise BadWeightError( + "Stream weight must be between 1 and 256 (inclusive)") + self._weight = value + + def add_child(self, child): + """ + Add a stream that depends on this one. + + :param child: A ``Stream`` object that depends on this one. + """ + child.parent = self + self.children.append(child) + heapq.heappush(self.child_queue, (self.last_weight, child)) + + def add_child_exclusive(self, child): + """ + Add a stream that exclusively depends on this one. + + :param child: A ``Stream`` object that exclusively depends on this one. 
+ """ + old_children = self.children + self.children = [] + self.child_queue = [] + self.last_weight = 0 + self.add_child(child) + + for old_child in old_children: + child.add_child(old_child) + + def remove_child(self, child, strip_children=True): + """ + Removes a child stream from this stream. This is a potentially somewhat + expensive operation. + + :param child: The child stream to remove. + :param strip_children: Whether children of the removed stream should + become children of this stream. + """ + # To do this we do the following: + # + # - remove the child stream from the list of children + # - build a new priority queue, filtering out the child when we find + # it in the old one + self.children.remove(child) + + new_queue = [] + + while self.child_queue: + level, stream = heapq.heappop(self.child_queue) + if stream == child: + continue + + heapq.heappush(new_queue, (level, stream)) + + self.child_queue = new_queue + + if strip_children: + for new_child in child.children: + self.add_child(new_child) + + def schedule(self): + """ + Returns the stream ID of the next child to schedule. Potentially + recurses down the tree of priorities. + """ + # Cannot be called on active streams. + assert not self.active + + next_stream = None + popped_streams = [] + + # Spin looking for the next active stream. Everything we pop off has + # to be rescheduled, even if it turns out none of them were active at + # this time. + try: + while next_stream is None: + # If the queue is empty, immediately fail. + val = heapq.heappop(self.child_queue) + popped_streams.append(val) + level, child = val + + if child.active: + next_stream = child.stream_id + else: + # Guard against the possibility that the child also has no + # suitable children. 
+ try: + next_stream = child.schedule() + except IndexError: + continue + finally: + for level, child in popped_streams: + self.last_weight = level + level += (256 + child._deficit) // child.weight + child._deficit = (256 + child._deficit) % child.weight + heapq.heappush(self.child_queue, (level, child)) + + return next_stream + + # Custom repr + def __repr__(self): + return "Stream<id=%d, weight=%d>" % (self.stream_id, self.weight) + + # Custom comparison + def __eq__(self, other): + if not isinstance(other, Stream): # pragma: no cover + return False + + return self.stream_id == other.stream_id + + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + if not isinstance(other, Stream): # pragma: no cover + return NotImplemented + + return self.stream_id < other.stream_id + + def __le__(self, other): + if not isinstance(other, Stream): # pragma: no cover + return NotImplemented + + return self.stream_id <= other.stream_id + + def __gt__(self, other): + if not isinstance(other, Stream): # pragma: no cover + return NotImplemented + + return self.stream_id > other.stream_id + + def __ge__(self, other): + if not isinstance(other, Stream): # pragma: no cover + return NotImplemented + + return self.stream_id >= other.stream_id + + +def _stream_cycle(new_parent, current): + """ + Reports whether the new parent depends on the current stream. + """ + parent = new_parent + + # Don't iterate forever, but instead assume that the tree doesn't + # get more than 100 streams deep. This should catch accidental + # tree loops. This is the definition of defensive programming. + for _ in range(100): + parent = parent.parent + if parent.stream_id == current.stream_id: + return True + elif parent.stream_id == 0: + return False + + raise PriorityLoop( + "Stream %d is in a priority loop." % new_parent.stream_id + ) # pragma: no cover + + +class PriorityTree(object): + """ + A HTTP/2 Priority Tree. 
+ + This tree stores HTTP/2 streams according to their HTTP/2 priorities. + + .. versionchanged:: 1.2.0 + Added ``maximum_streams`` keyword argument. + + :param maximum_streams: The maximum number of streams that may be active in + the priority tree at any one time. If this number is exceeded, the + priority tree will raise a :class:`TooManyStreamsError + <priority.TooManyStreamsError>` and will refuse to insert the stream. + + This parameter exists to defend against the possibility of DoS attack + by attempting to overfill the priority tree. If any endpoint is + attempting to manage the priority of this many streams at once it is + probably trying to screw with you, so it is sensible to simply refuse + to play ball at that point. + + While we allow the user to configure this, we don't really *expect* + them to, unless they want to be even more conservative than we are by + default. + :type maximum_streams: ``int`` + """ + def __init__(self, maximum_streams=1000): + # This flat array keeps hold of all the streams that are logically + # dependent on stream 0. + self._root_stream = Stream(stream_id=0, weight=1) + self._root_stream.active = False + self._streams = {0: self._root_stream} + self._maximum_streams = maximum_streams + + def _get_or_insert_parent(self, parent_stream_id): + """ + When inserting or reprioritizing a stream it is possible to make it + dependent on a stream that is no longer in the tree. In this situation, + rather than bail out, we should insert the parent stream into the tree + with default priority and mark it as blocked. + """ + try: + return self._streams[parent_stream_id] + except KeyError: + self.insert_stream(parent_stream_id) + self.block(parent_stream_id) + return self._streams[parent_stream_id] + + def _exclusive_insert(self, parent_stream, inserted_stream): + """ + Insert ``inserted_stream`` beneath ``parent_stream``, obeying the + semantics of exclusive insertion. 
+ """ + parent_stream.add_child_exclusive(inserted_stream) + + def insert_stream(self, + stream_id, + depends_on=None, + weight=16, + exclusive=False): + """ + Insert a stream into the tree. + + :param stream_id: The stream ID of the stream being inserted. + :param depends_on: (optional) The ID of the stream that the new stream + depends on, if any. + :param weight: (optional) The weight to give the new stream. Defaults + to 16. + :param exclusive: (optional) Whether this new stream should be an + exclusive dependency of the parent. + """ + if stream_id in self._streams: + raise DuplicateStreamError("Stream %d already in tree" % stream_id) + + if (len(self._streams) + 1) > self._maximum_streams: + raise TooManyStreamsError( + "Refusing to insert %d streams into priority tree at once" % ( + self._maximum_streams + 1 + ) + ) + + stream = Stream(stream_id, weight) + + if not depends_on: + depends_on = 0 + elif depends_on == stream_id: + raise PriorityLoop( + "Stream %d must not depend on itself." % stream_id + ) + + if exclusive: + parent_stream = self._get_or_insert_parent(depends_on) + self._exclusive_insert(parent_stream, stream) + self._streams[stream_id] = stream + return + + parent = self._get_or_insert_parent(depends_on) + parent.add_child(stream) + self._streams[stream_id] = stream + + def reprioritize(self, + stream_id, + depends_on=None, + weight=16, + exclusive=False): + """ + Update the priority status of a stream already in the tree. + + :param stream_id: The stream ID of the stream being updated. + :param depends_on: (optional) The ID of the stream that the stream now + depends on. If ``None``, will be moved to depend on stream 0. + :param weight: (optional) The new weight to give the stream. Defaults + to 16. + :param exclusive: (optional) Whether this stream should now be an + exclusive dependency of the new parent. 
+ """ + if stream_id == 0: + raise PseudoStreamError("Cannot reprioritize stream 0") + + try: + current_stream = self._streams[stream_id] + except KeyError: + raise MissingStreamError("Stream %d not in tree" % stream_id) + + # Update things in a specific order to make sure the calculation + # behaves properly. Specifically, we first update the weight. Then, + # we check whether this stream is being made dependent on one of its + # own dependents. Then, we remove this stream from its current parent + # and move it to its new parent, taking its children with it. + if depends_on: + if depends_on == stream_id: + raise PriorityLoop( + "Stream %d must not depend on itself" % stream_id + ) + + new_parent = self._get_or_insert_parent(depends_on) + cycle = _stream_cycle(new_parent, current_stream) + else: + new_parent = self._streams[0] + cycle = False + + current_stream.weight = weight + + # Our new parent is currently dependent on us. We should remove it from + # its parent, and make it a child of our current parent, and then + # continue. + if cycle: + new_parent.parent.remove_child(new_parent) + current_stream.parent.add_child(new_parent) + + current_stream.parent.remove_child( + current_stream, strip_children=False + ) + + if exclusive: + new_parent.add_child_exclusive(current_stream) + else: + new_parent.add_child(current_stream) + + def remove_stream(self, stream_id): + """ + Removes a stream from the priority tree. + + :param stream_id: The ID of the stream to remove. + """ + if stream_id == 0: + raise PseudoStreamError("Cannot remove stream 0") + + try: + child = self._streams.pop(stream_id) + except KeyError: + raise MissingStreamError("Stream %d not in tree" % stream_id) + + parent = child.parent + parent.remove_child(child) + + def block(self, stream_id): + """ + Marks a given stream as blocked, with no data to send. + + :param stream_id: The ID of the stream to block. 
+ """ + if stream_id == 0: + raise PseudoStreamError("Cannot block stream 0") + + try: + self._streams[stream_id].active = False + except KeyError: + raise MissingStreamError("Stream %d not in tree" % stream_id) + + def unblock(self, stream_id): + """ + Marks a given stream as unblocked, with more data to send. + + :param stream_id: The ID of the stream to unblock. + """ + if stream_id == 0: + raise PseudoStreamError("Cannot unblock stream 0") + + try: + self._streams[stream_id].active = True + except KeyError: + raise MissingStreamError("Stream %d not in tree" % stream_id) + + # The iterator protocol + def __iter__(self): # pragma: no cover + return self + + def __next__(self): # pragma: no cover + try: + return self._root_stream.schedule() + except IndexError: + raise DeadlockError("No unblocked streams to schedule.") + + def next(self): # pragma: no cover + return self.__next__() |