summaryrefslogtreecommitdiff
path: root/test/test_priority.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_priority.py')
-rw-r--r--test/test_priority.py542
1 files changed, 542 insertions, 0 deletions
diff --git a/test/test_priority.py b/test/test_priority.py
new file mode 100644
index 0000000..c98a28d
--- /dev/null
+++ b/test/test_priority.py
@@ -0,0 +1,542 @@
+# -*- coding: utf-8 -*-
+"""
+test_priority
+~~~~~~~~~~~~~
+
+Tests for the Priority trees
+"""
+from __future__ import division
+
+import collections
+import itertools
+
+import pytest
+
+from hypothesis import given
+from hypothesis.strategies import (
+ integers, lists, tuples, sampled_from
+)
+
+import priority
+
+
+STREAMS_AND_WEIGHTS = lists(
+ elements=tuples(
+ integers(min_value=1), integers(min_value=1, max_value=255)
+ ),
+ unique_by=lambda x: x[0],
+)
+
+BLOCKED_AND_ACTIVE = lists(
+ elements=sampled_from([1, 3, 5, 7, 9, 11]),
+ unique=True,
+).map(
+ lambda blocked: (blocked, active_readme_streams_from_filter(blocked))
+)
+
+UNBLOCKED_AND_ACTIVE = lists(
+ elements=sampled_from([1, 3, 5, 7, 9, 11]),
+ unique=True,
+).map(
+ lambda unblocked: (unblocked, active_readme_streams_from_filter(
+ unblocked, blocked=False
+ ))
+)
+
+
+def readme_tree():
+ """
+ Provide a tree configured as the one in the readme.
+ """
+ p = priority.PriorityTree()
+ p.insert_stream(stream_id=1)
+ p.insert_stream(stream_id=3)
+ p.insert_stream(stream_id=5, depends_on=1)
+ p.insert_stream(stream_id=7, weight=32)
+ p.insert_stream(stream_id=9, depends_on=7, weight=8)
+ p.insert_stream(stream_id=11, depends_on=7, exclusive=True)
+ return p
+
+
+def active_readme_streams_from_filter(filtered, blocked=True):
+ """
+ Given a collection of filtered streams, determine which ones are active.
+ This applies only to the readme tree at this time, though in future it
+ should be possible to apply this to an arbitrary tree.
+
+ If ``blocked`` is ``True``, the filter is a set of blocked streams. If
+ ``False``, it's a collection of unblocked streams.
+ """
+ tree = {
+ 1: {
+ 5: {},
+ },
+ 3: {},
+ 7: {
+ 11: {
+ 9: {},
+ },
+ },
+ }
+ filtered = set(filtered)
+
+ def get_expected(tree):
+ expected = []
+
+ for stream_id in tree:
+ if stream_id not in filtered and blocked:
+ expected.append(stream_id)
+ elif stream_id in filtered and not blocked:
+ expected.append(stream_id)
+ else:
+ expected.extend(get_expected(tree[stream_id]))
+
+ return expected
+
+ return get_expected(tree)
+
+
+def active_streams_from_unblocked(unblocked):
+ """
+ Given a collection of unblocked streams, determine which ones are active.
+ This applies only to the readme tree at this time, though in future it
+ should be possible to apply this to an arbitrary tree.
+ """
+
+
+class TestStream(object):
+ def test_stream_repr(self):
+ """
+ The stream representation renders according to the README.
+ """
+ s = priority.Stream(stream_id=80, weight=16)
+ assert repr(s) == "Stream<id=80, weight=16>"
+
+ @given(STREAMS_AND_WEIGHTS)
+ def test_streams_are_well_ordered(self, streams_and_weights):
+ """
+ Streams are ordered by their stream ID.
+ """
+ stream_list = [
+ priority.Stream(stream_id=s, weight=w)
+ for s, w in streams_and_weights
+ ]
+ stream_list = sorted(stream_list)
+ streams_by_id = [stream.stream_id for stream in stream_list]
+ assert sorted(streams_by_id) == streams_by_id
+
+ @given(
+ integers(min_value=1, max_value=2**24),
+ integers(min_value=1, max_value=2**24)
+ )
+ def test_stream_ordering(self, a, b):
+ """
+ Two streams are well ordered based on their stream ID.
+ """
+ s1 = priority.Stream(stream_id=a, weight=16)
+ s2 = priority.Stream(stream_id=b, weight=32)
+
+ assert (s1 < s2) == (a < b)
+ assert (s1 <= s2) == (a <= b)
+ assert (s1 > s2) == (a > b)
+ assert (s1 >= s2) == (a >= b)
+ assert (s1 == s2) == (a == b)
+ assert (s1 != s2) == (a != b)
+
+
+class TestPriorityTreeManual(object):
+ """
+ These tests manually confirm that the PriorityTree output is correct. They
+ use the PriorityTree given in the README and confirm that it outputs data
+ as expected.
+
+ If possible, I'd like to eventually replace these tests with
+ Hypothesis-based ones for the same data, but getting Hypothesis to generate
+ useful data in this case is going to be quite tricky.
+ """
+ @given(BLOCKED_AND_ACTIVE)
+ def test_priority_tree_initially_outputs_all_stream_ids(self,
+ blocked_expected):
+ """
+ The first iterations of the priority tree initially output the active
+ streams, in order of stream ID, regardless of weight.
+ """
+ tree = readme_tree()
+ blocked = blocked_expected[0]
+ expected = blocked_expected[1]
+
+ for stream_id in blocked:
+ tree.block(stream_id)
+
+ result = [next(tree) for _ in range(len(expected))]
+ assert expected == result
+
+ @given(UNBLOCKED_AND_ACTIVE)
+ def test_priority_tree_blocking_is_isomorphic(self,
+ allowed_expected):
+ """
+ Blocking everything and then unblocking certain ones has the same
+ effect as blocking specific streams.
+ """
+ tree = readme_tree()
+ allowed = allowed_expected[0]
+ expected = allowed_expected[1]
+
+ for stream_id in range(1, 12, 2):
+ tree.block(stream_id)
+
+ for stream_id in allowed:
+ tree.unblock(stream_id)
+
+ result = [next(tree) for _ in range(len(expected))]
+ assert expected == result
+
+ @given(BLOCKED_AND_ACTIVE)
+ def test_removing_items_behaves_similarly_to_blocking(self,
+ blocked_expected):
+ """
+ From the perspective of iterating over items, removing streams should
+ have the same effect as blocking them, except that the ordering
+ changes. Because the ordering is not important, don't test for it.
+ """
+ tree = readme_tree()
+ blocked = blocked_expected[0]
+ expected = set(blocked_expected[1])
+
+ for stream_id in blocked:
+ tree.remove_stream(stream_id)
+
+ result = set(next(tree) for _ in range(len(expected)))
+ assert expected == result
+
+ def test_priority_tree_raises_deadlock_error_if_all_blocked(self):
+ """
+ Assuming all streams are blocked and none can progress, asking for the
+ one with the next highest priority fires a DeadlockError.
+ """
+ tree = readme_tree()
+ for stream_id in range(1, 12, 2):
+ tree.block(stream_id)
+
+ with pytest.raises(priority.DeadlockError):
+ next(tree)
+
+ @pytest.mark.parametrize(
+ 'stream,new_parent,exclusive,weight,blocked,result',
+ [
+ (1, 3, False, 16, [], [3, 7, 7, 3, 7, 7, 3, 7, 7]),
+ (1, 5, False, 16, [], [3, 5, 7, 7, 3, 5, 7, 7, 3]),
+ (1, 5, False, 16, [5], [3, 1, 7, 7, 3, 1, 7, 7, 3]),
+ (5, 7, False, 16, [7, 1], [3, 5, 11, 3, 5, 11, 3, 5, 11]),
+ (11, None, False, 16, [], [1, 3, 7, 11, 7, 1, 3, 7, 11]),
+ (11, None, False, 16, [11], [1, 3, 7, 9, 7, 1, 3, 7, 9]),
+ (7, 9, False, 16, [], [1, 3, 9, 1, 3, 1, 3, 9, 1]),
+ (7, 1, True, 16, [], [1, 3, 1, 3, 1, 3, 1, 3, 1]),
+ (7, 1, True, 16, [1], [7, 3, 7, 3, 7, 3, 7, 3, 7]),
+ (7, 1, True, 16, [1, 7], [5, 3, 11, 3, 5, 3, 11, 3, 5]),
+ (1, 0, False, 32, [], [1, 3, 7, 1, 7, 1, 3, 7, 1]),
+ (1, 0, True, 32, [], [1, 1, 1, 1, 1, 1, 1, 1, 1]),
+ (1, 0, True, 32, [1], [3, 5, 7, 7, 3, 5, 7, 7, 3]),
+ (1, None, True, 32, [], [1, 1, 1, 1, 1, 1, 1, 1, 1]),
+ (1, None, True, 32, [1], [3, 5, 7, 7, 3, 5, 7, 7, 3]),
+ ]
+ )
+ def test_can_reprioritize_a_stream(self,
+ stream,
+ new_parent,
+ exclusive,
+ weight,
+ blocked,
+ result):
+ """
+ Reprioritizing streams adjusts the outputs of the tree.
+ """
+ t = readme_tree()
+
+ for s in blocked:
+ t.block(s)
+
+ t.reprioritize(
+ stream_id=stream,
+ depends_on=new_parent,
+ weight=weight,
+ exclusive=exclusive,
+ )
+
+ actual_result = [next(t) for _ in range(len(result))]
+ assert actual_result == result
+
+ def test_priority_tree_raises_error_inserting_duplicate(self):
+ """
+ Attempting to insert a stream that is already in the tree raises a
+ DuplicateStreamError
+ """
+ p = priority.PriorityTree()
+ p.insert_stream(1)
+
+ with pytest.raises(priority.DuplicateStreamError):
+ p.insert_stream(1)
+
+ def test_priority_raises_good_errors_for_missing_streams(self):
+ """
+ Attempting operations on absent streams raises a MissingStreamError.
+ """
+ p = priority.PriorityTree()
+ p.insert_stream(1)
+
+ with pytest.raises(priority.MissingStreamError):
+ p.reprioritize(3)
+
+ with pytest.raises(priority.MissingStreamError):
+ p.block(3)
+
+ with pytest.raises(priority.MissingStreamError):
+ p.unblock(3)
+
+ with pytest.raises(priority.MissingStreamError):
+ p.remove_stream(3)
+
+ def test_priority_raises_good_errors_for_zero_stream(self):
+ """
+ Attempting operations on stream 0 raises a PseudoStreamError.
+ """
+ p = priority.PriorityTree()
+ p.insert_stream(1)
+
+ with pytest.raises(priority.PseudoStreamError):
+ p.reprioritize(0)
+
+ with pytest.raises(priority.PseudoStreamError):
+ p.block(0)
+
+ with pytest.raises(priority.PseudoStreamError):
+ p.unblock(0)
+
+ with pytest.raises(priority.PseudoStreamError):
+ p.remove_stream(0)
+
+ @pytest.mark.parametrize('exclusive', [True, False])
+ def test_priority_allows_inserting_stream_with_absent_parent(self,
+ exclusive):
+ """
+ Attemping to insert a stream that depends on a stream that is not in
+ the tree automatically inserts the parent with default priority.
+ """
+ p = priority.PriorityTree()
+ p.insert_stream(
+ stream_id=3, depends_on=1, exclusive=exclusive, weight=32
+ )
+
+ # Iterate 10 times to prove that the parent stream starts blocked.
+ first_ten_ids = [next(p) for _ in range(0, 10)]
+ assert first_ten_ids == [3] * 10
+
+ # Unblock the parent.
+ p.unblock(1)
+
+ # Iterate 10 times, expecting only the parent.
+ next_ten_ids = [next(p) for _ in range(0, 10)]
+ assert next_ten_ids == [1] * 10
+
+ # Insert a new stream into the tree with default priority.
+ p.insert_stream(stream_id=5)
+
+ # Iterate 10 more times. Expect the parent, and the new stream, in
+ # equal amounts.
+ next_ten_ids = [next(p) for _ in range(0, 10)]
+ assert next_ten_ids == [5, 1] * 5
+
+ @pytest.mark.parametrize('exclusive', [True, False])
+ def test_priority_reprioritizing_stream_with_absent_parent(self,
+ exclusive):
+ """
+ Attemping to reprioritize a stream to depend on a stream that is not in
+ the tree automatically inserts the parent with default priority.
+ """
+ p = priority.PriorityTree()
+ p.insert_stream(stream_id=3)
+
+ p.reprioritize(
+ stream_id=3, depends_on=1, exclusive=exclusive, weight=32
+ )
+
+ # Iterate 10 times to prove that the parent stream starts blocked.
+ first_ten_ids = [next(p) for _ in range(0, 10)]
+ assert first_ten_ids == [3] * 10
+
+ # Unblock the parent.
+ p.unblock(1)
+
+ # Iterate 10 times, expecting only the parent.
+ next_ten_ids = [next(p) for _ in range(0, 10)]
+ assert next_ten_ids == [1] * 10
+
+ # Insert a new stream into the tree with default priority.
+ p.insert_stream(stream_id=5)
+
+ # Iterate 10 more times. Expect the parent, and the new stream, in
+ # equal amounts.
+ next_ten_ids = [next(p) for _ in range(0, 10)]
+ assert next_ten_ids == [5, 1] * 5
+
+ @pytest.mark.parametrize('count', range(2, 10000, 100))
+ def test_priority_refuses_to_allow_too_many_streams_in_tree(self, count):
+ """
+ Attempting to insert more streams than maximum_streams into the tree
+ fails.
+ """
+ p = priority.PriorityTree(maximum_streams=count)
+
+ # This isn't an off-by-one error: stream 0 is in the tree by default.
+ for x in range(1, count):
+ p.insert_stream(x)
+
+ with pytest.raises(priority.TooManyStreamsError):
+ p.insert_stream(x + 1)
+
+ @pytest.mark.parametrize('depends_on', [0, None])
+ def test_can_insert_stream_with_exclusive_dependency_on_0(self,
+ depends_on):
+ """
+ It is acceptable to insert a stream with an exclusive dependency on
+ stream 0, both explicitly and implicitly.
+ """
+ p = priority.PriorityTree()
+ p.insert_stream(stream_id=1)
+ p.insert_stream(stream_id=3)
+
+ p.insert_stream(stream_id=5, depends_on=depends_on, exclusive=True)
+
+ next_ten_ids = [next(p) for _ in range(0, 10)]
+ assert next_ten_ids == [5] * 10
+
+ @pytest.mark.parametrize('weight', [
+ None,
+ 0.5,
+ float('inf'),
+ 'priority',
+ object
+ ])
+ def test_stream_with_non_integer_weight_is_error(self, weight):
+ """
+ Giving a stream a non-integer weight is rejected.
+ """
+ p = priority.PriorityTree()
+ with pytest.raises(priority.BadWeightError) as err:
+ p.insert_stream(stream_id=1, weight=weight)
+ assert err.value.args[0] == 'Stream weight should be an integer'
+
+ p.insert_stream(stream_id=2)
+ with pytest.raises(priority.BadWeightError) as err:
+ p.reprioritize(stream_id=2, weight=weight)
+ assert err.value.args[0] == 'Stream weight should be an integer'
+
+ @pytest.mark.parametrize('weight', [
+ 0,
+ 257,
+ 1000,
+ -42,
+ ])
+ def test_stream_with_out_of_bounds_weight_is_error(self, weight):
+ """
+ Giving a stream an out-of-bounds integer weight is rejected.
+ """
+ p = priority.PriorityTree()
+ with pytest.raises(priority.BadWeightError) as err:
+ p.insert_stream(stream_id=1, weight=weight)
+ assert (
+ err.value.args[0] ==
+ 'Stream weight must be between 1 and 256 (inclusive)')
+
+ p.insert_stream(stream_id=2)
+ with pytest.raises(priority.BadWeightError) as err:
+ p.reprioritize(stream_id=2, weight=weight)
+ assert (
+ err.value.args[0] ==
+ 'Stream weight must be between 1 and 256 (inclusive)')
+
+ @pytest.mark.parametrize('exclusive', (True, False))
+ @pytest.mark.parametrize('stream_id', (1, 5, 20, 32, 256))
+ def test_stream_depending_on_self_is_error(self, stream_id, exclusive):
+ """
+ Inserting a stream that is dependent on itself is rejected.
+ """
+ p = priority.PriorityTree()
+ with pytest.raises(priority.PriorityLoop):
+ p.insert_stream(
+ stream_id=stream_id, depends_on=stream_id, exclusive=exclusive
+ )
+
+ @pytest.mark.parametrize('exclusive', (True, False))
+ @pytest.mark.parametrize('stream_id', (1, 5, 20, 32, 256))
+ def test_reprioritize_depend_on_self_is_error(self, stream_id, exclusive):
+ """
+ Reprioritizing a stream to make it dependent on itself is an error.
+ """
+ p = priority.PriorityTree()
+ p.insert_stream(stream_id=stream_id)
+ with pytest.raises(priority.PriorityLoop):
+ p.reprioritize(
+ stream_id=stream_id, depends_on=stream_id, exclusive=exclusive
+ )
+
+
+class TestPriorityTreeOutput(object):
+ """
+ These tests use Hypothesis to attempt to bound the output of iterating over
+ the priority tree. In particular, their goal is to ensure that the output
+ of the tree is "good enough": that it meets certain requirements on
+ fairness and equidistribution.
+ """
+ @given(STREAMS_AND_WEIGHTS)
+ def test_period_of_repetition(self, streams_and_weights):
+ """
+ The period of repetition of a priority sequence is given by the sum of
+ the weights of the streams. Once that many values have been pulled out
+ the sequence repeats identically.
+ """
+ p = priority.PriorityTree()
+ weights = []
+
+ for stream, weight in streams_and_weights:
+ p.insert_stream(stream_id=stream, weight=weight)
+ weights.append(weight)
+
+ period = sum(weights)
+
+ # Pop off the first n elements, which will always be evenly
+ # distributed.
+ for _ in weights:
+ next(p)
+
+ pattern = [next(p) for _ in range(period)]
+ pattern = itertools.cycle(pattern)
+
+ for i in range(period * 20):
+ assert next(p) == next(pattern), i
+
+ @given(STREAMS_AND_WEIGHTS)
+ def test_priority_tree_distribution(self, streams_and_weights):
+ """
+ Once a full period of repetition has been observed, each stream has
+ been emitted a number of times equal to its weight.
+ """
+ p = priority.PriorityTree()
+ weights = []
+
+ for stream, weight in streams_and_weights:
+ p.insert_stream(stream_id=stream, weight=weight)
+ weights.append(weight)
+
+ period = sum(weights)
+
+ # Pop off the first n elements, which will always be evenly
+ # distributed.
+ for _ in weights:
+ next(p)
+
+ count = collections.Counter(next(p) for _ in range(period))
+
+ assert len(count) == len(streams_and_weights)
+ for stream, weight in streams_and_weights:
+ count[stream] == weight