summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas Kriechbaumer <thomas@kriechbaumer.name>2020-08-01 12:44:05 +0200
committerThomas Kriechbaumer <Kriechi@users.noreply.github.com>2020-08-02 13:34:58 +0200
commite3ab38a6440a3544431c92a9fb3e3e1307ef3f8c (patch)
treed0444c19825830ee6a05c08ad39e215f58f1c4f2 /src
parent32f868d4a0b5e97243f86c3f90edf4b2932a8fb5 (diff)
move all code under src/
Diffstat (limited to 'src')
-rw-r--r--src/h2/__init__.py8
-rw-r--r--src/h2/config.py170
-rw-r--r--src/h2/connection.py2051
-rw-r--r--src/h2/errors.py75
-rw-r--r--src/h2/events.py648
-rw-r--r--src/h2/exceptions.py186
-rw-r--r--src/h2/frame_buffer.py175
-rw-r--r--src/h2/settings.py339
-rw-r--r--src/h2/stream.py1371
-rw-r--r--src/h2/utilities.py660
-rw-r--r--src/h2/windows.py139
11 files changed, 5822 insertions, 0 deletions
diff --git a/src/h2/__init__.py b/src/h2/__init__.py
new file mode 100644
index 0000000..17ee91b
--- /dev/null
+++ b/src/h2/__init__.py
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
hyper-h2
~~~~~~~~

An HTTP/2 implementation.
"""
__version__ = '4.0.0dev0'
diff --git a/src/h2/config.py b/src/h2/config.py
new file mode 100644
index 0000000..1c437ee
--- /dev/null
+++ b/src/h2/config.py
@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+"""
+h2/config
+~~~~~~~~~
+
+Objects for controlling the configuration of the HTTP/2 stack.
+"""
+
+
+class _BooleanConfigOption(object):
+ """
+ Descriptor for handling a boolean config option. This will block
+ attempts to set boolean config options to non-bools.
+ """
+ def __init__(self, name):
+ self.name = name
+ self.attr_name = '_%s' % self.name
+
+ def __get__(self, instance, owner):
+ return getattr(instance, self.attr_name)
+
+ def __set__(self, instance, value):
+ if not isinstance(value, bool):
+ raise ValueError("%s must be a bool" % self.name)
+ setattr(instance, self.attr_name, value)
+
+
class DummyLogger(object):
    """
    A no-op stand-in for a real logger.

    Handing one of these to every object lets the rest of hyper-h2 call
    logging methods unconditionally, instead of sprinkling
    ``if logger is not None`` conditionals throughout the code.
    """
    def __init__(self, *vargs):
        pass

    def debug(self, *vargs, **kwargs):
        """Swallow a debug-level message. Only level needed for now."""

    def trace(self, *vargs, **kwargs):
        """Swallow a trace-level message. Only level needed for now."""
+
+
class H2Configuration(object):
    """
    Configuration controlling how a single HTTP/2 connection behaves.

    This object lets users customise behaviour: enabling or disabling
    optional features, or otherwise handling unusual behaviours. It has
    very little behaviour of its own, mostly just ensuring that the
    configuration is self-consistent.

    :param client_side: Whether this object is for the client side of a
        connection or the server side. Affects the state machine logic,
        the default settings values, the allowable stream IDs, and
        several other properties. Defaults to ``True``.
    :type client_side: ``bool``

    :param header_encoding: Controls whether the headers emitted by this
        object in events are transparently decoded to ``unicode`` strings,
        and which encoding is used to do that decoding. Defaults to
        ``None``, meaning headers are returned as bytes. To decode headers
        automatically (returning them as unicode strings), set this to the
        string name of any encoding, e.g. ``'utf-8'``.

        .. versionchanged:: 3.0.0
           Changed default value from ``'utf-8'`` to ``None``

    :type header_encoding: ``str``, ``False``, or ``None``

    :param validate_outbound_headers: Whether headers emitted by this
        object are validated against the rules in RFC 7540. Disabling
        this skips outbound header validation and allows emitting headers
        that may be illegal according to RFC 7540. Defaults to ``True``.
    :type validate_outbound_headers: ``bool``

    :param normalize_outbound_headers: Whether headers emitted by this
        object are normalized before sending. Disabling this skips
        outbound header normalization and allows emitting headers that
        may be illegal according to RFC 7540. Defaults to ``True``.
    :type normalize_outbound_headers: ``bool``

    :param validate_inbound_headers: Whether headers received by this
        object are validated against the rules in RFC 7540. Disabling
        this skips inbound header validation and allows receiving headers
        that may be illegal according to RFC 7540. Defaults to ``True``.
    :type validate_inbound_headers: ``bool``

    :param normalize_inbound_headers: Whether headers received by this
        object are normalized according to the rules of RFC 7540.
        Disabling this may lead to hyper-h2 emitting header blocks that
        some RFCs forbid, e.g. with multiple cookie fields.

        .. versionadded:: 3.0.0

    :type normalize_inbound_headers: ``bool``

    :param logger: A logger that conforms to this module's requirements
        (no I/O and no context switches), needed for asynchronous
        operation.

        .. versionadded:: 2.6.0

    :type logger: ``logging.Logger``
    """
    # Boolean options are mediated by descriptors so assigning a
    # non-bool fails loudly instead of silently misconfiguring.
    client_side = _BooleanConfigOption('client_side')
    validate_outbound_headers = _BooleanConfigOption(
        'validate_outbound_headers'
    )
    normalize_outbound_headers = _BooleanConfigOption(
        'normalize_outbound_headers'
    )
    validate_inbound_headers = _BooleanConfigOption(
        'validate_inbound_headers'
    )
    normalize_inbound_headers = _BooleanConfigOption(
        'normalize_inbound_headers'
    )

    def __init__(self,
                 client_side=True,
                 header_encoding=None,
                 validate_outbound_headers=True,
                 normalize_outbound_headers=True,
                 validate_inbound_headers=True,
                 normalize_inbound_headers=True,
                 logger=None):
        self.client_side = client_side
        self.header_encoding = header_encoding
        self.validate_outbound_headers = validate_outbound_headers
        self.normalize_outbound_headers = normalize_outbound_headers
        self.validate_inbound_headers = validate_inbound_headers
        self.normalize_inbound_headers = normalize_inbound_headers
        # A DummyLogger keeps logging calls unconditional elsewhere.
        self.logger = logger or DummyLogger(__name__)

    @property
    def header_encoding(self):
        """
        Controls whether the headers emitted by this object in events are
        transparently decoded to ``unicode`` strings, and which encoding
        is used to do that decoding. ``None`` (the default) means headers
        are returned as bytes; the string name of an encoding, e.g.
        ``'utf-8'``, enables automatic decoding.
        """
        return self._header_encoding

    @header_encoding.setter
    def header_encoding(self, value):
        """
        Enforces constraints on the value of header encoding.
        """
        if value is True:
            raise ValueError("header_encoding cannot be True")
        if not isinstance(value, (bool, str, type(None))):
            raise ValueError("header_encoding must be bool, string, or None")
        self._header_encoding = value
diff --git a/src/h2/connection.py b/src/h2/connection.py
new file mode 100644
index 0000000..881304b
--- /dev/null
+++ b/src/h2/connection.py
@@ -0,0 +1,2051 @@
+# -*- coding: utf-8 -*-
+"""
+h2/connection
+~~~~~~~~~~~~~
+
+An implementation of a HTTP/2 connection.
+"""
+import base64
+
+from enum import Enum, IntEnum
+
+from hyperframe.exceptions import InvalidPaddingError
+from hyperframe.frame import (
+ GoAwayFrame, WindowUpdateFrame, HeadersFrame, DataFrame, PingFrame,
+ PushPromiseFrame, SettingsFrame, RstStreamFrame, PriorityFrame,
+ ContinuationFrame, AltSvcFrame, ExtensionFrame
+)
+from hpack.hpack import Encoder, Decoder
+from hpack.exceptions import HPACKError, OversizedHeaderListError
+
+from .config import H2Configuration
+from .errors import ErrorCodes, _error_code_from_int
+from .events import (
+ WindowUpdated, RemoteSettingsChanged, PingReceived, PingAckReceived,
+ SettingsAcknowledged, ConnectionTerminated, PriorityUpdated,
+ AlternativeServiceAvailable, UnknownFrameReceived
+)
+from .exceptions import (
+ ProtocolError, NoSuchStreamError, FlowControlError, FrameTooLargeError,
+ TooManyStreamsError, StreamClosedError, StreamIDTooLowError,
+ NoAvailableStreamIDError, RFC1122Error, DenialOfServiceError
+)
+from .frame_buffer import FrameBuffer
+from .settings import Settings, SettingCodes
+from .stream import H2Stream, StreamClosedBy
+from .utilities import SizeLimitDict, guard_increment_window
+from .windows import WindowManager
+
+
class ConnectionState(Enum):
    """Coarse lifecycle states for the connection as a whole."""
    IDLE = 0          # Fresh connection, no HEADERS exchanged yet.
    CLIENT_OPEN = 1   # Opened from the client side (HEADERS sent).
    SERVER_OPEN = 2   # Opened from the server side (HEADERS received).
    CLOSED = 3        # Terminal state (e.g. after GOAWAY).
+
+
class ConnectionInputs(Enum):
    """Every event the connection state machine can be asked to process."""
    # Locally-initiated (SEND_*) events.
    SEND_HEADERS = 0
    SEND_PUSH_PROMISE = 1
    SEND_DATA = 2
    SEND_GOAWAY = 3
    SEND_WINDOW_UPDATE = 4
    SEND_PING = 5
    SEND_SETTINGS = 6
    SEND_RST_STREAM = 7
    SEND_PRIORITY = 8
    # Remotely-initiated (RECV_*) events.
    RECV_HEADERS = 9
    RECV_PUSH_PROMISE = 10
    RECV_DATA = 11
    RECV_GOAWAY = 12
    RECV_WINDOW_UPDATE = 13
    RECV_PING = 14
    RECV_SETTINGS = 15
    RECV_RST_STREAM = 16
    RECV_PRIORITY = 17
    # ALTSVC events, added in 2.3.0.
    SEND_ALTERNATIVE_SERVICE = 18
    RECV_ALTERNATIVE_SERVICE = 19
+
+
class AllowedStreamIDs(IntEnum):
    """The permitted parity (``stream_id % 2``) for a new stream ID."""
    EVEN = 0
    ODD = 1
+
+
class H2ConnectionStateMachine(object):
    """
    A single HTTP/2 connection state machine.

    This state machine, while defined in its own class, is logically part of
    the H2Connection class also defined in this file. The state machine itself
    maintains very little state directly, instead focusing entirely on
    managing state transitions.
    """
    # For the purposes of this state machine we treat HEADERS and their
    # associated CONTINUATION frames as a single jumbo frame. The protocol
    # allows/requires this by preventing other frames from being interleaved
    # in between HEADERS/CONTINUATION frames.
    #
    # The _transitions dictionary contains a mapping of tuples of
    # (state, input) to tuples of (side_effect_function, end_state). This map
    # contains all allowed transitions: anything not in this map is invalid
    # and immediately causes a transition to ``closed``.

    _transitions = {
        # State: idle
        (ConnectionState.IDLE, ConnectionInputs.SEND_HEADERS):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.IDLE, ConnectionInputs.RECV_HEADERS):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.IDLE, ConnectionInputs.SEND_SETTINGS):
            (None, ConnectionState.IDLE),
        (ConnectionState.IDLE, ConnectionInputs.RECV_SETTINGS):
            (None, ConnectionState.IDLE),
        (ConnectionState.IDLE, ConnectionInputs.SEND_WINDOW_UPDATE):
            (None, ConnectionState.IDLE),
        (ConnectionState.IDLE, ConnectionInputs.RECV_WINDOW_UPDATE):
            (None, ConnectionState.IDLE),
        (ConnectionState.IDLE, ConnectionInputs.SEND_PING):
            (None, ConnectionState.IDLE),
        (ConnectionState.IDLE, ConnectionInputs.RECV_PING):
            (None, ConnectionState.IDLE),
        (ConnectionState.IDLE, ConnectionInputs.SEND_GOAWAY):
            (None, ConnectionState.CLOSED),
        (ConnectionState.IDLE, ConnectionInputs.RECV_GOAWAY):
            (None, ConnectionState.CLOSED),
        (ConnectionState.IDLE, ConnectionInputs.SEND_PRIORITY):
            (None, ConnectionState.IDLE),
        (ConnectionState.IDLE, ConnectionInputs.RECV_PRIORITY):
            (None, ConnectionState.IDLE),
        (ConnectionState.IDLE, ConnectionInputs.SEND_ALTERNATIVE_SERVICE):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.IDLE, ConnectionInputs.RECV_ALTERNATIVE_SERVICE):
            (None, ConnectionState.CLIENT_OPEN),

        # State: open, client side.
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_HEADERS):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_DATA):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_GOAWAY):
            (None, ConnectionState.CLOSED),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PING):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_SETTINGS):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PRIORITY):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_HEADERS):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PUSH_PROMISE):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_DATA):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_GOAWAY):
            (None, ConnectionState.CLOSED),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PING):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_SETTINGS):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_RST_STREAM):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_RST_STREAM):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PRIORITY):
            (None, ConnectionState.CLIENT_OPEN),
        (ConnectionState.CLIENT_OPEN,
         ConnectionInputs.RECV_ALTERNATIVE_SERVICE):
            (None, ConnectionState.CLIENT_OPEN),

        # State: open, server side.
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_HEADERS):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PUSH_PROMISE):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_DATA):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_GOAWAY):
            (None, ConnectionState.CLOSED),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PING):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_SETTINGS):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PRIORITY):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_HEADERS):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_DATA):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_GOAWAY):
            (None, ConnectionState.CLOSED),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PING):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_SETTINGS):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PRIORITY):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_RST_STREAM):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_RST_STREAM):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN,
         ConnectionInputs.SEND_ALTERNATIVE_SERVICE):
            (None, ConnectionState.SERVER_OPEN),
        (ConnectionState.SERVER_OPEN,
         ConnectionInputs.RECV_ALTERNATIVE_SERVICE):
            (None, ConnectionState.SERVER_OPEN),

        # State: closed
        (ConnectionState.CLOSED, ConnectionInputs.SEND_GOAWAY):
            (None, ConnectionState.CLOSED),
        (ConnectionState.CLOSED, ConnectionInputs.RECV_GOAWAY):
            (None, ConnectionState.CLOSED),
    }

    def __init__(self):
        # All connections begin life idle.
        self.state = ConnectionState.IDLE

    def process_input(self, input_):
        """
        Process a specific input in the state machine.

        :param input_: The :class:`ConnectionInputs` member to apply.
        :returns: A list of events generated by the transition's side
            effect function. Currently always the empty list, since no
            transition in ``_transitions`` defines a side effect.
        :raises ValueError: if ``input_`` is not a ``ConnectionInputs``.
        :raises ProtocolError: if no transition exists for the current
            state and input; the machine moves to ``CLOSED`` first.
        """
        if not isinstance(input_, ConnectionInputs):
            raise ValueError("Input must be an instance of ConnectionInputs")

        try:
            func, target_state = self._transitions[(self.state, input_)]
        except KeyError:
            # Unknown (state, input) pairs poison the connection: remember
            # the old state for the error message, then close.
            old_state = self.state
            self.state = ConnectionState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, old_state)
            )
        else:
            self.state = target_state
            if func is not None: # pragma: no cover
                return func()

        return []
+
+
+class H2Connection(object):
+ """
+ A low-level HTTP/2 connection object. This handles building and receiving
+ frames and maintains both connection and per-stream state for all streams
+ on this connection.
+
+ This wraps a HTTP/2 Connection state machine implementation, ensuring that
+ frames can only be sent/received when the connection is in a valid state.
+ It also builds stream state machines on demand to ensure that the
+ constraints of those state machines are met as well. Attempts to create
+ frames that cannot be sent will raise a ``ProtocolError``.
+
+ .. versionchanged:: 2.3.0
+ Added the ``header_encoding`` keyword argument.
+
+ .. versionchanged:: 2.5.0
+ Added the ``config`` keyword argument. Deprecated the ``client_side``
+ and ``header_encoding`` parameters.
+
+ .. versionchanged:: 3.0.0
+ Removed deprecated parameters and properties.
+
+ :param config: The configuration for the HTTP/2 connection.
+
+ .. versionadded:: 2.5.0
+
+ :type config: :class:`H2Configuration <h2.config.H2Configuration>`
+ """
+ # The initial maximum outbound frame size. This can be changed by receiving
+ # a settings frame.
+ DEFAULT_MAX_OUTBOUND_FRAME_SIZE = 65535
+
+ # The initial maximum inbound frame size. This is somewhat arbitrarily
+ # chosen.
+ DEFAULT_MAX_INBOUND_FRAME_SIZE = 2**24
+
+ # The highest acceptable stream ID.
+ HIGHEST_ALLOWED_STREAM_ID = 2**31 - 1
+
+ # The largest acceptable window increment.
+ MAX_WINDOW_INCREMENT = 2**31 - 1
+
+ # The initial default value of SETTINGS_MAX_HEADER_LIST_SIZE.
+ DEFAULT_MAX_HEADER_LIST_SIZE = 2**16
+
+ # Keep in memory limited amount of results for streams closes
+ MAX_CLOSED_STREAMS = 2**16
+
    def __init__(self, config=None):
        """
        Set up all connection-level state.

        :param config: Optional :class:`H2Configuration
            <h2.config.H2Configuration>`. When omitted, a default
            client-side configuration is created.
        """
        # The connection-level state machine, plus per-stream bookkeeping.
        self.state_machine = H2ConnectionStateMachine()
        self.streams = {}
        self.highest_inbound_stream_id = 0
        self.highest_outbound_stream_id = 0
        # The HPACK encoder/decoder shared by all streams on this connection.
        self.encoder = Encoder()
        self.decoder = Decoder()

        # This won't always actually do anything: for versions of HPACK older
        # than 2.3.0 it does nothing. However, we have to try!
        self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE

        #: The configuration for this HTTP/2 connection object.
        #:
        #: .. versionadded:: 2.5.0
        self.config = config
        if self.config is None:
            self.config = H2Configuration(
                client_side=True,
            )

        # Objects that store settings, including defaults.
        #
        # We set the MAX_CONCURRENT_STREAMS value to 100 because its default
        # is unbounded, and that's a dangerous default because it allows
        # essentially unbounded resources to be allocated regardless of how
        # they will be used. 100 should be suitable for the average
        # application. This default obviously does not apply to the remote
        # peer's settings: the remote peer controls them!
        #
        # We also set MAX_HEADER_LIST_SIZE to a reasonable value. This is to
        # advertise our defence against CVE-2016-6581. However, not all
        # versions of HPACK will let us do it. That's ok: we should at least
        # suggest that we're not vulnerable.
        self.local_settings = Settings(
            client=self.config.client_side,
            initial_values={
                SettingCodes.MAX_CONCURRENT_STREAMS: 100,
                SettingCodes.MAX_HEADER_LIST_SIZE:
                    self.DEFAULT_MAX_HEADER_LIST_SIZE,
            }
        )
        self.remote_settings = Settings(client=not self.config.client_side)

        # The current value of the connection flow control windows on the
        # connection.
        self.outbound_flow_control_window = (
            self.remote_settings.initial_window_size
        )

        #: The maximum size of a frame that can be emitted by this peer, in
        #: bytes.
        self.max_outbound_frame_size = self.remote_settings.max_frame_size

        #: The maximum size of a frame that can be received by this peer, in
        #: bytes.
        self.max_inbound_frame_size = self.local_settings.max_frame_size

        # Buffer for incoming data.
        self.incoming_buffer = FrameBuffer(server=not self.config.client_side)

        # A private variable to store a sequence of received header frames
        # until completion.
        self._header_frames = []

        # Data that needs to be sent.
        self._data_to_send = bytearray()

        # Keeps track of how streams are closed.
        # Used to ensure that we don't blow up in the face of frames that were
        # in flight when a RST_STREAM was sent.
        # Also used to determine whether we should consider a frame received
        # while a stream is closed as either a stream error or a connection
        # error.
        self._closed_streams = SizeLimitDict(
            size_limit=self.MAX_CLOSED_STREAMS
        )

        # The flow control window manager for the connection.
        self._inbound_flow_control_window_manager = WindowManager(
            max_window_size=self.local_settings.initial_window_size
        )

        # When in doubt use dict-dispatch.
        self._frame_dispatch_table = {
            HeadersFrame: self._receive_headers_frame,
            PushPromiseFrame: self._receive_push_promise_frame,
            SettingsFrame: self._receive_settings_frame,
            DataFrame: self._receive_data_frame,
            WindowUpdateFrame: self._receive_window_update_frame,
            PingFrame: self._receive_ping_frame,
            RstStreamFrame: self._receive_rst_stream_frame,
            PriorityFrame: self._receive_priority_frame,
            GoAwayFrame: self._receive_goaway_frame,
            ContinuationFrame: self._receive_naked_continuation,
            AltSvcFrame: self._receive_alt_svc_frame,
            ExtensionFrame: self._receive_unknown_frame
        }
+
+ def _prepare_for_sending(self, frames):
+ if not frames:
+ return
+ self._data_to_send += b''.join(f.serialize() for f in frames)
+ assert all(f.body_len <= self.max_outbound_frame_size for f in frames)
+
+ def _open_streams(self, remainder):
+ """
+ A common method of counting number of open streams. Returns the number
+ of streams that are open *and* that have (stream ID % 2) == remainder.
+ While it iterates, also deletes any closed streams.
+ """
+ count = 0
+ to_delete = []
+
+ for stream_id, stream in self.streams.items():
+ if stream.open and (stream_id % 2 == remainder):
+ count += 1
+ elif stream.closed:
+ to_delete.append(stream_id)
+
+ for stream_id in to_delete:
+ stream = self.streams.pop(stream_id)
+ self._closed_streams[stream_id] = stream.closed_by
+
+ return count
+
+ @property
+ def open_outbound_streams(self):
+ """
+ The current number of open outbound streams.
+ """
+ outbound_numbers = int(self.config.client_side)
+ return self._open_streams(outbound_numbers)
+
+ @property
+ def open_inbound_streams(self):
+ """
+ The current number of open inbound streams.
+ """
+ inbound_numbers = int(not self.config.client_side)
+ return self._open_streams(inbound_numbers)
+
+ @property
+ def inbound_flow_control_window(self):
+ """
+ The size of the inbound flow control window for the connection. This is
+ rarely publicly useful: instead, use :meth:`remote_flow_control_window
+ <h2.connection.H2Connection.remote_flow_control_window>`. This
+ shortcut is largely present to provide a shortcut to this data.
+ """
+ return self._inbound_flow_control_window_manager.current_window_size
+
+ def _begin_new_stream(self, stream_id, allowed_ids):
+ """
+ Initiate a new stream.
+
+ .. versionchanged:: 2.0.0
+ Removed this function from the public API.
+
+ :param stream_id: The ID of the stream to open.
+ :param allowed_ids: What kind of stream ID is allowed.
+ """
+ self.config.logger.debug(
+ "Attempting to initiate stream ID %d", stream_id
+ )
+ outbound = self._stream_id_is_outbound(stream_id)
+ highest_stream_id = (
+ self.highest_outbound_stream_id if outbound else
+ self.highest_inbound_stream_id
+ )
+
+ if stream_id <= highest_stream_id:
+ raise StreamIDTooLowError(stream_id, highest_stream_id)
+
+ if (stream_id % 2) != int(allowed_ids):
+ raise ProtocolError(
+ "Invalid stream ID for peer."
+ )
+
+ s = H2Stream(
+ stream_id,
+ config=self.config,
+ inbound_window_size=self.local_settings.initial_window_size,
+ outbound_window_size=self.remote_settings.initial_window_size
+ )
+ self.config.logger.debug("Stream ID %d created", stream_id)
+ s.max_inbound_frame_size = self.max_inbound_frame_size
+ s.max_outbound_frame_size = self.max_outbound_frame_size
+
+ self.streams[stream_id] = s
+ self.config.logger.debug("Current streams: %s", self.streams.keys())
+
+ if outbound:
+ self.highest_outbound_stream_id = stream_id
+ else:
+ self.highest_inbound_stream_id = stream_id
+
+ return s
+
+ def initiate_connection(self):
+ """
+ Provides any data that needs to be sent at the start of the connection.
+ Must be called for both clients and servers.
+ """
+ self.config.logger.debug("Initializing connection")
+ self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
+ if self.config.client_side:
+ preamble = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
+ else:
+ preamble = b''
+
+ f = SettingsFrame(0)
+ for setting, value in self.local_settings.items():
+ f.settings[setting] = value
+ self.config.logger.debug(
+ "Send Settings frame: %s", self.local_settings
+ )
+
+ self._data_to_send += preamble + f.serialize()
+
    def initiate_upgrade_connection(self, settings_header=None):
        """
        Call to initialise the connection object for use with an upgraded
        HTTP/2 connection (i.e. a connection negotiated using the
        ``Upgrade: h2c`` HTTP header).

        This method differs from :meth:`initiate_connection
        <h2.connection.H2Connection.initiate_connection>` in several ways.
        Firstly, it handles the additional SETTINGS frame that is sent in the
        ``HTTP2-Settings`` header field. When called on a client connection,
        this method will return a bytestring that the caller can put in the
        ``HTTP2-Settings`` field they send on their initial request. When
        called on a server connection, the user **must** provide the value they
        received from the client in the ``HTTP2-Settings`` header field to the
        ``settings_header`` argument, which will be used appropriately.

        Additionally, this method sets up stream 1 in a half-closed state
        appropriate for this side of the connection, to reflect the fact that
        the request is already complete.

        Finally, this method also prepares the appropriate preamble to be sent
        after the upgrade.

        .. versionadded:: 2.3.0

        :param settings_header: (optional, server-only): The value of the
            ``HTTP2-Settings`` header field received from the client.
        :type settings_header: ``bytes``

        :returns: For clients, a bytestring to put in the ``HTTP2-Settings``.
            For servers, returns nothing.
        :rtype: ``bytes`` or ``None``
        """
        self.config.logger.debug(
            "Upgrade connection. Current settings: %s", self.local_settings
        )

        # Only a client produces data to return (for its HTTP2-Settings
        # header field); a server returns None.
        frame_data = None
        # Begin by getting the preamble in place.
        self.initiate_connection()

        if self.config.client_side:
            # Serialize our settings into the URL-safe base64 form that
            # the HTTP2-Settings header field carries.
            f = SettingsFrame(0)
            for setting, value in self.local_settings.items():
                f.settings[setting] = value

            frame_data = f.serialize_body()
            frame_data = base64.urlsafe_b64encode(frame_data)
        elif settings_header:
            # We have a settings header from the client. This needs to be
            # applied, but we want to throw away the ACK. We do this by
            # inserting the data into a Settings frame and then passing it to
            # the state machine, but ignoring the return value.
            settings_header = base64.urlsafe_b64decode(settings_header)
            f = SettingsFrame(0)
            f.parse_body(settings_header)
            self._receive_settings_frame(f)

        # Set up appropriate state. Stream 1 in a half-closed state:
        # half-closed(local) for clients, half-closed(remote) for servers.
        # Additionally, we need to set up the Connection state machine.
        connection_input = (
            ConnectionInputs.SEND_HEADERS if self.config.client_side
            else ConnectionInputs.RECV_HEADERS
        )
        self.config.logger.debug("Process input %s", connection_input)
        self.state_machine.process_input(connection_input)

        # Set up stream 1.
        self._begin_new_stream(stream_id=1, allowed_ids=AllowedStreamIDs.ODD)
        self.streams[1].upgrade(self.config.client_side)
        return frame_data
+
+ def _get_or_create_stream(self, stream_id, allowed_ids):
+ """
+ Gets a stream by its stream ID. Will create one if one does not already
+ exist. Use allowed_ids to circumvent the usual stream ID rules for
+ clients and servers.
+
+ .. versionchanged:: 2.0.0
+ Removed this function from the public API.
+ """
+ try:
+ return self.streams[stream_id]
+ except KeyError:
+ return self._begin_new_stream(stream_id, allowed_ids)
+
+ def _get_stream_by_id(self, stream_id):
+ """
+ Gets a stream by its stream ID. Raises NoSuchStreamError if the stream
+ ID does not correspond to a known stream and is higher than the current
+ maximum: raises if it is lower than the current maximum.
+
+ .. versionchanged:: 2.0.0
+ Removed this function from the public API.
+ """
+ try:
+ return self.streams[stream_id]
+ except KeyError:
+ outbound = self._stream_id_is_outbound(stream_id)
+ highest_stream_id = (
+ self.highest_outbound_stream_id if outbound else
+ self.highest_inbound_stream_id
+ )
+
+ if stream_id > highest_stream_id:
+ raise NoSuchStreamError(stream_id)
+ else:
+ raise StreamClosedError(stream_id)
+
+ def get_next_available_stream_id(self):
+ """
+ Returns an integer suitable for use as the stream ID for the next
+ stream created by this endpoint. For server endpoints, this stream ID
+ will be even. For client endpoints, this stream ID will be odd. If no
+ stream IDs are available, raises :class:`NoAvailableStreamIDError
+ <h2.exceptions.NoAvailableStreamIDError>`.
+
+ .. warning:: The return value from this function does not change until
+ the stream ID has actually been used by sending or pushing
+ headers on that stream. For that reason, it should be
+ called as close as possible to the actual use of the
+ stream ID.
+
+ .. versionadded:: 2.0.0
+
+ :raises: :class:`NoAvailableStreamIDError
+ <h2.exceptions.NoAvailableStreamIDError>`
+ :returns: The next free stream ID this peer can use to initiate a
+ stream.
+ :rtype: ``int``
+ """
+ # No streams have been opened yet, so return the lowest allowed stream
+ # ID.
+ if not self.highest_outbound_stream_id:
+ next_stream_id = 1 if self.config.client_side else 2
+ else:
+ next_stream_id = self.highest_outbound_stream_id + 2
+ self.config.logger.debug(
+ "Next available stream ID %d", next_stream_id
+ )
+ if next_stream_id > self.HIGHEST_ALLOWED_STREAM_ID:
+ raise NoAvailableStreamIDError("Exhausted allowed stream IDs")
+
+ return next_stream_id
+
    def send_headers(self, stream_id, headers, end_stream=False,
                     priority_weight=None, priority_depends_on=None,
                     priority_exclusive=None):
        """
        Send headers on a given stream.

        This function can be used to send request or response headers: the kind
        that are sent depends on whether this connection has been opened as a
        client or server connection, and whether the stream was opened by the
        remote peer or not.

        If this is a client connection, calling ``send_headers`` will send the
        headers as a request. It will also implicitly open the stream being
        used. If this is a client connection and ``send_headers`` has *already*
        been called, this will send trailers instead.

        If this is a server connection, calling ``send_headers`` will send the
        headers as a response. It is a protocol error for a server to open a
        stream by sending headers. If this is a server connection and
        ``send_headers`` has *already* been called, this will send trailers
        instead.

        When acting as a server, you may call ``send_headers`` any number of
        times allowed by the following rules, in this order:

        - zero or more times with ``(':status', '1XX')`` (where ``1XX`` is a
          placeholder for any 100-level status code).
        - once with any other status header.
        - zero or one time for trailers.

        That is, you are allowed to send as many informational responses as you
        like, followed by one complete response and zero or one HTTP trailer
        blocks.

        Clients may send one or two header blocks: one request block, and
        optionally one trailer block.

        If it is important to send HPACK "never indexed" header fields (as
        defined in `RFC 7541 Section 7.1.3
        <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may
        instead provide headers using the HPACK library's :class:`HeaderTuple
        <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple
        <hpack:hpack.NeverIndexedHeaderTuple>` objects.

        This method also allows users to prioritize the stream immediately,
        by sending priority information on the HEADERS frame directly. To do
        this, any one of ``priority_weight``, ``priority_depends_on``, or
        ``priority_exclusive`` must be set to a value that is not ``None``. For
        more information on the priority fields, see :meth:`prioritize
        <h2.connection.H2Connection.prioritize>`.

        .. warning:: In HTTP/2, it is mandatory that all the HTTP/2 special
            headers (that is, ones whose header keys begin with ``:``) appear
            at the start of the header block, before any normal headers.

        .. versionchanged:: 2.3.0
           Added support for using :class:`HeaderTuple
           <hpack:hpack.HeaderTuple>` objects to store headers.

        .. versionchanged:: 2.4.0
           Added the ability to provide priority keyword arguments:
           ``priority_weight``, ``priority_depends_on``, and
           ``priority_exclusive``.

        :param stream_id: The stream ID to send the headers on. If this stream
            does not currently exist, it will be created.
        :type stream_id: ``int``

        :param headers: The request/response headers to send.
        :type headers: An iterable of two tuples of bytestrings or
            :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects.

        :param end_stream: Whether this headers frame should end the stream
            immediately (that is, whether no more data will be sent after this
            frame). Defaults to ``False``.
        :type end_stream: ``bool``

        :param priority_weight: Sets the priority weight of the stream. See
            :meth:`prioritize <h2.connection.H2Connection.prioritize>` for more
            about how this field works. Defaults to ``None``, which means that
            no priority information will be sent.
        :type priority_weight: ``int`` or ``None``

        :param priority_depends_on: Sets which stream this one depends on for
            priority purposes. See :meth:`prioritize
            <h2.connection.H2Connection.prioritize>` for more about how this
            field works. Defaults to ``None``, which means that no priority
            information will be sent.
        :type priority_depends_on: ``int`` or ``None``

        :param priority_exclusive: Sets whether this stream exclusively depends
            on the stream given in ``priority_depends_on`` for priority
            purposes. See :meth:`prioritize
            <h2.connection.H2Connection.prioritize>` for more about how this
            field works. Defaults to ``None``, which means that no priority
            information will be sent.
        :type priority_exclusive: ``bool`` or ``None``

        :returns: Nothing
        """
        self.config.logger.debug(
            "Send headers on stream ID %d", stream_id
        )

        # Check we can open the stream: new streams count against the remote
        # peer's advertised MAX_CONCURRENT_STREAMS limit.
        if stream_id not in self.streams:
            max_open_streams = self.remote_settings.max_concurrent_streams
            if (self.open_outbound_streams + 1) > max_open_streams:
                raise TooManyStreamsError(
                    "Max outbound streams is %d, %d open" %
                    (max_open_streams, self.open_outbound_streams)
                )

        self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
        stream = self._get_or_create_stream(
            stream_id, AllowedStreamIDs(self.config.client_side)
        )
        frames = stream.send_headers(
            headers, self.encoder, end_stream
        )

        # We may need to send priority information on the HEADERS frame
        # itself, if any of the priority keyword arguments were supplied.
        priority_present = (
            (priority_weight is not None) or
            (priority_depends_on is not None) or
            (priority_exclusive is not None)
        )

        if priority_present:
            if not self.config.client_side:
                raise RFC1122Error("Servers SHOULD NOT prioritize streams.")

            headers_frame = frames[0]
            headers_frame.flags.add('PRIORITY')
            frames[0] = _add_frame_priority(
                headers_frame,
                priority_weight,
                priority_depends_on,
                priority_exclusive
            )

        self._prepare_for_sending(frames)
+
+ def send_data(self, stream_id, data, end_stream=False, pad_length=None):
+ """
+ Send data on a given stream.
+
+ This method does no breaking up of data: if the data is larger than the
+ value returned by :meth:`local_flow_control_window
+ <h2.connection.H2Connection.local_flow_control_window>` for this stream
+ then a :class:`FlowControlError <h2.exceptions.FlowControlError>` will
+ be raised. If the data is larger than :data:`max_outbound_frame_size
+ <h2.connection.H2Connection.max_outbound_frame_size>` then a
+ :class:`FrameTooLargeError <h2.exceptions.FrameTooLargeError>` will be
+ raised.
+
+ Hyper-h2 does this to avoid buffering the data internally. If the user
+ has more data to send than hyper-h2 will allow, consider breaking it up
+ and buffering it externally.
+
+ :param stream_id: The ID of the stream on which to send the data.
+ :type stream_id: ``int``
+ :param data: The data to send on the stream.
+ :type data: ``bytes``
+ :param end_stream: (optional) Whether this is the last data to be sent
+ on the stream. Defaults to ``False``.
+ :type end_stream: ``bool``
+ :param pad_length: (optional) Length of the padding to apply to the
+ data frame. Defaults to ``None`` for no use of padding. Note that
+ a value of ``0`` results in padding of length ``0``
+ (with the "padding" flag set on the frame).
+
+ .. versionadded:: 2.6.0
+
+ :type pad_length: ``int``
+ :returns: Nothing
+ """
+ self.config.logger.debug(
+ "Send data on stream ID %d with len %d", stream_id, len(data)
+ )
+ frame_size = len(data)
+ if pad_length is not None:
+ if not isinstance(pad_length, int):
+ raise TypeError("pad_length must be an int")
+ if pad_length < 0 or pad_length > 255:
+ raise ValueError("pad_length must be within range: [0, 255]")
+ # Account for padding bytes plus the 1-byte padding length field.
+ frame_size += pad_length + 1
+ self.config.logger.debug(
+ "Frame size on stream ID %d is %d", stream_id, frame_size
+ )
+
+ if frame_size > self.local_flow_control_window(stream_id):
+ raise FlowControlError(
+ "Cannot send %d bytes, flow control window is %d." %
+ (frame_size, self.local_flow_control_window(stream_id))
+ )
+ elif frame_size > self.max_outbound_frame_size:
+ raise FrameTooLargeError(
+ "Cannot send frame size %d, max frame size is %d" %
+ (frame_size, self.max_outbound_frame_size)
+ )
+
+ self.state_machine.process_input(ConnectionInputs.SEND_DATA)
+ frames = self.streams[stream_id].send_data(
+ data, end_stream, pad_length=pad_length
+ )
+
+ self._prepare_for_sending(frames)
+
+ self.outbound_flow_control_window -= frame_size
+ self.config.logger.debug(
+ "Outbound flow control window size is %d",
+ self.outbound_flow_control_window
+ )
+ assert self.outbound_flow_control_window >= 0
+
+ def end_stream(self, stream_id):
+ """
+ Cleanly end a given stream.
+
+ This method ends a stream by sending an empty DATA frame on that stream
+ with the ``END_STREAM`` flag set.
+
+ :param stream_id: The ID of the stream to end.
+ :type stream_id: ``int``
+ :returns: Nothing
+ """
+ self.config.logger.debug("End stream ID %d", stream_id)
+ self.state_machine.process_input(ConnectionInputs.SEND_DATA)
+ frames = self.streams[stream_id].end_stream()
+ self._prepare_for_sending(frames)
+
+ def increment_flow_control_window(self, increment, stream_id=None):
+ """
+ Increment a flow control window, optionally for a single stream. Allows
+ the remote peer to send more data.
+
+ .. versionchanged:: 2.0.0
+ Rejects attempts to increment the flow control window by out of
+ range values with a ``ValueError``.
+
+ :param increment: The amount to increment the flow control window by.
+ :type increment: ``int``
+ :param stream_id: (optional) The ID of the stream that should have its
+ flow control window opened. If not present or ``None``, the
+ connection flow control window will be opened instead.
+ :type stream_id: ``int`` or ``None``
+ :returns: Nothing
+ :raises: ``ValueError``
+ """
+ if not (1 <= increment <= self.MAX_WINDOW_INCREMENT):
+ raise ValueError(
+ "Flow control increment must be between 1 and %d" %
+ self.MAX_WINDOW_INCREMENT
+ )
+
+ self.state_machine.process_input(ConnectionInputs.SEND_WINDOW_UPDATE)
+
+ if stream_id is not None:
+ stream = self.streams[stream_id]
+ frames = stream.increase_flow_control_window(
+ increment
+ )
+
+ self.config.logger.debug(
+ "Increase stream ID %d flow control window by %d",
+ stream_id, increment
+ )
+ else:
+ self._inbound_flow_control_window_manager.window_opened(increment)
+ f = WindowUpdateFrame(0)
+ f.window_increment = increment
+ frames = [f]
+
+ self.config.logger.debug(
+ "Increase connection flow control window by %d", increment
+ )
+
+ self._prepare_for_sending(frames)
+
    def push_stream(self, stream_id, promised_stream_id, request_headers):
        """
        Push a response to the client by sending a PUSH_PROMISE frame.

        If it is important to send HPACK "never indexed" header fields (as
        defined in `RFC 7541 Section 7.1.3
        <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may
        instead provide headers using the HPACK library's :class:`HeaderTuple
        <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple
        <hpack:hpack.NeverIndexedHeaderTuple>` objects.

        :param stream_id: The ID of the stream that this push is a response to.
        :type stream_id: ``int``
        :param promised_stream_id: The ID of the stream that the pushed
            response will be sent on.
        :type promised_stream_id: ``int``
        :param request_headers: The headers of the request that the pushed
            response will be responding to.
        :type request_headers: An iterable of two tuples of bytestrings or
            :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects.
        :returns: Nothing
        """
        self.config.logger.debug(
            "Send Push Promise frame on stream ID %d", stream_id
        )

        # Pushing is only legal when the client has not disabled it via
        # SETTINGS_ENABLE_PUSH.
        if not self.remote_settings.enable_push:
            raise ProtocolError("Remote peer has disabled stream push")

        self.state_machine.process_input(ConnectionInputs.SEND_PUSH_PROMISE)
        stream = self._get_stream_by_id(stream_id)

        # We need to prevent users pushing streams in response to streams that
        # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The
        # easiest way to do that is to assert that the stream_id is not even:
        # this shortcut works because only servers can push and the state
        # machine will enforce this.
        if (stream_id % 2) == 0:
            raise ProtocolError("Cannot recursively push streams.")

        new_stream = self._begin_new_stream(
            promised_stream_id, AllowedStreamIDs.EVEN
        )
        self.streams[promised_stream_id] = new_stream

        frames = stream.push_stream_in_band(
            promised_stream_id, request_headers, self.encoder
        )
        new_frames = new_stream.locally_pushed()
        self._prepare_for_sending(frames + new_frames)
+
+ def ping(self, opaque_data):
+ """
+ Send a PING frame.
+
+ :param opaque_data: A bytestring of length 8 that will be sent in the
+ PING frame.
+ :returns: Nothing
+ """
+ self.config.logger.debug("Send Ping frame")
+
+ if not isinstance(opaque_data, bytes) or len(opaque_data) != 8:
+ raise ValueError("Invalid value for ping data: %r" % opaque_data)
+
+ self.state_machine.process_input(ConnectionInputs.SEND_PING)
+ f = PingFrame(0)
+ f.opaque_data = opaque_data
+ self._prepare_for_sending([f])
+
+ def reset_stream(self, stream_id, error_code=0):
+ """
+ Reset a stream.
+
+ This method forcibly closes a stream by sending a RST_STREAM frame for
+ a given stream. This is not a graceful closure. To gracefully end a
+ stream, try the :meth:`end_stream
+ <h2.connection.H2Connection.end_stream>` method.
+
+ :param stream_id: The ID of the stream to reset.
+ :type stream_id: ``int``
+ :param error_code: (optional) The error code to use to reset the
+ stream. Defaults to :data:`ErrorCodes.NO_ERROR
+ <h2.errors.ErrorCodes.NO_ERROR>`.
+ :type error_code: ``int``
+ :returns: Nothing
+ """
+ self.config.logger.debug("Reset stream ID %d", stream_id)
+ self.state_machine.process_input(ConnectionInputs.SEND_RST_STREAM)
+ stream = self._get_stream_by_id(stream_id)
+ frames = stream.reset_stream(error_code)
+ self._prepare_for_sending(frames)
+
+ def close_connection(self, error_code=0, additional_data=None,
+ last_stream_id=None):
+
+ """
+ Close a connection, emitting a GOAWAY frame.
+
+ .. versionchanged:: 2.4.0
+ Added ``additional_data`` and ``last_stream_id`` arguments.
+
+ :param error_code: (optional) The error code to send in the GOAWAY
+ frame.
+ :param additional_data: (optional) Additional debug data indicating
+ a reason for closing the connection. Must be a bytestring.
+ :param last_stream_id: (optional) The last stream which was processed
+ by the sender. Defaults to ``highest_inbound_stream_id``.
+ :returns: Nothing
+ """
+ self.config.logger.debug("Close connection")
+ self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY)
+
+ # Additional_data must be bytes
+ if additional_data is not None:
+ assert isinstance(additional_data, bytes)
+
+ if last_stream_id is None:
+ last_stream_id = self.highest_inbound_stream_id
+
+ f = GoAwayFrame(
+ stream_id=0,
+ last_stream_id=last_stream_id,
+ error_code=error_code,
+ additional_data=(additional_data or b'')
+ )
+ self._prepare_for_sending([f])
+
+ def update_settings(self, new_settings):
+ """
+ Update the local settings. This will prepare and emit the appropriate
+ SETTINGS frame.
+
+ :param new_settings: A dictionary of {setting: new value}
+ """
+ self.config.logger.debug(
+ "Update connection settings to %s", new_settings
+ )
+ self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
+ self.local_settings.update(new_settings)
+ s = SettingsFrame(0)
+ s.settings = new_settings
+ self._prepare_for_sending([s])
+
    def advertise_alternative_service(self,
                                      field_value,
                                      origin=None,
                                      stream_id=None):
        """
        Notify a client about an available Alternative Service.

        An Alternative Service is defined in `RFC 7838
        <https://tools.ietf.org/html/rfc7838>`_. An Alternative Service
        notification informs a client that a given origin is also available
        elsewhere.

        Alternative Services can be advertised in two ways. Firstly, they can
        be advertised explicitly: that is, a server can say "origin X is also
        available at Y". To advertise like this, set the ``origin`` argument
        and not the ``stream_id`` argument. Alternatively, they can be
        advertised implicitly: that is, a server can say "the origin you're
        contacting on stream X is also available at Y". To advertise like this,
        set the ``stream_id`` argument and not the ``origin`` argument.

        The explicit method of advertising can be done as long as the
        connection is active. The implicit method can only be done after the
        client has sent the request headers and before the server has sent the
        response headers: outside of those points, Hyper-h2 will forbid sending
        the Alternative Service advertisement by raising a ProtocolError.

        The ``field_value`` parameter is specified in RFC 7838. Hyper-h2 does
        not validate or introspect this argument: the user is required to
        ensure that it's well-formed. ``field_value`` corresponds to RFC 7838's
        "Alternative Service Field Value".

        .. note:: It is strongly preferred to use the explicit method of
                  advertising Alternative Services. The implicit method of
                  advertising Alternative Services has a number of subtleties
                  and can lead to inconsistencies between the server and
                  client. Hyper-h2 allows both mechanisms, but caution is
                  strongly advised.

        .. versionadded:: 2.3.0

        :param field_value: The RFC 7838 Alternative Service Field Value. This
            argument is not introspected by Hyper-h2: the user is responsible
            for ensuring that it is well-formed.
        :type field_value: ``bytes``

        :param origin: The origin/authority to which the Alternative Service
            being advertised applies. Must not be provided at the same time as
            ``stream_id``.
        :type origin: ``bytes`` or ``None``

        :param stream_id: The ID of the stream which was sent to the authority
            for which this Alternative Service advertisement applies. Must not
            be provided at the same time as ``origin``.
        :type stream_id: ``int`` or ``None``

        :returns: Nothing.
        """
        # The field value is sent on the wire verbatim, so it must be bytes.
        if not isinstance(field_value, bytes):
            raise ValueError("Field must be bytestring.")

        # The two advertisement modes are mutually exclusive.
        if origin is not None and stream_id is not None:
            raise ValueError("Must not provide both origin and stream_id")

        self.state_machine.process_input(
            ConnectionInputs.SEND_ALTERNATIVE_SERVICE
        )

        if origin is not None:
            # Explicit advertisement: this ALTSVC is sent on stream zero.
            f = AltSvcFrame(stream_id=0)
            f.origin = origin
            f.field = field_value
            frames = [f]
        else:
            # Implicit advertisement: the ALTSVC is sent on the stream itself.
            stream = self._get_stream_by_id(stream_id)
            frames = stream.advertise_alternative_service(field_value)

        self._prepare_for_sending(frames)
+
    def prioritize(self, stream_id, weight=None, depends_on=None,
                   exclusive=None):
        """
        Notify a server about the priority of a stream.

        Stream priorities are a form of guidance to a remote server: they
        inform the server about how important a given response is, so that the
        server may allocate its resources (e.g. bandwidth, CPU time, etc.)
        accordingly. This exists to allow clients to ensure that the most
        important data arrives earlier, while less important data does not
        starve out the more important data.

        Stream priorities are explained in depth in `RFC 7540 Section 5.3
        <https://tools.ietf.org/html/rfc7540#section-5.3>`_.

        This method updates the priority information of a single stream. It may
        be called well before a stream is actively in use, or well after a
        stream is closed.

        .. warning:: RFC 7540 allows for servers to change the priority of
                     streams. However, hyper-h2 **does not** allow server
                     stacks to do this. This is because most clients do not
                     adequately know how to respond when provided conflicting
                     priority information, and relatively little utility is
                     provided by making that functionality available.

        .. note:: hyper-h2 **does not** maintain any information about the
                  RFC 7540 priority tree. That means that hyper-h2 does not
                  prevent incautious users from creating invalid priority
                  trees, particularly by creating priority loops. While some
                  basic error checking is provided by hyper-h2, users are
                  strongly recommended to understand their prioritisation
                  strategies before using the priority tools here.

        .. note:: Priority information is strictly advisory. Servers are
                  allowed to disregard it entirely. Avoid relying on the idea
                  that your priority signaling will definitely be obeyed.

        .. versionadded:: 2.4.0

        :param stream_id: The ID of the stream to prioritize.
        :type stream_id: ``int``

        :param weight: The weight to give the stream. Defaults to ``16``, the
            default weight of any stream. May be any value between ``1`` and
            ``256`` inclusive. The relative weight of a stream indicates what
            proportion of available resources will be allocated to that
            stream.
        :type weight: ``int``

        :param depends_on: The ID of the stream on which this stream depends.
            This stream will only be progressed if it is impossible to
            progress the parent stream (the one on which this one depends).
            Passing the value ``0`` means that this stream does not depend on
            any other. Defaults to ``0``.
        :type depends_on: ``int``

        :param exclusive: Whether this stream is an exclusive dependency of its
            "parent" stream (i.e. the stream given by ``depends_on``). If a
            stream is an exclusive dependency of another, that means that all
            previously-set children of the parent are moved to become children
            of the new exclusively-dependent stream. Defaults to ``False``.
        :type exclusive: ``bool``
        """
        # Only clients may send priority information: see the warning in the
        # docstring above.
        if not self.config.client_side:
            raise RFC1122Error("Servers SHOULD NOT prioritize streams.")

        self.state_machine.process_input(
            ConnectionInputs.SEND_PRIORITY
        )

        frame = PriorityFrame(stream_id)
        frame = _add_frame_priority(frame, weight, depends_on, exclusive)

        self._prepare_for_sending([frame])
+
+ def local_flow_control_window(self, stream_id):
+ """
+ Returns the maximum amount of data that can be sent on stream
+ ``stream_id``.
+
+ This value will never be larger than the total data that can be sent on
+ the connection: even if the given stream allows more data, the
+ connection window provides a logical maximum to the amount of data that
+ can be sent.
+
+ The maximum data that can be sent in a single data frame on a stream
+ is either this value, or the maximum frame size, whichever is
+ *smaller*.
+
+ :param stream_id: The ID of the stream whose flow control window is
+ being queried.
+ :type stream_id: ``int``
+ :returns: The amount of data in bytes that can be sent on the stream
+ before the flow control window is exhausted.
+ :rtype: ``int``
+ """
+ stream = self._get_stream_by_id(stream_id)
+ return min(
+ self.outbound_flow_control_window,
+ stream.outbound_flow_control_window
+ )
+
+ def remote_flow_control_window(self, stream_id):
+ """
+ Returns the maximum amount of data the remote peer can send on stream
+ ``stream_id``.
+
+ This value will never be larger than the total data that can be sent on
+ the connection: even if the given stream allows more data, the
+ connection window provides a logical maximum to the amount of data that
+ can be sent.
+
+ The maximum data that can be sent in a single data frame on a stream
+ is either this value, or the maximum frame size, whichever is
+ *smaller*.
+
+ :param stream_id: The ID of the stream whose flow control window is
+ being queried.
+ :type stream_id: ``int``
+ :returns: The amount of data in bytes that can be received on the
+ stream before the flow control window is exhausted.
+ :rtype: ``int``
+ """
+ stream = self._get_stream_by_id(stream_id)
+ return min(
+ self.inbound_flow_control_window,
+ stream.inbound_flow_control_window
+ )
+
+ def acknowledge_received_data(self, acknowledged_size, stream_id):
+ """
+ Inform the :class:`H2Connection <h2.connection.H2Connection>` that a
+ certain number of flow-controlled bytes have been processed, and that
+ the space should be handed back to the remote peer at an opportune
+ time.
+
+ .. versionadded:: 2.5.0
+
+ :param acknowledged_size: The total *flow-controlled size* of the data
+ that has been processed. Note that this must include the amount of
+ padding that was sent with that data.
+ :type acknowledged_size: ``int``
+ :param stream_id: The ID of the stream on which this data was received.
+ :type stream_id: ``int``
+ :returns: Nothing
+ :rtype: ``None``
+ """
+ self.config.logger.debug(
+ "Ack received data on stream ID %d with size %d",
+ stream_id, acknowledged_size
+ )
+ if stream_id <= 0:
+ raise ValueError(
+ "Stream ID %d is not valid for acknowledge_received_data" %
+ stream_id
+ )
+ if acknowledged_size < 0:
+ raise ValueError("Cannot acknowledge negative data")
+
+ frames = []
+
+ conn_manager = self._inbound_flow_control_window_manager
+ conn_increment = conn_manager.process_bytes(acknowledged_size)
+ if conn_increment:
+ f = WindowUpdateFrame(0)
+ f.window_increment = conn_increment
+ frames.append(f)
+
+ try:
+ stream = self._get_stream_by_id(stream_id)
+ except StreamClosedError:
+ # The stream is already gone. We're not worried about incrementing
+ # the window in this case.
+ pass
+ else:
+ # No point incrementing the windows of closed streams.
+ if stream.open:
+ frames.extend(
+ stream.acknowledge_received_data(acknowledged_size)
+ )
+
+ self._prepare_for_sending(frames)
+
+ def data_to_send(self, amount=None):
+ """
+ Returns some data for sending out of the internal data buffer.
+
+ This method is analogous to ``read`` on a file-like object, but it
+ doesn't block. Instead, it returns as much data as the user asks for,
+ or less if that much data is not available. It does not perform any
+ I/O, and so uses a different name.
+
+ :param amount: (optional) The maximum amount of data to return. If not
+ set, or set to ``None``, will return as much data as possible.
+ :type amount: ``int``
+ :returns: A bytestring containing the data to send on the wire.
+ :rtype: ``bytes``
+ """
+ if amount is None:
+ data = bytes(self._data_to_send)
+ self._data_to_send = bytearray()
+ return data
+ else:
+ data = bytes(self._data_to_send[:amount])
+ self._data_to_send = self._data_to_send[amount:]
+ return data
+
+ def clear_outbound_data_buffer(self):
+ """
+ Clears the outbound data buffer, such that if this call was immediately
+ followed by a call to
+ :meth:`data_to_send <h2.connection.H2Connection.data_to_send>`, that
+ call would return no data.
+
+ This method should not normally be used, but is made available to avoid
+ exposing implementation details.
+ """
+ self._data_to_send = bytearray()
+
+ def _acknowledge_settings(self):
+ """
+ Acknowledge settings that have been received.
+
+ .. versionchanged:: 2.0.0
+ Removed from public API, removed useless ``event`` parameter, made
+ automatic.
+
+ :returns: Nothing
+ """
+ self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
+
+ changes = self.remote_settings.acknowledge()
+
+ if SettingCodes.INITIAL_WINDOW_SIZE in changes:
+ setting = changes[SettingCodes.INITIAL_WINDOW_SIZE]
+ self._flow_control_change_from_settings(
+ setting.original_value,
+ setting.new_value,
+ )
+
+ # HEADER_TABLE_SIZE changes by the remote part affect our encoder: cf.
+ # RFC 7540 Section 6.5.2.
+ if SettingCodes.HEADER_TABLE_SIZE in changes:
+ setting = changes[SettingCodes.HEADER_TABLE_SIZE]
+ self.encoder.header_table_size = setting.new_value
+
+ if SettingCodes.MAX_FRAME_SIZE in changes:
+ setting = changes[SettingCodes.MAX_FRAME_SIZE]
+ self.max_outbound_frame_size = setting.new_value
+ for stream in self.streams.values():
+ stream.max_outbound_frame_size = setting.new_value
+
+ f = SettingsFrame(0)
+ f.flags.add('ACK')
+ return [f]
+
+ def _flow_control_change_from_settings(self, old_value, new_value):
+ """
+ Update flow control windows in response to a change in the value of
+ SETTINGS_INITIAL_WINDOW_SIZE.
+
+ When this setting is changed, it automatically updates all flow control
+ windows by the delta in the settings values. Note that it does not
+ increment the *connection* flow control window, per section 6.9.2 of
+ RFC 7540.
+ """
+ delta = new_value - old_value
+
+ for stream in self.streams.values():
+ stream.outbound_flow_control_window = guard_increment_window(
+ stream.outbound_flow_control_window,
+ delta
+ )
+
+ def _inbound_flow_control_change_from_settings(self, old_value, new_value):
+ """
+ Update remote flow control windows in response to a change in the value
+ of SETTINGS_INITIAL_WINDOW_SIZE.
+
+ When this setting is changed, it automatically updates all remote flow
+ control windows by the delta in the settings values.
+ """
+ delta = new_value - old_value
+
+ for stream in self.streams.values():
+ stream._inbound_flow_control_change_from_settings(delta)
+
+ def receive_data(self, data):
+ """
+ Pass some received HTTP/2 data to the connection for handling.
+
+ :param data: The data received from the remote peer on the network.
+ :type data: ``bytes``
+ :returns: A list of events that the remote peer triggered by sending
+ this data.
+ """
+ self.config.logger.trace(
+ "Process received data on connection. Received data: %r", data
+ )
+
+ events = []
+ self.incoming_buffer.add_data(data)
+ self.incoming_buffer.max_frame_size = self.max_inbound_frame_size
+
+ try:
+ for frame in self.incoming_buffer:
+ events.extend(self._receive_frame(frame))
+ except InvalidPaddingError:
+ self._terminate_connection(ErrorCodes.PROTOCOL_ERROR)
+ raise ProtocolError("Received frame with invalid padding.")
+ except ProtocolError as e:
+ # For whatever reason, receiving the frame caused a protocol error.
+ # We should prepare to emit a GoAway frame before throwing the
+ # exception up further. No need for an event: the exception will
+ # do fine.
+ self._terminate_connection(e.error_code)
+ raise
+
+ return events
+
    def _receive_frame(self, frame):
        """
        Handle a frame received on the connection.

        Dispatches the frame to the handler registered for its frame class,
        translating stream-level failures into RST_STREAM frames where RFC
        7540 permits and re-raising connection-level failures.

        .. versionchanged:: 2.0.0
           Removed from the public API.
        """
        try:
            # I don't love using __class__ here, maybe reconsider it.
            frames, events = self._frame_dispatch_table[frame.__class__](frame)
        except StreamClosedError as e:
            # If the stream was closed by RST_STREAM, we just send a RST_STREAM
            # to the remote peer. Otherwise, this is a connection error, and so
            # we will re-raise to trigger one.
            if self._stream_is_closed_by_reset(e.stream_id):
                f = RstStreamFrame(e.stream_id)
                f.error_code = e.error_code
                self._prepare_for_sending([f])
                events = e._events
            else:
                raise
        except StreamIDTooLowError as e:
            # The stream ID seems invalid. This may happen when the closed
            # stream has been cleaned up, or when the remote peer has opened a
            # new stream with a higher stream ID than this one, forcing it
            # closed implicitly.
            #
            # Check how the stream was closed: depending on the mechanism, it
            # is either a stream error or a connection error.
            if self._stream_is_closed_by_reset(e.stream_id):
                # Closed by RST_STREAM is a stream error.
                f = RstStreamFrame(e.stream_id)
                f.error_code = ErrorCodes.STREAM_CLOSED
                self._prepare_for_sending([f])
                events = []
            elif self._stream_is_closed_by_end(e.stream_id):
                # Closed by END_STREAM is a connection error.
                raise StreamClosedError(e.stream_id)
            else:
                # Closed implicitly, also a connection error, but of type
                # PROTOCOL_ERROR.
                raise
        else:
            self._prepare_for_sending(frames)

        return events
+
+ def _terminate_connection(self, error_code):
+ """
+ Terminate the connection early. Used in error handling blocks to send
+ GOAWAY frames.
+ """
+ f = GoAwayFrame(0)
+ f.last_stream_id = self.highest_inbound_stream_id
+ f.error_code = error_code
+ self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY)
+ self._prepare_for_sending([f])
+
+ def _receive_headers_frame(self, frame):
+ """
+ Receive a headers frame on the connection.
+ """
+ # If necessary, check we can open the stream. Also validate that the
+ # stream ID is valid.
+ if frame.stream_id not in self.streams:
+ max_open_streams = self.local_settings.max_concurrent_streams
+ if (self.open_inbound_streams + 1) > max_open_streams:
+ raise TooManyStreamsError(
+ "Max outbound streams is %d, %d open" %
+ (max_open_streams, self.open_outbound_streams)
+ )
+
+ # Let's decode the headers. We handle headers as bytes internally up
+ # until we hang them off the event, at which point we may optionally
+ # convert them to unicode.
+ headers = _decode_headers(self.decoder, frame.data)
+
+ events = self.state_machine.process_input(
+ ConnectionInputs.RECV_HEADERS
+ )
+ stream = self._get_or_create_stream(
+ frame.stream_id, AllowedStreamIDs(not self.config.client_side)
+ )
+ frames, stream_events = stream.receive_headers(
+ headers,
+ 'END_STREAM' in frame.flags,
+ self.config.header_encoding
+ )
+
+ if 'PRIORITY' in frame.flags:
+ p_frames, p_events = self._receive_priority_frame(frame)
+ stream_events[0].priority_updated = p_events[0]
+ stream_events.extend(p_events)
+ assert not p_frames
+
+ return frames, events + stream_events
+
    def _receive_push_promise_frame(self, frame):
        """
        Receive a push-promise frame on the connection.

        Validates that push is enabled locally and that the parent stream is
        in a state that may legally receive a PUSH_PROMISE, then creates the
        promised stream. In-flight races with locally-reset parent streams
        are answered with RST_STREAM(REFUSED_STREAM) rather than an error.
        """
        if not self.local_settings.enable_push:
            raise ProtocolError("Received pushed stream")

        pushed_headers = _decode_headers(self.decoder, frame.data)

        events = self.state_machine.process_input(
            ConnectionInputs.RECV_PUSH_PROMISE
        )

        try:
            stream = self._get_stream_by_id(frame.stream_id)
        except NoSuchStreamError:
            # We need to check if the parent stream was reset by us. If it was
            # then we presume that the PUSH_PROMISE was in flight when we reset
            # the parent stream. Rather than accept the new stream, just reset
            # it.
            #
            # If this was closed naturally, however, we should call this a
            # PROTOCOL_ERROR: pushing a stream on a naturally closed stream is
            # a real problem because it creates a brand new stream that the
            # remote peer now believes exists.
            if (self._stream_closed_by(frame.stream_id) ==
                    StreamClosedBy.SEND_RST_STREAM):
                f = RstStreamFrame(frame.promised_stream_id)
                f.error_code = ErrorCodes.REFUSED_STREAM
                return [f], events

            raise ProtocolError("Attempted to push on closed stream.")

        # We need to prevent peers pushing streams in response to streams that
        # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The
        # easiest way to do that is to assert that the stream_id is not even:
        # this shortcut works because only servers can push and the state
        # machine will enforce this.
        if (frame.stream_id % 2) == 0:
            raise ProtocolError("Cannot recursively push streams.")

        try:
            frames, stream_events = stream.receive_push_promise_in_band(
                frame.promised_stream_id,
                pushed_headers,
                self.config.header_encoding,
            )
        except StreamClosedError:
            # The parent stream was reset by us, so we presume that
            # PUSH_PROMISE was in flight when we reset the parent stream.
            # So we just reset the new stream.
            f = RstStreamFrame(frame.promised_stream_id)
            f.error_code = ErrorCodes.REFUSED_STREAM
            return [f], events

        new_stream = self._begin_new_stream(
            frame.promised_stream_id, AllowedStreamIDs.EVEN
        )
        self.streams[frame.promised_stream_id] = new_stream
        new_stream.remotely_pushed(pushed_headers)

        return frames, events + stream_events
+
def _handle_data_on_closed_stream(self, events, exc, frame):
    """
    Handle a DATA frame that arrived on a stream which is already CLOSED
    or cleaned up.

    The frame still counts against the *connection* flow-control window,
    so we acknowledge it there (emitting a WINDOW_UPDATE when needed) to
    stop the whole connection from stalling on an empty inbound window,
    and we tell the peer to stop sending on the stream with a RST_STREAM.

    :param events: Events accumulated so far while processing this frame.
    :param exc: The ``StreamClosedError`` raised by the stream layer.
    :param frame: The offending DATA frame.
    :returns: A (frames, events) tuple.
    """
    frames = []
    conn_manager = self._inbound_flow_control_window_manager
    conn_increment = conn_manager.process_bytes(
        frame.flow_controlled_length
    )
    if conn_increment:
        f = WindowUpdateFrame(0)
        f.window_increment = conn_increment
        frames.append(f)
        self.config.logger.debug(
            "Received DATA frame on closed stream %d - "
            "auto-emitted a WINDOW_UPDATE by %d",
            frame.stream_id, conn_increment
        )
    f = RstStreamFrame(exc.stream_id)
    f.error_code = exc.error_code
    frames.append(f)
    # Use lazy %-style logger arguments (matching the call above) rather
    # than eager %-formatting, so the message is only built when debug
    # logging is actually enabled.
    self.config.logger.debug(
        "Stream %d already CLOSED or cleaned up - "
        "auto-emitted a RST_FRAME", frame.stream_id
    )
    return frames, events + exc._events
+
def _receive_data_frame(self, frame):
    """
    Receive a data frame on the connection.

    Consumes the connection-level flow-control window before dispatching
    to the stream: the bytes were received on this connection even if the
    stream turns out to be closed.
    """
    flow_controlled_length = frame.flow_controlled_length

    events = self.state_machine.process_input(
        ConnectionInputs.RECV_DATA
    )
    self._inbound_flow_control_window_manager.window_consumed(
        flow_controlled_length
    )

    try:
        stream = self._get_stream_by_id(frame.stream_id)
        frames, stream_events = stream.receive_data(
            frame.data,
            'END_STREAM' in frame.flags,
            flow_controlled_length
        )
    except StreamClosedError as e:
        # This stream is either marked as CLOSED or already gone from our
        # internal state.
        return self._handle_data_on_closed_stream(events, e, frame)

    return frames, events + stream_events
+
def _receive_settings_frame(self, frame):
    """
    Receive a SETTINGS frame on the connection.

    A SETTINGS frame with the ACK flag applies our own pending local
    settings; a plain SETTINGS frame updates the remote settings and is
    automatically acknowledged.
    """
    events = self.state_machine.process_input(
        ConnectionInputs.RECV_SETTINGS
    )

    # This is an ack of the local settings.
    if 'ACK' in frame.flags:
        changed_settings = self._local_settings_acked()
        ack_event = SettingsAcknowledged()
        ack_event.changed_settings = changed_settings
        events.append(ack_event)
        return [], events

    # Add the new settings.
    self.remote_settings.update(frame.settings)
    events.append(
        RemoteSettingsChanged.from_settings(
            self.remote_settings, frame.settings
        )
    )
    frames = self._acknowledge_settings()

    return frames, events
+
def _receive_window_update_frame(self, frame):
    """
    Receive a WINDOW_UPDATE frame on the connection.

    Stream-level updates are delegated to the stream object; a stream-0
    update grows the connection-wide outbound flow-control window.
    """
    # Validate the frame.
    if not (1 <= frame.window_increment <= self.MAX_WINDOW_INCREMENT):
        raise ProtocolError(
            "Flow control increment must be between 1 and %d, received %d"
            % (self.MAX_WINDOW_INCREMENT, frame.window_increment)
        )

    events = self.state_machine.process_input(
        ConnectionInputs.RECV_WINDOW_UPDATE
    )

    if frame.stream_id:
        try:
            stream = self._get_stream_by_id(frame.stream_id)
            frames, stream_events = stream.receive_window_update(
                frame.window_increment
            )
        except StreamClosedError:
            # Window updates racing against stream closure are ignored.
            return [], events
    else:
        # Increment our local flow control window.
        self.outbound_flow_control_window = guard_increment_window(
            self.outbound_flow_control_window,
            frame.window_increment
        )

        # FIXME: Should we split this into one event per active stream?
        window_updated_event = WindowUpdated()
        window_updated_event.stream_id = 0
        window_updated_event.delta = frame.window_increment
        stream_events = [window_updated_event]
        frames = []

    return frames, events + stream_events
+
def _receive_ping_frame(self, frame):
    """
    Receive a PING frame on the connection.

    A PING with the ACK flag only produces a :class:`PingAckReceived`
    event; a plain PING produces a :class:`PingReceived` event and is
    automatically answered with a PING+ACK echoing the same opaque data.
    """
    events = self.state_machine.process_input(
        ConnectionInputs.RECV_PING
    )

    if 'ACK' in frame.flags:
        ack_event = PingAckReceived()
        ack_event.ping_data = frame.opaque_data
        events.append(ack_event)
        return [], events

    ping_event = PingReceived()
    ping_event.ping_data = frame.opaque_data
    events.append(ping_event)

    # automatically ACK the PING with the same 'opaque data'
    ack_frame = PingFrame(0)
    ack_frame.flags = {'ACK'}
    ack_frame.opaque_data = frame.opaque_data

    return [ack_frame], events
+
def _receive_rst_stream_frame(self, frame):
    """
    Receive a RST_STREAM frame on the connection.

    A reset for a stream we no longer know about is silently ignored.
    """
    events = self.state_machine.process_input(
        ConnectionInputs.RECV_RST_STREAM
    )

    stream_frames = []
    stream_events = []
    try:
        stream = self._get_stream_by_id(frame.stream_id)
    except NoSuchStreamError:
        # The stream is missing. That's ok, we just do nothing here.
        pass
    else:
        stream_frames, stream_events = stream.stream_reset(frame)

    return stream_frames, events + stream_events
+
def _receive_priority_frame(self, frame):
    """
    Receive a PRIORITY frame on the connection.

    Emits a :class:`PriorityUpdated` event after validating that the
    stream does not declare a dependency on itself (RFC 7540 § 5.3.1).
    """
    events = self.state_machine.process_input(
        ConnectionInputs.RECV_PRIORITY
    )

    # A stream may not depend on itself.
    if frame.depends_on == frame.stream_id:
        raise ProtocolError(
            "Stream %d may not depend on itself" % frame.stream_id
        )

    event = PriorityUpdated()
    event.stream_id = frame.stream_id
    event.depends_on = frame.depends_on
    event.exclusive = frame.exclusive
    # The wire carries the weight in a single byte (0-255), while the
    # protocol-level weight is 1-256: add one.
    event.weight = frame.stream_weight + 1
    events.append(event)

    return [], events
+
def _receive_goaway_frame(self, frame):
    """
    Receive a GOAWAY frame on the connection.

    Drops any pending outbound data and surfaces the shutdown to the
    user as a :class:`ConnectionTerminated` event.
    """
    events = self.state_machine.process_input(
        ConnectionInputs.RECV_GOAWAY
    )

    # Clear the outbound data buffer: we cannot send further data now.
    self.clear_outbound_data_buffer()

    # Fire an appropriate ConnectionTerminated event.
    terminated = ConnectionTerminated()
    terminated.error_code = _error_code_from_int(frame.error_code)
    terminated.last_stream_id = frame.last_stream_id
    # Normalise empty debug data to None.
    terminated.additional_data = frame.additional_data or None
    events.append(terminated)

    return [], events
+
def _receive_naked_continuation(self, frame):
    """
    A naked CONTINUATION frame has been received. This is always an error,
    but the type of error it is depends on the state of the stream and must
    transition the state of the stream, so we need to pass it to the
    appropriate stream.
    """
    stream = self._get_stream_by_id(frame.stream_id)
    # receive_continuation() always raises; which exception depends on the
    # stream's state, so control never reaches the assert below.
    stream.receive_continuation()
    assert False, "Should not be reachable"
+
def _receive_alt_svc_frame(self, frame):
    """
    An ALTSVC frame has been received. This frame, specified in RFC 7838,
    is used to advertise alternative places where the same service can be
    reached.

    This frame can optionally be received either on a stream or on stream
    0, and its semantics are different in each case: on a stream it is
    delegated to that stream, on stream 0 it is surfaced directly as an
    :class:`AlternativeServiceAvailable` event (clients only).
    """
    events = self.state_machine.process_input(
        ConnectionInputs.RECV_ALTERNATIVE_SERVICE
    )
    frames = []

    if frame.stream_id:
        # Given that it makes no sense to receive ALTSVC on a stream
        # before that stream has been opened with a HEADERS frame, the
        # ALTSVC frame cannot create a stream. If the stream is not
        # present, we simply ignore the frame.
        try:
            stream = self._get_stream_by_id(frame.stream_id)
        except (NoSuchStreamError, StreamClosedError):
            pass
        else:
            stream_frames, stream_events = stream.receive_alt_svc(frame)
            frames.extend(stream_frames)
            events.extend(stream_events)
    else:
        # This frame is sent on stream 0. The origin field on the frame
        # must be present, though if it isn't it's not a ProtocolError
        # (annoyingly), we just need to ignore it.
        if not frame.origin:
            return frames, events

        # If we're a server, we want to ignore this (RFC 7838 says so).
        if not self.config.client_side:
            return frames, events

        event = AlternativeServiceAvailable()
        event.origin = frame.origin
        event.field_value = frame.field
        events.append(event)

    return frames, events
+
def _receive_unknown_frame(self, frame):
    """
    Handle a frame whose type we do not recognise — almost certainly an
    extension frame, though it's impossible to be entirely sure.

    RFC 7540 § 5.5 requires that unknown frame types be ignored; we do
    so, but log the frame and surface an UnknownFrameReceived event so
    the user still sees it.
    """
    self.config.logger.debug(
        "Received unknown extension frame (ID %d)", frame.stream_id
    )
    evt = UnknownFrameReceived()
    evt.frame = frame
    return [], [evt]
+
def _local_settings_acked(self):
    """
    Handle the local settings being ACKed, update internal state.

    Applies each newly-acknowledged setting to the relevant piece of
    connection or decoder state and returns the dict of changes.
    """
    changes = self.local_settings.acknowledge()

    if SettingCodes.INITIAL_WINDOW_SIZE in changes:
        setting = changes[SettingCodes.INITIAL_WINDOW_SIZE]
        self._inbound_flow_control_change_from_settings(
            setting.original_value,
            setting.new_value,
        )

    if SettingCodes.MAX_HEADER_LIST_SIZE in changes:
        setting = changes[SettingCodes.MAX_HEADER_LIST_SIZE]
        self.decoder.max_header_list_size = setting.new_value

    if SettingCodes.MAX_FRAME_SIZE in changes:
        setting = changes[SettingCodes.MAX_FRAME_SIZE]
        self.max_inbound_frame_size = setting.new_value

    if SettingCodes.HEADER_TABLE_SIZE in changes:
        setting = changes[SettingCodes.HEADER_TABLE_SIZE]
        # This is safe across all hpack versions: some versions just won't
        # respect it.
        self.decoder.max_allowed_table_size = setting.new_value

    return changes
+
+ def _stream_id_is_outbound(self, stream_id):
+ """
+ Returns ``True`` if the stream ID corresponds to an outbound stream
+ (one initiated by this peer), returns ``False`` otherwise.
+ """
+ return (stream_id % 2 == int(self.config.client_side))
+
+ def _stream_closed_by(self, stream_id):
+ """
+ Returns how the stream was closed.
+
+ The return value will be either a member of
+ ``h2.stream.StreamClosedBy`` or ``None``. If ``None``, the stream was
+ closed implicitly by the peer opening a stream with a higher stream ID
+ before opening this one.
+ """
+ if stream_id in self.streams:
+ return self.streams[stream_id].closed_by
+ if stream_id in self._closed_streams:
+ return self._closed_streams[stream_id]
+ return None
+
def _stream_is_closed_by_reset(self, stream_id):
    """
    Return ``True`` when the stream was torn down by a RST_STREAM frame,
    whether we sent it or received it; ``False`` otherwise.
    """
    closed_by = self._stream_closed_by(stream_id)
    return closed_by in (
        StreamClosedBy.RECV_RST_STREAM, StreamClosedBy.SEND_RST_STREAM
    )
+
def _stream_is_closed_by_end(self, stream_id):
    """
    Return ``True`` when the stream was closed by an END_STREAM flag on a
    HEADERS or DATA frame, in either direction; ``False`` otherwise.
    """
    closed_by = self._stream_closed_by(stream_id)
    return closed_by in (
        StreamClosedBy.RECV_END_STREAM, StreamClosedBy.SEND_END_STREAM
    )
+
+
+def _add_frame_priority(frame, weight=None, depends_on=None, exclusive=None):
+ """
+ Adds priority data to a given frame. Does not change any flags set on that
+ frame: if the caller is adding priority information to a HEADERS frame they
+ must set that themselves.
+
+ This method also deliberately sets defaults for anything missing.
+
+ This method validates the input values.
+ """
+ # A stream may not depend on itself.
+ if depends_on == frame.stream_id:
+ raise ProtocolError(
+ "Stream %d may not depend on itself" % frame.stream_id
+ )
+
+ # Weight must be between 1 and 256.
+ if weight is not None:
+ if weight > 256 or weight < 1:
+ raise ProtocolError(
+ "Weight must be between 1 and 256, not %d" % weight
+ )
+ else:
+ # Weight is an integer between 1 and 256, but the byte only allows
+ # 0 to 255: subtract one.
+ weight -= 1
+
+ # Set defaults for anything not provided.
+ weight = weight if weight is not None else 15
+ depends_on = depends_on if depends_on is not None else 0
+ exclusive = exclusive if exclusive is not None else False
+
+ frame.stream_weight = weight
+ frame.depends_on = depends_on
+ frame.exclusive = exclusive
+
+ return frame
+
+
def _decode_headers(decoder, encoded_header_block):
    """
    Decode a HPACK-encoded header block, translating HPACK exceptions into
    sensible hyper-h2 errors.

    This only ever returns bytestring headers: hyper-h2 may emit them as
    unicode later, but internally it processes them as bytestrings only.

    :param decoder: The connection's hpack ``Decoder`` instance.
    :param encoded_header_block: The raw HPACK bytes from the frame.
    :raises DenialOfServiceError: if the header block exceeds the
        configured maximum size (a HPACK-bomb symptom).
    :raises ProtocolError: for any other decoding failure.
    """
    try:
        return decoder.decode(encoded_header_block, raw=True)
    except OversizedHeaderListError as e:
        # This is a symptom of a HPACK bomb attack: the user has
        # disregarded our requirements on how large a header block we'll
        # accept.
        raise DenialOfServiceError("Oversized header block: %s" % e)
    except (HPACKError, IndexError, TypeError, UnicodeDecodeError) as e:
        # We should only need HPACKError here, but versions of HPACK older
        # than 2.1.0 throw all three others as well. For maximum
        # compatibility, catch all of them.
        raise ProtocolError("Error decoding header block: %s" % e)
diff --git a/src/h2/errors.py b/src/h2/errors.py
new file mode 100644
index 0000000..303df59
--- /dev/null
+++ b/src/h2/errors.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+"""
+h2/errors
+~~~~~~~~~
+
+Global error code registry containing the established HTTP/2 error codes.
+
+The current registry is available at:
+https://tools.ietf.org/html/rfc7540#section-11.4
+"""
+import enum
+
+
class ErrorCodes(enum.IntEnum):
    """
    All known HTTP/2 error codes.

    Values correspond to the IANA "HTTP/2 Error Code" registry established
    by RFC 7540 § 11.4.

    .. versionadded:: 2.5.0
    """
    #: Graceful shutdown.
    NO_ERROR = 0x0

    #: Protocol error detected.
    PROTOCOL_ERROR = 0x1

    #: Implementation fault.
    INTERNAL_ERROR = 0x2

    #: Flow-control limits exceeded.
    FLOW_CONTROL_ERROR = 0x3

    #: Settings not acknowledged.
    SETTINGS_TIMEOUT = 0x4

    #: Frame received for closed stream.
    STREAM_CLOSED = 0x5

    #: Frame size incorrect.
    FRAME_SIZE_ERROR = 0x6

    #: Stream not processed.
    REFUSED_STREAM = 0x7

    #: Stream cancelled.
    CANCEL = 0x8

    #: Compression state not updated.
    COMPRESSION_ERROR = 0x9

    #: TCP connection error for CONNECT method.
    CONNECT_ERROR = 0xa

    #: Processing capacity exceeded.
    ENHANCE_YOUR_CALM = 0xb

    #: Negotiated TLS parameters not acceptable.
    INADEQUATE_SECURITY = 0xc

    #: Use HTTP/1.1 for the request.
    HTTP_1_1_REQUIRED = 0xd
+
+
def _error_code_from_int(code):
    """
    Map an integer error code onto a member of :class:`ErrorCodes
    <h2.errors.ErrorCodes>`. Codes outside the known registry are
    returned unchanged as plain integers.
    """
    try:
        return ErrorCodes(code)
    except ValueError:
        # Not a registered code: hand the raw integer back.
        return code


__all__ = ['ErrorCodes']
diff --git a/src/h2/events.py b/src/h2/events.py
new file mode 100644
index 0000000..a06c990
--- /dev/null
+++ b/src/h2/events.py
@@ -0,0 +1,648 @@
+# -*- coding: utf-8 -*-
+"""
+h2/events
+~~~~~~~~~
+
+Defines Event types for HTTP/2.
+
+Events are returned by the H2 state machine to allow implementations to keep
+track of events triggered by receiving data. Each time data is provided to the
+H2 state machine it processes the data and returns a list of Event objects.
+"""
+import binascii
+
+from .settings import ChangedSetting, _setting_code_from_int
+
+
class Event(object):
    """
    Common base class for all h2 events.
    """
    pass
+
+
class RequestReceived(Event):
    """
    Fired whenever request headers are received. Carries the HTTP headers
    of the request and the ID of the stream it arrived on.

    .. versionchanged:: 2.3.0
       Changed the type of ``headers`` to :class:`HeaderTuple
       <hpack:hpack.HeaderTuple>`. This has no effect on current users.

    .. versionchanged:: 2.4.0
       Added ``stream_ended`` and ``priority_updated`` properties.
    """
    def __init__(self):
        #: The Stream ID for the stream this request was made on.
        self.stream_id = None

        #: The request headers.
        self.headers = None

        #: The associated :class:`StreamEnded <h2.events.StreamEnded>`
        #: event, if this request also ended the stream.
        #:
        #: .. versionadded:: 2.4.0
        self.stream_ended = None

        #: The associated :class:`PriorityUpdated
        #: <h2.events.PriorityUpdated>` event, if this request carried
        #: priority information.
        #:
        #: .. versionadded:: 2.4.0
        self.priority_updated = None

    def __repr__(self):
        return (
            f"<RequestReceived stream_id:{self.stream_id}, "
            f"headers:{self.headers}>"
        )
+
+
class ResponseReceived(Event):
    """
    Fired whenever response headers are received. Carries the HTTP headers
    of the response and the ID of the stream it arrived on.

    .. versionchanged:: 2.3.0
       Changed the type of ``headers`` to :class:`HeaderTuple
       <hpack:hpack.HeaderTuple>`. This has no effect on current users.

    .. versionchanged:: 2.4.0
       Added ``stream_ended`` and ``priority_updated`` properties.
    """
    def __init__(self):
        #: The Stream ID for the stream this response was made on.
        self.stream_id = None

        #: The response headers.
        self.headers = None

        #: The associated :class:`StreamEnded <h2.events.StreamEnded>`
        #: event, if this response also ended the stream.
        #:
        #: .. versionadded:: 2.4.0
        self.stream_ended = None

        #: The associated :class:`PriorityUpdated
        #: <h2.events.PriorityUpdated>` event, if this response carried
        #: priority information.
        #:
        #: .. versionadded:: 2.4.0
        self.priority_updated = None

    def __repr__(self):
        return (
            f"<ResponseReceived stream_id:{self.stream_id}, "
            f"headers:{self.headers}>"
        )
+
+
class TrailersReceived(Event):
    """
    Fired whenever trailers are received on a stream. Trailers are a set
    of header fields sent after the request/response body, carrying
    information that wasn't known ahead of time (e.g. content-length).
    Carries those header fields and the ID of the stream they arrived on.

    .. versionchanged:: 2.3.0
       Changed the type of ``headers`` to :class:`HeaderTuple
       <hpack:hpack.HeaderTuple>`. This has no effect on current users.

    .. versionchanged:: 2.4.0
       Added ``stream_ended`` and ``priority_updated`` properties.
    """
    def __init__(self):
        #: The Stream ID for the stream on which these trailers were
        #: received.
        self.stream_id = None

        #: The trailers themselves.
        self.headers = None

        #: Trailers always end streams, so the associated
        #: :class:`StreamEnded <h2.events.StreamEnded>` event is always
        #: present here.
        #:
        #: .. versionadded:: 2.4.0
        self.stream_ended = None

        #: The associated :class:`PriorityUpdated
        #: <h2.events.PriorityUpdated>` event, if the trailers carried
        #: priority information.
        #:
        #: .. versionadded:: 2.4.0
        self.priority_updated = None

    def __repr__(self):
        return (
            f"<TrailersReceived stream_id:{self.stream_id}, "
            f"headers:{self.headers}>"
        )
+
+
class _HeadersSent(Event):
    """
    Internal event fired whenever any headers are sent. Used to choose the
    validation applied to outgoing header blocks.
    """
    pass


class _ResponseSent(_HeadersSent):
    """
    Internal event fired when response headers are sent on a stream; used
    for outgoing header-block validation.
    """
    pass


class _RequestSent(_HeadersSent):
    """
    Internal event fired when request headers are sent on a stream; used
    for outgoing header-block validation.
    """
    pass


class _TrailersSent(_HeadersSent):
    """
    Internal event fired when trailers are sent on a stream. Trailers are
    header fields sent after the message body, carrying information not
    known up front (e.g. content-length). Used for outgoing header-block
    validation.
    """
    pass


class _PushedRequestSent(_HeadersSent):
    """
    Internal event fired when pushed request headers are sent; used for
    outgoing header-block validation.
    """
    pass
+
+
class InformationalResponseReceived(Event):
    """
    Fired when an informational response — one with a 1XX status code — is
    received from the remote peer.

    The peer may send any number of these, from zero upwards, most
    commonly in response to requests bearing the ``expect: 100-continue``
    header field. Users who are not relying on the ``expect:
    100-continue`` flow or on some other 1XX status code can safely
    ignore this event.

    .. versionadded:: 2.2.0

    .. versionchanged:: 2.3.0
       Changed the type of ``headers`` to :class:`HeaderTuple
       <hpack:hpack.HeaderTuple>`. This has no effect on current users.

    .. versionchanged:: 2.4.0
       Added ``priority_updated`` property.
    """
    def __init__(self):
        #: The Stream ID for the stream this informational response was
        #: made on.
        self.stream_id = None

        #: The headers for this informational response.
        self.headers = None

        #: The associated :class:`PriorityUpdated
        #: <h2.events.PriorityUpdated>` event, if this response carried
        #: priority information.
        #:
        #: .. versionadded:: 2.4.0
        self.priority_updated = None

    def __repr__(self):
        return (
            f"<InformationalResponseReceived stream_id:{self.stream_id}, "
            f"headers:{self.headers}>"
        )
+
+
class DataReceived(Event):
    """
    Fired whenever body data is received on a stream from the remote
    peer. Carries the data itself, its flow-controlled length, and the
    stream ID the data arrived on.

    .. versionchanged:: 2.4.0
       Added ``stream_ended`` property.
    """
    def __init__(self):
        #: The Stream ID for the stream this data was received on.
        self.stream_id = None

        #: The data itself.
        self.data = None

        #: How many flow-controlled bytes this frame consumed. Padding
        #: counts against the flow-control window too, so always prefer
        #: this field over ``len(data)`` when adjusting flow control.
        self.flow_controlled_length = None

        #: The associated :class:`StreamEnded <h2.events.StreamEnded>`
        #: event, if this data chunk also completed the stream.
        #:
        #: .. versionadded:: 2.4.0
        self.stream_ended = None

    def __repr__(self):
        return (
            f"<DataReceived stream_id:{self.stream_id}, "
            f"flow_controlled_length:{self.flow_controlled_length}, "
            f"data:{_bytes_representation(self.data[:20])}>"
        )
+
+
class WindowUpdated(Event):
    """
    Fired whenever a flow-control window changes size. HTTP/2 has both
    connection and stream flow-control windows and this event covers
    both; a stream ID of zero denotes the connection window. Carries the
    affected stream ID and the window delta.
    """
    def __init__(self):
        #: The Stream ID of the stream whose flow control window was
        #: changed. May be ``0`` if the connection window was changed.
        self.stream_id = None

        #: The window delta.
        self.delta = None

    def __repr__(self):
        return (
            f"<WindowUpdated stream_id:{self.stream_id}, "
            f"delta:{self.delta}>"
        )
+
+
class RemoteSettingsChanged(Event):
    """
    The RemoteSettingsChanged event is fired whenever the remote peer changes
    its settings. It contains a complete inventory of changed settings,
    including their previous values.

    In HTTP/2, settings changes need to be acknowledged. hyper-h2 automatically
    acknowledges settings changes for efficiency. However, it is possible that
    the caller may not be happy with the changed setting.

    When this event is received, the caller should confirm that the new
    settings are acceptable. If they are not acceptable, the user should close
    the connection with the error code :data:`PROTOCOL_ERROR
    <h2.errors.ErrorCodes.PROTOCOL_ERROR>`.

    .. versionchanged:: 2.0.0
       Prior to this version the user needed to acknowledge settings changes.
       This is no longer the case: hyper-h2 now automatically acknowledges
       them.
    """
    def __init__(self):
        #: A dictionary of setting byte to
        #: :class:`ChangedSetting <h2.settings.ChangedSetting>`, representing
        #: the changed settings.
        self.changed_settings = {}

    @classmethod
    def from_settings(cls, old_settings, new_settings):
        """
        Build a RemoteSettingsChanged event from a set of changed settings.

        :param old_settings: A complete collection of old settings, in the form
                             of a dictionary of ``{setting: value}``.
        :param new_settings: All the changed settings and their new values, in
                             the form of a dictionary of ``{setting: value}``.
        """
        e = cls()
        for setting, new_value in new_settings.items():
            # Normalise the raw integer code to a SettingCodes member where
            # possible, so lookups in changed_settings are consistent.
            setting = _setting_code_from_int(setting)
            original_value = old_settings.get(setting)
            change = ChangedSetting(setting, original_value, new_value)
            e.changed_settings[setting] = change

        return e

    def __repr__(self):
        return "<RemoteSettingsChanged changed_settings:{%s}>" % (
            ", ".join(repr(cs) for cs in self.changed_settings.values()),
        )
+
+
class PingReceived(Event):
    """
    Fired whenever a PING frame is received. Carries the PING's 'opaque
    data'; an acknowledgment echoing the same opaque data is emitted
    automatically.

    .. versionadded:: 3.1.0
    """
    def __init__(self):
        #: The data included on the ping.
        self.ping_data = None

    def __repr__(self):
        return (
            f"<PingReceived ping_data:"
            f"{_bytes_representation(self.ping_data)}>"
        )
+
+
class PingAcknowledged(Event):
    """
    Same as PingAckReceived.

    .. deprecated:: 3.1.0
    """
    def __init__(self):
        #: The data included on the ping.
        self.ping_data = None

    def __repr__(self):
        # NOTE(review): deliberately reports the subclass name — this class
        # survives only as a deprecated alias and PingAckReceived inherits
        # this repr.
        return "<PingAckReceived ping_data:%s>" % (
            _bytes_representation(self.ping_data),
        )
+
+
class PingAckReceived(PingAcknowledged):
    """
    Fired whenever a PING acknowledgment is received. Carries the 'opaque
    data' of the PING+ACK frame so the user can correlate PINGs and
    calculate round-trip time.

    .. versionadded:: 3.1.0
    """
    pass
+
+
class StreamEnded(Event):
    """
    Fired whenever the remote party ends a stream. The stream may remain
    half-open if it has not been closed locally, but no further data or
    headers should be expected on it.
    """
    def __init__(self):
        #: The Stream ID of the stream that was closed.
        self.stream_id = None

    def __repr__(self):
        return f"<StreamEnded stream_id:{self.stream_id}>"
+
+
class StreamReset(Event):
    """
    Fired in two situations: when the remote party forcefully resets a
    stream, and when the remote party commits a protocol error confined
    to a single stream, in which case Hyper-h2 terminates the stream
    early itself and returns this event.

    .. versionchanged:: 2.0.0
       This event is now fired when Hyper-h2 automatically resets a
       stream.
    """
    def __init__(self):
        #: The Stream ID of the stream that was reset.
        self.stream_id = None

        #: The error code given. Either one of :class:`ErrorCodes
        #: <h2.errors.ErrorCodes>` or ``int``
        self.error_code = None

        #: Whether the remote peer sent a RST_STREAM or we did.
        self.remote_reset = True

    def __repr__(self):
        return (
            f"<StreamReset stream_id:{self.stream_id}, "
            f"error_code:{self.error_code}, "
            f"remote_reset:{self.remote_reset}>"
        )
+
+
class PushedStreamReceived(Event):
    """
    Fired whenever the remote peer pushes a stream. Carries the new
    stream's ID, the parent stream's ID, and the request headers the peer
    pushed.
    """
    def __init__(self):
        #: The Stream ID of the stream created by the push.
        self.pushed_stream_id = None

        #: The Stream ID of the stream that the push is related to.
        self.parent_stream_id = None

        #: The request headers, sent by the remote party in the push.
        self.headers = None

    def __repr__(self):
        return (
            f"<PushedStreamReceived pushed_stream_id:"
            f"{self.pushed_stream_id}, "
            f"parent_stream_id:{self.parent_stream_id}, "
            f"headers:{self.headers}>"
        )
+
+
class SettingsAcknowledged(Event):
    """
    Fired whenever a settings ACK is received from the remote peer.
    Carries the settings that were acknowledged, in the same format as
    :class:`h2.events.RemoteSettingsChanged`.
    """
    def __init__(self):
        #: A dictionary of setting byte to
        #: :class:`ChangedSetting <h2.settings.ChangedSetting>`,
        #: representing the changed settings.
        self.changed_settings = {}

    def __repr__(self):
        body = ", ".join(
            repr(cs) for cs in self.changed_settings.values()
        )
        return "<SettingsAcknowledged changed_settings:{%s}>" % body
+
+
class PriorityUpdated(Event):
    """
    Fired whenever a stream sends updated priority information, either
    when the stream is opened or at any later point in its lifetime.

    Purely advisory: no action is required on receipt.

    .. versionadded:: 2.0.0
    """
    def __init__(self):
        #: The ID of the stream whose priority information is being
        #: updated.
        self.stream_id = None

        #: The new stream weight: an integer between 1 and 256, possibly
        #: unchanged from the original weight.
        self.weight = None

        #: The stream ID this stream now depends on. May be ``0``.
        self.depends_on = None

        #: Whether the dependency on the parent stream is *exclusive* —
        #: if so, this stream should inherit the parent's current
        #: children.
        self.exclusive = None

    def __repr__(self):
        return (
            f"<PriorityUpdated stream_id:{self.stream_id}, "
            f"weight:{self.weight}, depends_on:{self.depends_on}, "
            f"exclusive:{self.exclusive}>"
        )
+
+
class ConnectionTerminated(Event):
    """
    Fired when the remote peer tears the connection down with a GOAWAY
    frame. Once received, no further action may be taken on the
    connection: a new connection must be established.
    """
    def __init__(self):
        #: The error code cited when tearing down the connection. Should
        #: be one of :class:`ErrorCodes <h2.errors.ErrorCodes>`, but may
        #: not be if unknown HTTP/2 extensions are being used.
        self.error_code = None

        #: The stream ID of the last stream the remote peer saw. This can
        #: provide an indication of what data, if any, never reached the
        #: remote peer and so can safely be resent.
        self.last_stream_id = None

        #: Additional debug data that can be appended to GOAWAY frame.
        self.additional_data = None

    def __repr__(self):
        debug_data = (
            self.additional_data[:20] if self.additional_data else None
        )
        return (
            f"<ConnectionTerminated error_code:{self.error_code}, "
            f"last_stream_id:{self.last_stream_id}, "
            f"additional_data:{_bytes_representation(debug_data)}>"
        )
+
+
class AlternativeServiceAvailable(Event):
    """
    The AlternativeServiceAvailable event is fired when the remote peer
    advertises an `RFC 7838 <https://tools.ietf.org/html/rfc7838>`_ Alternative
    Service using an ALTSVC frame.

    This event always carries the origin to which the ALTSVC information
    applies. That origin is either supplied by the server directly, or inferred
    by hyper-h2 from the ``:authority`` pseudo-header field that was sent by
    the user when initiating a given stream.

    This event also carries what RFC 7838 calls the "Alternative Service Field
    Value", which is formatted like a HTTP header field and contains the
    relevant alternative service information. Hyper-h2 does not parse or in any
    way modify that information: the user is required to do that.

    This event can only be fired on the client end of a connection.

    .. versionadded:: 2.3.0
    """
    def __init__(self):
        #: The origin to which the alternative service field value applies.
        #: This field is either supplied by the server directly, or inferred by
        #: hyper-h2 from the ``:authority`` pseudo-header field that was sent
        #: by the user when initiating the stream on which the frame was
        #: received.
        self.origin = None

        #: The ALTSVC field value. This contains information about the HTTP
        #: alternative service being advertised by the server. Hyper-h2 does
        #: not parse this field: it is left exactly as sent by the server. The
        #: structure of the data in this field is given by `RFC 7838 Section 3
        #: <https://tools.ietf.org/html/rfc7838#section-3>`_.
        self.field_value = None

    def __repr__(self):
        # Both fields are bytestrings; decode them leniently for display.
        origin_text = self.origin.decode('utf-8', 'ignore')
        field_text = self.field_value.decode('utf-8', 'ignore')
        return "<AlternativeServiceAvailable origin:%s, field_value:%s>" % (
            origin_text, field_text
        )
+
+
class UnknownFrameReceived(Event):
    """
    The UnknownFrameReceived event is fired when the remote peer sends a frame
    that hyper-h2 does not understand. This occurs primarily when the remote
    peer is employing HTTP/2 extensions that hyper-h2 doesn't know anything
    about.

    RFC 7540 requires that HTTP/2 implementations ignore these frames. hyper-h2
    does so. However, this event is fired to allow implementations to perform
    special processing on those frames if needed (e.g. if the implementation
    is capable of handling the frame itself).

    .. versionadded:: 2.7.0
    """
    def __init__(self):
        #: The hyperframe Frame object that encapsulates the received frame.
        #: Initialized to ``None``.
        self.frame = None

    def __repr__(self):
        # Deliberately terse: the wrapped frame may be an arbitrary extension
        # frame, so we don't attempt to render its contents.
        return "<UnknownFrameReceived>"
+
+
+def _bytes_representation(data):
+ """
+ Converts a bytestring into something that is safe to print on all Python
+ platforms.
+
+ This function is relatively expensive, so it should not be called on the
+ mainline of the code. It's safe to use in things like object repr methods
+ though.
+ """
+ if data is None:
+ return None
+
+ hex = binascii.hexlify(data)
+
+ # This is moderately clever: on all Python versions hexlify returns a byte
+ # string. On Python 3 we want an actual string, so we just check whether
+ # that's what we have.
+ if not isinstance(hex, str): # pragma: no cover
+ hex = hex.decode('ascii')
+
+ return hex
diff --git a/src/h2/exceptions.py b/src/h2/exceptions.py
new file mode 100644
index 0000000..388f9e9
--- /dev/null
+++ b/src/h2/exceptions.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+"""
+h2/exceptions
+~~~~~~~~~~~~~
+
+Exceptions for the HTTP/2 module.
+"""
+import h2.errors
+
+
class H2Error(Exception):
    """
    The base class for all exceptions for the HTTP/2 module.

    Every exception defined in this module derives from this class, so
    catching ``H2Error`` handles any h2-specific failure.
    """
+
+
class ProtocolError(H2Error):
    """
    An action was attempted in violation of the HTTP/2 protocol.
    """
    #: The error code that corresponds to this kind of Protocol Error.
    #: Subclasses override this with more specific codes where appropriate.
    error_code = h2.errors.ErrorCodes.PROTOCOL_ERROR
+
+
class FrameTooLargeError(ProtocolError):
    """
    The frame that we tried to send or that we received was too large.
    """
    #: The error code that corresponds to this kind of Protocol Error.
    error_code = h2.errors.ErrorCodes.FRAME_SIZE_ERROR
+
+
class FrameDataMissingError(ProtocolError):
    """
    The frame that we received is missing some data.

    .. versionadded:: 2.0.0
    """
    #: The error code that corresponds to this kind of Protocol Error.
    error_code = h2.errors.ErrorCodes.FRAME_SIZE_ERROR
+
+
class TooManyStreamsError(ProtocolError):
    """
    An attempt was made to open a stream that would lead to too many concurrent
    streams.
    """
    # Inherits the generic PROTOCOL_ERROR error code from ProtocolError.
    pass
+
+
class FlowControlError(ProtocolError):
    """
    An attempted action violates flow control constraints.
    """
    #: The error code that corresponds to this kind of
    #: :class:`ProtocolError <h2.exceptions.ProtocolError>`
    error_code = h2.errors.ErrorCodes.FLOW_CONTROL_ERROR
+
+
class StreamIDTooLowError(ProtocolError):
    """
    An attempt was made to open a stream that had an ID that is lower than the
    highest ID we have seen on this connection.
    """
    def __init__(self, stream_id, max_stream_id):
        #: The ID of the stream that we attempted to open.
        self.stream_id = stream_id

        #: The current highest-seen stream ID.
        self.max_stream_id = max_stream_id

    def __str__(self):
        message = "StreamIDTooLowError: %d is lower than %d"
        return message % (self.stream_id, self.max_stream_id)
+
+
class NoAvailableStreamIDError(ProtocolError):
    """
    There are no available stream IDs left to the connection. All stream IDs
    have been exhausted.

    .. versionadded:: 2.0.0
    """
    # No dedicated error code: inherits PROTOCOL_ERROR from ProtocolError.
    pass
+
+
class NoSuchStreamError(ProtocolError):
    """
    A stream-specific action referenced a stream that does not exist.

    .. versionchanged:: 2.0.0
       Became a subclass of :class:`ProtocolError
       <h2.exceptions.ProtocolError>`
    """
    def __init__(self, stream_id):
        #: The stream ID that corresponds to the non-existent stream.
        self.stream_id = stream_id
+
+
class StreamClosedError(NoSuchStreamError):
    """
    A more specific form of
    :class:`NoSuchStreamError <h2.exceptions.NoSuchStreamError>`. Indicates
    that the stream has since been closed, and that all state relating to that
    stream has been removed.
    """
    def __init__(self, stream_id):
        #: The stream ID that corresponds to the nonexistent stream.
        self.stream_id = stream_id

        #: The relevant HTTP/2 error code.
        self.error_code = h2.errors.ErrorCodes.STREAM_CLOSED

        # Any events that internal code may need to fire. Not relevant to
        # external users that may receive a StreamClosedError.
        self._events = []
+
+
class InvalidSettingsValueError(ProtocolError, ValueError):
    """
    An attempt was made to set an invalid Settings value.

    .. versionadded:: 2.0.0

    :param msg: Human-readable description of the invalid setting.
    :param error_code: The HTTP/2 error code to cite when rejecting it.
    """
    def __init__(self, msg, error_code):
        super(InvalidSettingsValueError, self).__init__(msg)
        # Overrides the class-level PROTOCOL_ERROR default per instance.
        self.error_code = error_code
+
+
class InvalidBodyLengthError(ProtocolError):
    """
    The remote peer sent more or less data than the Content-Length header
    indicated.

    .. versionadded:: 2.0.0
    """
    def __init__(self, expected, actual):
        #: The length, in bytes, that the Content-Length header promised.
        self.expected_length = expected
        #: The length, in bytes, that was actually received.
        self.actual_length = actual

    def __str__(self):
        return "InvalidBodyLengthError: Expected %d bytes, received %d" % (
            self.expected_length, self.actual_length
        )
+
+
class UnsupportedFrameError(ProtocolError, KeyError):
    """
    The remote peer sent a frame that is unsupported in this context.

    .. versionadded:: 2.1.0
    """
    # TODO: Remove the KeyError in 3.0.0
    pass
+
+
class RFC1122Error(H2Error):
    """
    Emitted when users attempt to do something that is literally allowed by the
    relevant RFC, but is sufficiently ill-defined that it's unwise to allow
    users to actually do it.

    While there is some disagreement about whether or not we should be liberal
    in what we accept, it is a truth universally acknowledged that we should be
    conservative in what we emit.

    .. versionadded:: 2.4.0
    """
    # shazow says I'm going to regret naming the exception this way. If that
    # turns out to be true, TELL HIM NOTHING.
    pass
+
+
class DenialOfServiceError(ProtocolError):
    """
    Emitted when the remote peer exhibits a behaviour that is likely to be an
    attempt to perform a Denial of Service attack on the implementation. This
    is a form of ProtocolError that carries a different error code, and allows
    more easy detection of this kind of behaviour.

    .. versionadded:: 2.5.0
    """
    #: The error code that corresponds to this kind of
    #: :class:`ProtocolError <h2.exceptions.ProtocolError>`
    error_code = h2.errors.ErrorCodes.ENHANCE_YOUR_CALM
diff --git a/src/h2/frame_buffer.py b/src/h2/frame_buffer.py
new file mode 100644
index 0000000..e79f6ec
--- /dev/null
+++ b/src/h2/frame_buffer.py
@@ -0,0 +1,175 @@
+# -*- coding: utf-8 -*-
+"""
+h2/frame_buffer
+~~~~~~~~~~~~~~~
+
+A data structure that provides a way to iterate over a byte buffer in terms of
+frames.
+"""
+from hyperframe.exceptions import InvalidFrameError
+from hyperframe.frame import (
+ Frame, HeadersFrame, ContinuationFrame, PushPromiseFrame
+)
+
+from .exceptions import (
+ ProtocolError, FrameTooLargeError, FrameDataMissingError
+)
+
# To avoid a DOS attack based on sending loads of continuation frames, we limit
# the maximum number we're prepared to receive. In this case, we'll set the
# limit to 64, which means the largest encoded header block we can receive by
# default is 262144 bytes long, and the largest possible *at all* is 1073741760
# bytes long.
#
# This value seems reasonable for now, but in future we may want to evaluate
# making it configurable.
CONTINUATION_BACKLOG = 64
+
+
class FrameBuffer(object):
    """
    This is a data structure that expects to act as a buffer for HTTP/2 data
    that allows iteration in terms of H2 frames.

    Bytes are fed in via :meth:`add_data`, and complete frames are pulled out
    by iterating the buffer. HEADERS/PUSH_PROMISE plus their CONTINUATION
    frames are coalesced into a single frame before being yielded.

    :param server: Whether this buffer should expect (and strip) the HTTP/2
        client connection preamble before any frames.
    """
    def __init__(self, server=False):
        # Raw, unparsed bytes received so far.
        self.data = b''
        # Largest frame payload we'll accept; the connection layer updates
        # this from the SETTINGS_MAX_FRAME_SIZE value.
        self.max_frame_size = 0
        # Servers must receive the client connection preface first.
        self._preamble = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n' if server else b''
        self._preamble_len = len(self._preamble)
        # Pending HEADERS/PUSH_PROMISE + CONTINUATION frames, accumulated
        # until an END_HEADERS flag completes the header block.
        self._headers_buffer = []

    def add_data(self, data):
        """
        Add more data to the frame buffer.

        :param data: A bytestring containing the byte buffer.
        :raises ProtocolError: If the data does not begin with the expected
            HTTP/2 preamble (server mode only).
        """
        if self._preamble_len:
            data_len = len(data)
            of_which_preamble = min(self._preamble_len, data_len)

            # The preamble may arrive split across several calls, so compare
            # only as much of it as this chunk covers.
            if self._preamble[:of_which_preamble] != data[:of_which_preamble]:
                raise ProtocolError("Invalid HTTP/2 preamble.")

            data = data[of_which_preamble:]
            self._preamble_len -= of_which_preamble
            self._preamble = self._preamble[of_which_preamble:]

        self.data += data

    def _parse_frame_header(self, data):
        """
        Parses the frame header from the data. Either returns a tuple of
        (frame, length), or throws an exception. The returned frame may be None
        if the frame is of unknown type.
        """
        try:
            frame, length = Frame.parse_frame_header(data[:9])
        except ValueError as e:
            # The frame header is invalid. This is a ProtocolError
            raise ProtocolError("Invalid frame header received: %s" % str(e))

        return frame, length

    def _validate_frame_length(self, length):
        """
        Confirm that the frame is an appropriate length.

        :raises FrameTooLargeError: If the payload exceeds the negotiated
            maximum frame size.
        """
        if length > self.max_frame_size:
            raise FrameTooLargeError(
                "Received overlong frame: length %d, max %d" %
                (length, self.max_frame_size)
            )

    def _update_header_buffer(self, f):
        """
        Updates the internal header buffer. Returns a frame that should replace
        the current one. May throw exceptions if this frame is invalid.
        """
        # Check if we're in the middle of a headers block. If we are, this
        # frame *must* be a CONTINUATION frame with the same stream ID as the
        # leading HEADERS or PUSH_PROMISE frame. Anything else is a
        # ProtocolError. If the frame *is* valid, append it to the header
        # buffer.
        if self._headers_buffer:
            stream_id = self._headers_buffer[0].stream_id
            valid_frame = (
                f is not None and
                isinstance(f, ContinuationFrame) and
                f.stream_id == stream_id
            )
            if not valid_frame:
                raise ProtocolError("Invalid frame during header block.")

            # Append the frame to the buffer.
            self._headers_buffer.append(f)
            if len(self._headers_buffer) > CONTINUATION_BACKLOG:
                raise ProtocolError("Too many continuation frames received.")

            # If this is the end of the header block, then we want to build a
            # mutant HEADERS frame that's massive. Use the original one we got,
            # then set END_HEADERS and set its data appropriately. If it's not
            # the end of the block, lose the current frame: we can't yield it.
            if 'END_HEADERS' in f.flags:
                f = self._headers_buffer[0]
                f.flags.add('END_HEADERS')
                f.data = b''.join(x.data for x in self._headers_buffer)
                self._headers_buffer = []
            else:
                f = None
        elif (isinstance(f, (HeadersFrame, PushPromiseFrame)) and
                'END_HEADERS' not in f.flags):
            # This is the start of a headers block! Save the frame off and then
            # act like we didn't receive one.
            self._headers_buffer.append(f)
            f = None

        return f

    # The methods below support the iterator protocol.
    def __iter__(self):
        return self

    def next(self):  # Python 2
        """
        Return the next complete frame in the buffer, or raise StopIteration
        if a full frame has not yet been buffered.
        """
        # First, check that we have enough data to successfully parse the
        # next frame header. If not, bail. Otherwise, parse it.
        if len(self.data) < 9:
            raise StopIteration()

        try:
            f, length = self._parse_frame_header(self.data)
        except InvalidFrameError:  # pragma: no cover
            raise ProtocolError("Received frame with invalid frame header.")

        # Next, check that we have enough length to parse the frame body. If
        # not, bail, leaving the frame header data in the buffer for next time.
        if len(self.data) < length + 9:
            raise StopIteration()

        # Confirm the frame has an appropriate length.
        self._validate_frame_length(length)

        # Don't try to parse the body if we didn't get a frame we know about:
        # there's nothing we can do with it anyway.
        if f is not None:
            try:
                f.parse_body(memoryview(self.data[9:9+length]))
            except InvalidFrameError:
                raise FrameDataMissingError("Frame data missing or invalid")

        # At this point, as we know we'll use or discard the entire frame, we
        # can update the data.
        self.data = self.data[9+length:]

        # Pass the frame through the header buffer.
        f = self._update_header_buffer(f)

        # If we got a frame we didn't understand or shouldn't yield, rather
        # than return None it'd be better if we just tried to get the next
        # frame in the sequence instead. Recurse back into ourselves to do
        # that. This is safe because the amount of work we have to do here is
        # strictly bounded by the length of the buffer.
        return f if f is not None else self.next()

    def __next__(self):  # Python 3
        return self.next()
diff --git a/src/h2/settings.py b/src/h2/settings.py
new file mode 100644
index 0000000..bf87c94
--- /dev/null
+++ b/src/h2/settings.py
@@ -0,0 +1,339 @@
+# -*- coding: utf-8 -*-
+"""
+h2/settings
+~~~~~~~~~~~
+
+This module contains a HTTP/2 settings object. This object provides a simple
+API for manipulating HTTP/2 settings, keeping track of both the current active
+state of the settings and the unacknowledged future values of the settings.
+"""
+import collections
+import enum
+
+from hyperframe.frame import SettingsFrame
+
+from h2.errors import ErrorCodes
+from h2.exceptions import InvalidSettingsValueError
+
+try:
+ from collections.abc import MutableMapping
+except ImportError: # pragma: no cover
+ # Python 2.7 compatibility
+ from collections import MutableMapping
+
+
class SettingCodes(enum.IntEnum):
    """
    All known HTTP/2 setting codes.

    Values are taken from hyperframe's ``SettingsFrame`` constants, so they
    match the codes used on the wire.

    .. versionadded:: 2.6.0
    """

    #: Allows the sender to inform the remote endpoint of the maximum size of
    #: the header compression table used to decode header blocks, in octets.
    HEADER_TABLE_SIZE = SettingsFrame.HEADER_TABLE_SIZE

    #: This setting can be used to disable server push. To disable server push
    #: on a client, set this to 0.
    ENABLE_PUSH = SettingsFrame.ENABLE_PUSH

    #: Indicates the maximum number of concurrent streams that the sender will
    #: allow.
    MAX_CONCURRENT_STREAMS = SettingsFrame.MAX_CONCURRENT_STREAMS

    #: Indicates the sender's initial window size (in octets) for stream-level
    #: flow control.
    INITIAL_WINDOW_SIZE = SettingsFrame.INITIAL_WINDOW_SIZE

    #: Indicates the size of the largest frame payload that the sender is
    #: willing to receive, in octets.
    MAX_FRAME_SIZE = SettingsFrame.MAX_FRAME_SIZE

    #: This advisory setting informs a peer of the maximum size of header list
    #: that the sender is prepared to accept, in octets. The value is based on
    #: the uncompressed size of header fields, including the length of the name
    #: and value in octets plus an overhead of 32 octets for each header field.
    MAX_HEADER_LIST_SIZE = SettingsFrame.MAX_HEADER_LIST_SIZE

    #: This setting can be used to enable the connect protocol. To enable on a
    #: client set this to 1.
    ENABLE_CONNECT_PROTOCOL = SettingsFrame.ENABLE_CONNECT_PROTOCOL
+
+
def _setting_code_from_int(code):
    """
    Given an integer setting code, returns either one of :class:`SettingCodes
    <h2.settings.SettingCodes>` or, if not present in the known set of codes,
    returns the integer directly.
    """
    try:
        code = SettingCodes(code)
    except ValueError:
        # Unknown (possibly extension) setting: hand back the raw integer.
        pass
    return code
+
+
class ChangedSetting:
    """
    Records a single setting change: which setting moved, and its value both
    before and after the change.
    """

    def __init__(self, setting, original_value, new_value):
        #: The setting code given. Either one of :class:`SettingCodes
        #: <h2.settings.SettingCodes>` or ``int``
        #:
        #: .. versionchanged:: 2.6.0
        self.setting = setting

        #: The original value before being changed.
        self.original_value = original_value

        #: The new value after being changed.
        self.new_value = new_value

    def __repr__(self):
        template = (
            "ChangedSetting(setting={}, original_value={}, new_value={})"
        )
        return template.format(
            self.setting, self.original_value, self.new_value
        )
+
+
class Settings(MutableMapping):
    """
    An object that encapsulates HTTP/2 settings state.

    HTTP/2 Settings are a complex beast. Each party, remote and local, has its
    own settings and a view of the other party's settings. When a settings
    frame is emitted by a peer it cannot assume that the new settings values
    are in place until the remote peer acknowledges the setting. In principle,
    multiple settings changes can be "in flight" at the same time, all with
    different values.

    This object encapsulates this mess. It provides a dict-like interface to
    settings, which return the *current* values of the settings in question.
    Additionally, it keeps track of the stack of proposed values: each time an
    acknowledgement is sent/received, it updates the current values with the
    stack of proposed values. On top of all that, it validates the values to
    make sure they're allowed, and raises :class:`InvalidSettingsValueError
    <h2.exceptions.InvalidSettingsValueError>` if they are not.

    Finally, this object understands what the default values of the HTTP/2
    settings are, and sets those defaults appropriately.

    .. versionchanged:: 2.2.0
       Added the ``initial_values`` parameter.

    .. versionchanged:: 2.5.0
       Added the ``max_header_list_size`` property.

    :param client: (optional) Whether these settings should be defaulted for a
        client implementation or a server implementation. Defaults to ``True``.
    :type client: ``bool``
    :param initial_values: (optional) Any initial values the user would like
        set, rather than RFC 7540's defaults.
    :type initial_values: ``MutableMapping``
    """
    def __init__(self, client=True, initial_values=None):
        # Backing object for the settings. This is a dictionary of
        # (setting: [list of values]), where the first value in the list is the
        # current value of the setting. Strictly this doesn't use lists but
        # instead uses collections.deque to avoid repeated memory allocations.
        #
        # This contains the default values for HTTP/2.
        self._settings = {
            SettingCodes.HEADER_TABLE_SIZE: collections.deque([4096]),
            SettingCodes.ENABLE_PUSH: collections.deque([int(client)]),
            SettingCodes.INITIAL_WINDOW_SIZE: collections.deque([65535]),
            SettingCodes.MAX_FRAME_SIZE: collections.deque([16384]),
            SettingCodes.ENABLE_CONNECT_PROTOCOL: collections.deque([0]),
        }
        if initial_values is not None:
            for key, value in initial_values.items():
                # User-supplied values are validated up front so that an
                # invalid initial setting fails fast.
                invalid = _validate_setting(key, value)
                if invalid:
                    raise InvalidSettingsValueError(
                        "Setting %d has invalid value %d" % (key, value),
                        error_code=invalid
                    )
                self._settings[key] = collections.deque([value])

    def acknowledge(self):
        """
        The settings have been acknowledged, either by the user (remote
        settings) or by the remote peer (local settings).

        :returns: A dict of {setting: ChangedSetting} that were applied.
        """
        changed_settings = {}

        # If there is more than one setting in the list, we have a setting
        # value outstanding. Update them. Note that only one pending value is
        # promoted per acknowledgement: one ack covers one SETTINGS frame.
        for k, v in self._settings.items():
            if len(v) > 1:
                old_setting = v.popleft()
                new_setting = v[0]
                changed_settings[k] = ChangedSetting(
                    k, old_setting, new_setting
                )

        return changed_settings

    # Provide easy-access to well known settings.
    @property
    def header_table_size(self):
        """
        The current value of the :data:`HEADER_TABLE_SIZE
        <h2.settings.SettingCodes.HEADER_TABLE_SIZE>` setting.
        """
        return self[SettingCodes.HEADER_TABLE_SIZE]

    @header_table_size.setter
    def header_table_size(self, value):
        self[SettingCodes.HEADER_TABLE_SIZE] = value

    @property
    def enable_push(self):
        """
        The current value of the :data:`ENABLE_PUSH
        <h2.settings.SettingCodes.ENABLE_PUSH>` setting.
        """
        return self[SettingCodes.ENABLE_PUSH]

    @enable_push.setter
    def enable_push(self, value):
        self[SettingCodes.ENABLE_PUSH] = value

    @property
    def initial_window_size(self):
        """
        The current value of the :data:`INITIAL_WINDOW_SIZE
        <h2.settings.SettingCodes.INITIAL_WINDOW_SIZE>` setting.
        """
        return self[SettingCodes.INITIAL_WINDOW_SIZE]

    @initial_window_size.setter
    def initial_window_size(self, value):
        self[SettingCodes.INITIAL_WINDOW_SIZE] = value

    @property
    def max_frame_size(self):
        """
        The current value of the :data:`MAX_FRAME_SIZE
        <h2.settings.SettingCodes.MAX_FRAME_SIZE>` setting.
        """
        return self[SettingCodes.MAX_FRAME_SIZE]

    @max_frame_size.setter
    def max_frame_size(self, value):
        self[SettingCodes.MAX_FRAME_SIZE] = value

    @property
    def max_concurrent_streams(self):
        """
        The current value of the :data:`MAX_CONCURRENT_STREAMS
        <h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS>` setting. Defaults
        to effectively unlimited (2**32 + 1) when unset.
        """
        return self.get(SettingCodes.MAX_CONCURRENT_STREAMS, 2**32+1)

    @max_concurrent_streams.setter
    def max_concurrent_streams(self, value):
        self[SettingCodes.MAX_CONCURRENT_STREAMS] = value

    @property
    def max_header_list_size(self):
        """
        The current value of the :data:`MAX_HEADER_LIST_SIZE
        <h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE>` setting. If not set,
        returns ``None``, which means unlimited.

        .. versionadded:: 2.5.0
        """
        return self.get(SettingCodes.MAX_HEADER_LIST_SIZE, None)

    @max_header_list_size.setter
    def max_header_list_size(self, value):
        self[SettingCodes.MAX_HEADER_LIST_SIZE] = value

    @property
    def enable_connect_protocol(self):
        """
        The current value of the :data:`ENABLE_CONNECT_PROTOCOL
        <h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL>` setting.
        """
        return self[SettingCodes.ENABLE_CONNECT_PROTOCOL]

    @enable_connect_protocol.setter
    def enable_connect_protocol(self, value):
        self[SettingCodes.ENABLE_CONNECT_PROTOCOL] = value

    # Implement the MutableMapping API.
    def __getitem__(self, key):
        # The current value is always the head of the deque; later entries
        # are pending values awaiting acknowledgement.
        val = self._settings[key][0]

        # Things that were created when a setting was received should stay
        # KeyError'd.
        if val is None:
            raise KeyError

        return val

    def __setitem__(self, key, value):
        invalid = _validate_setting(key, value)
        if invalid:
            raise InvalidSettingsValueError(
                "Setting %d has invalid value %d" % (key, value),
                error_code=invalid
            )

        try:
            items = self._settings[key]
        except KeyError:
            # Unknown setting: seed the deque with a None placeholder so that
            # __getitem__ keeps raising KeyError until the value is acked.
            items = collections.deque([None])
            self._settings[key] = items

        # New values queue behind the current one until acknowledge().
        items.append(value)

    def __delitem__(self, key):
        del self._settings[key]

    def __iter__(self):
        return self._settings.__iter__()

    def __len__(self):
        return len(self._settings)

    def __eq__(self, other):
        if isinstance(other, Settings):
            return self._settings == other._settings
        else:
            return NotImplemented

    def __ne__(self, other):
        if isinstance(other, Settings):
            return not self == other
        else:
            return NotImplemented
+
+
def _validate_setting(setting, value):  # noqa: C901
    """
    Confirms that a specific setting has a well-formed value. If the setting is
    invalid, returns an error code. Otherwise, returns 0 (NO_ERROR).
    """
    # Guard-clause style: each check applies only to its own setting code, so
    # at most one of these can fire for any given call.
    if setting == SettingCodes.ENABLE_PUSH and value not in (0, 1):
        return ErrorCodes.PROTOCOL_ERROR
    if setting == SettingCodes.INITIAL_WINDOW_SIZE and not (
            0 <= value <= 2147483647):  # 2^31 - 1
        return ErrorCodes.FLOW_CONTROL_ERROR
    if setting == SettingCodes.MAX_FRAME_SIZE and not (
            16384 <= value <= 16777215):  # 2^14 and 2^24 - 1
        return ErrorCodes.PROTOCOL_ERROR
    if setting == SettingCodes.MAX_HEADER_LIST_SIZE and value < 0:
        return ErrorCodes.PROTOCOL_ERROR
    if setting == SettingCodes.ENABLE_CONNECT_PROTOCOL and value not in (0, 1):
        return ErrorCodes.PROTOCOL_ERROR

    return 0
diff --git a/src/h2/stream.py b/src/h2/stream.py
new file mode 100644
index 0000000..af3d1cc
--- /dev/null
+++ b/src/h2/stream.py
@@ -0,0 +1,1371 @@
+# -*- coding: utf-8 -*-
+"""
+h2/stream
+~~~~~~~~~
+
+An implementation of a HTTP/2 stream.
+"""
+from enum import Enum, IntEnum
+from hpack import HeaderTuple
+from hyperframe.frame import (
+ HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame,
+ RstStreamFrame, PushPromiseFrame, AltSvcFrame
+)
+
+from .errors import ErrorCodes, _error_code_from_int
+from .events import (
+ RequestReceived, ResponseReceived, DataReceived, WindowUpdated,
+ StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived,
+ InformationalResponseReceived, AlternativeServiceAvailable,
+ _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent
+)
+from .exceptions import (
+ ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError
+)
+from .utilities import (
+ guard_increment_window, is_informational_response, authority_from_headers,
+ validate_headers, validate_outbound_headers, normalize_outbound_headers,
+ HeaderValidationFlags, extract_method_header, normalize_inbound_headers
+)
+from .windows import WindowManager
+
+
class StreamState(IntEnum):
    """
    The states of the HTTP/2 stream state machine (RFC 7540 Section 5.1).

    An IntEnum so that states can be used directly as indexes into the
    ``STREAM_OPEN`` lookup table below.
    """
    IDLE = 0
    RESERVED_REMOTE = 1
    RESERVED_LOCAL = 2
    OPEN = 3
    HALF_CLOSED_REMOTE = 4
    HALF_CLOSED_LOCAL = 5
    CLOSED = 6
+
+
class StreamInputs(Enum):
    """
    All of the inputs that can be fed into the stream state machine: one for
    each kind of frame event that can be sent or received on a stream.
    """
    SEND_HEADERS = 0
    SEND_PUSH_PROMISE = 1
    SEND_RST_STREAM = 2
    SEND_DATA = 3
    SEND_WINDOW_UPDATE = 4
    SEND_END_STREAM = 5
    RECV_HEADERS = 6
    RECV_PUSH_PROMISE = 7
    RECV_RST_STREAM = 8
    RECV_DATA = 9
    RECV_WINDOW_UPDATE = 10
    RECV_END_STREAM = 11
    RECV_CONTINUATION = 12  # Added in 2.0.0
    SEND_INFORMATIONAL_HEADERS = 13  # Added in 2.2.0
    RECV_INFORMATIONAL_HEADERS = 14  # Added in 2.2.0
    SEND_ALTERNATIVE_SERVICE = 15  # Added in 2.3.0
    RECV_ALTERNATIVE_SERVICE = 16  # Added in 2.3.0
    UPGRADE_CLIENT = 17  # Added 2.3.0
    UPGRADE_SERVER = 18  # Added 2.3.0
+
+
class StreamClosedBy(Enum):
    """
    Records which action was responsible for closing a stream: a clean
    END_STREAM or a forceful RST_STREAM, sent or received.
    """
    SEND_END_STREAM = 0
    RECV_END_STREAM = 1
    SEND_RST_STREAM = 2
    RECV_RST_STREAM = 3
+
+
# This array is initialized once, and is indexed by the stream states above.
# It indicates whether a stream in the given state is open. The reason we do
# this is that we potentially check whether a stream in a given state is open
# quite frequently: given that we check so often, we should do so in the
# fastest and most performant way possible.
STREAM_OPEN = [False] * len(StreamState)
STREAM_OPEN[StreamState.OPEN] = True
STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True
+
+
+class H2StreamStateMachine(object):
+ """
+ A single HTTP/2 stream state machine.
+
+ This stream object implements basically the state machine described in
+ RFC 7540 section 5.1.
+
+ :param stream_id: The stream ID of this stream. This is stored primarily
+ for logging purposes.
+ """
    def __init__(self, stream_id):
        # Every stream starts life in the IDLE state (RFC 7540 Section 5.1).
        self.state = StreamState.IDLE
        self.stream_id = stream_id

        #: Whether this peer is the client side of this stream. ``None`` until
        #: the first request/push establishes which role we play.
        self.client = None

        # Whether headers/trailers have been sent/received on this stream or
        # not. ``None`` until the corresponding event first fires.
        self.headers_sent = None
        self.trailers_sent = None
        self.headers_received = None
        self.trailers_received = None

        # How the stream was closed. One of StreamClosedBy.
        self.stream_closed_by = None
+
    def process_input(self, input_):
        """
        Process a specific input in the state machine.

        :param input_: A :class:`StreamInputs` member.
        :returns: A list of events produced by the transition's side-effect
            function (may be empty).
        :raises ValueError: If ``input_`` is not a :class:`StreamInputs`.
        :raises ProtocolError: If the input is not valid in the current state.
        """
        if not isinstance(input_, StreamInputs):
            raise ValueError("Input must be an instance of StreamInputs")

        try:
            func, target_state = _transitions[(self.state, input_)]
        except KeyError:
            # Undefined transition: poison the stream by moving it straight
            # to CLOSED before surfacing the protocol error.
            old_state = self.state
            self.state = StreamState.CLOSED
            raise ProtocolError(
                "Invalid input %s in state %s" % (input_, old_state)
            )
        else:
            # Commit the state change before running the side-effect function,
            # recording the prior state for its benefit.
            previous_state = self.state
            self.state = target_state
            if func is not None:
                try:
                    return func(self, previous_state)
                except ProtocolError:
                    # Any protocol violation during the side effect also
                    # closes the stream.
                    self.state = StreamState.CLOSED
                    raise
                except AssertionError as e:  # pragma: no cover
                    self.state = StreamState.CLOSED
                    raise ProtocolError(e)

            return []
+
+ def request_sent(self, previous_state):
+ """
+ Fires when a request is sent.
+ """
+ self.client = True
+ self.headers_sent = True
+ event = _RequestSent()
+
+ return [event]
+
+ def response_sent(self, previous_state):
+ """
+ Fires when something that should be a response is sent. This 'response'
+ may actually be trailers.
+ """
+ if not self.headers_sent:
+ if self.client is True or self.client is None:
+ raise ProtocolError("Client cannot send responses.")
+ self.headers_sent = True
+ event = _ResponseSent()
+ else:
+ assert not self.trailers_sent
+ self.trailers_sent = True
+ event = _TrailersSent()
+
+ return [event]
+
+ def request_received(self, previous_state):
+ """
+ Fires when a request is received.
+ """
+ assert not self.headers_received
+ assert not self.trailers_received
+
+ self.client = False
+ self.headers_received = True
+ event = RequestReceived()
+
+ event.stream_id = self.stream_id
+ return [event]
+
+ def response_received(self, previous_state):
+ """
+ Fires when a response is received. Also disambiguates between responses
+ and trailers.
+ """
+ if not self.headers_received:
+ assert self.client is True
+ self.headers_received = True
+ event = ResponseReceived()
+ else:
+ assert not self.trailers_received
+ self.trailers_received = True
+ event = TrailersReceived()
+
+ event.stream_id = self.stream_id
+ return [event]
+
+ def data_received(self, previous_state):
+ """
+ Fires when data is received.
+ """
+ if not self.headers_received:
+ raise ProtocolError("cannot receive data before headers")
+ event = DataReceived()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def window_updated(self, previous_state):
+ """
+ Fires when a window update frame is received.
+ """
+ event = WindowUpdated()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def stream_half_closed(self, previous_state):
+ """
+ Fires when an END_STREAM flag is received in the OPEN state,
+ transitioning this stream to a HALF_CLOSED_REMOTE state.
+ """
+ event = StreamEnded()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def stream_ended(self, previous_state):
+ """
+ Fires when a stream is cleanly ended.
+ """
+ self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
+ event = StreamEnded()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def stream_reset(self, previous_state):
+ """
+ Fired when a stream is forcefully reset.
+ """
+ self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
+ event = StreamReset()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def send_new_pushed_stream(self, previous_state):
+ """
+ Fires on the newly pushed stream, when pushed by the local peer.
+
+ No event here, but definitionally this peer must be a server.
+ """
+ assert self.client is None
+ self.client = False
+ self.headers_received = True
+ return []
+
+ def recv_new_pushed_stream(self, previous_state):
+ """
+ Fires on the newly pushed stream, when pushed by the remote peer.
+
+ No event here, but definitionally this peer must be a client.
+ """
+ assert self.client is None
+ self.client = True
+ self.headers_sent = True
+ return []
+
+ def send_push_promise(self, previous_state):
+ """
+ Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
+ We may only send PUSH_PROMISE frames if we're a server.
+ """
+ if self.client is True:
+ raise ProtocolError("Cannot push streams from client peers.")
+
+ event = _PushedRequestSent()
+ return [event]
+
    def recv_push_promise(self, previous_state):
        """
        Fires on the already-existing stream when a PUSH_PROMISE frame is
        received. We may only receive PUSH_PROMISE frames if we're a client.

        Fires a PushedStreamReceived event.

        :raises ProtocolError: if we are a server (``client is False``) or
            the stream is still idle (``client is None``).
        """
        if not self.client:
            # NOTE(review): both branches are pragma'd no-cover — the
            # transition table apparently routes these cases elsewhere
            # before reaching this method; verify before relying on it.
            if self.client is None:  # pragma: no cover
                msg = "Idle streams cannot receive pushes"
            else:  # pragma: no cover
                msg = "Cannot receive pushed streams as a server"
            raise ProtocolError(msg)

        event = PushedStreamReceived()
        event.parent_stream_id = self.stream_id
        return [event]
+
    def send_end_stream(self, previous_state):
        """
        Called when an attempt is made to send END_STREAM in the
        HALF_CLOSED_REMOTE state.

        Records that our own END_STREAM finished the stream; returns no
        events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
+
    def send_reset_stream(self, previous_state):
        """
        Called when an attempt is made to send RST_STREAM in a non-closed
        stream state.

        Records that our own RST_STREAM closed the stream; returns no
        events.
        """
        self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
+
+ def reset_stream_on_error(self, previous_state):
+ """
+ Called when we need to forcefully emit another RST_STREAM frame on
+ behalf of the state machine.
+
+ If this is the first time we've done this, we should also hang an event
+ off the StreamClosedError so that the user can be informed. We know
+ it's the first time we've done this if the stream is currently in a
+ state other than CLOSED.
+ """
+ self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
+
+ error = StreamClosedError(self.stream_id)
+
+ event = StreamReset()
+ event.stream_id = self.stream_id
+ event.error_code = ErrorCodes.STREAM_CLOSED
+ event.remote_reset = False
+ error._events = [event]
+ raise error
+
    def recv_on_closed_stream(self, previous_state):
        """
        Called when an unexpected frame is received on an already-closed
        stream.

        An endpoint that receives an unexpected frame should treat it as
        a stream error or connection error with type STREAM_CLOSED, depending
        on the specific frame. The error handling is done at a higher level:
        this just raises the appropriate error.

        :raises StreamClosedError: always.
        """
        raise StreamClosedError(self.stream_id)
+
    def send_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to send data on an already-closed
        stream.

        This essentially overrides the standard logic by throwing a
        more-specific error: StreamClosedError. This is a ProtocolError, so it
        matches the standard API of the state machine, but provides more detail
        to the user.

        :raises StreamClosedError: always.
        """
        raise StreamClosedError(self.stream_id)
+
    def recv_push_on_closed_stream(self, previous_state):
        """
        Called when a PUSH_PROMISE frame is received on an already-closed
        stream.

        If the stream was closed by us sending a RST_STREAM frame, then we
        presume that the PUSH_PROMISE was in flight when we reset the parent
        stream. Rather than accept the new stream, we just reset it.
        Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
        naturally closed stream is a real problem because it creates a brand
        new stream that the remote peer now believes exists.

        :raises StreamClosedError: if we closed the stream with RST_STREAM.
        :raises ProtocolError: if the stream closed naturally.
        """
        # A closed stream must always have recorded *how* it was closed.
        assert self.stream_closed_by is not None

        if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
            raise StreamClosedError(self.stream_id)
        else:
            raise ProtocolError("Attempted to push on closed stream.")
+
    def send_push_on_closed_stream(self, previous_state):
        """
        Called when an attempt is made to push on an already-closed stream.

        This essentially overrides the standard logic by providing a more
        useful error message. It's necessary because simply indicating that the
        stream is closed is not enough: there is now a new stream that is not
        allowed to be there. The only recourse is to tear the whole connection
        down.

        :raises ProtocolError: always.
        """
        raise ProtocolError("Attempted to push on closed stream.")
+
+ def send_informational_response(self, previous_state):
+ """
+ Called when an informational header block is sent (that is, a block
+ where the :status header has a 1XX value).
+
+ Only enforces that these are sent *before* final headers are sent.
+ """
+ if self.headers_sent:
+ raise ProtocolError("Information response after final response")
+
+ event = _ResponseSent()
+ return [event]
+
+ def recv_informational_response(self, previous_state):
+ """
+ Called when an informational header block is received (that is, a block
+ where the :status header has a 1XX value).
+ """
+ if self.headers_received:
+ raise ProtocolError("Informational response after final response")
+
+ event = InformationalResponseReceived()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def recv_alt_svc(self, previous_state):
+ """
+ Called when receiving an ALTSVC frame.
+
+ RFC 7838 allows us to receive ALTSVC frames at any stream state, which
+ is really absurdly overzealous. For that reason, we want to limit the
+ states in which we can actually receive it. It's really only sensible
+ to receive it after we've sent our own headers and before the server
+ has sent its header block: the server can't guarantee that we have any
+ state around after it completes its header block, and the server
+ doesn't know what origin we're talking about before we've sent ours.
+
+ For that reason, this function applies a few extra checks on both state
+ and some of the little state variables we keep around. If those suggest
+ an unreasonable situation for the ALTSVC frame to have been sent in,
+ we quietly ignore it (as RFC 7838 suggests).
+
+ This function is also *not* always called by the state machine. In some
+ states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
+ because we know the frame cannot be valid in that state (IDLE because
+ the server cannot know what origin the stream applies to, CLOSED
+ because the server cannot assume we still have state around,
+ RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
+ state then *we* are the server).
+ """
+ # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
+ # them.
+ if self.client is False:
+ return []
+
+ # If we've received the response headers from the server they can't
+ # guarantee we still have any state around. Other implementations
+ # (like nghttp2) ignore ALTSVC in this state, so we will too.
+ if self.headers_received:
+ return []
+
+ # Otherwise, this is a sensible enough frame to have received. Return
+ # the event and let it get populated.
+ return [AlternativeServiceAvailable()]
+
+ def send_alt_svc(self, previous_state):
+ """
+ Called when sending an ALTSVC frame on this stream.
+
+ For consistency with the restrictions we apply on receiving ALTSVC
+ frames in ``recv_alt_svc``, we want to restrict when users can send
+ ALTSVC frames to the situations when we ourselves would accept them.
+
+ That means: when we are a server, when we have received the request
+ headers, and when we have not yet sent our own response headers.
+ """
+ # We should not send ALTSVC after we've sent response headers, as the
+ # client may have disposed of its state.
+ if self.headers_sent:
+ raise ProtocolError(
+ "Cannot send ALTSVC after sending response headers."
+ )
+
+ return
+
+
+# STATE MACHINE
+#
+# The stream state machine is defined here to avoid the need to allocate it
+# repeatedly for each stream. It cannot be defined in the stream class because
+# it needs to be able to reference the callbacks defined on the class, but
+# because Python's scoping rules are weird the class object is not actually in
+# scope during the body of the class object.
+#
+# For the sake of clarity, we reproduce the RFC 7540 state machine here:
+#
+# +--------+
+# send PP | | recv PP
+# ,--------| idle |--------.
+# / | | \
+# v +--------+ v
+# +----------+ | +----------+
+# | | | send H / | |
+# ,------| reserved | | recv H | reserved |------.
+# | | (local) | | | (remote) | |
+# | +----------+ v +----------+ |
+# | | +--------+ | |
+# | | recv ES | | send ES | |
+# | send H | ,-------| open |-------. | recv H |
+# | | / | | \ | |
+# | v v +--------+ v v |
+# | +----------+ | +----------+ |
+# | | half | | | half | |
+# | | closed | | send R / | closed | |
+# | | (remote) | | recv R | (local) | |
+# | +----------+ | +----------+ |
+# | | | | |
+# | | send ES / | recv ES / | |
+# | | send R / v send R / | |
+# | | recv R +--------+ recv R | |
+# | send R / `----------->| |<-----------' send R / |
+# | recv R | closed | recv R |
+# `----------------------->| |<----------------------'
+# +--------+
+#
+# send: endpoint sends this frame
+# recv: endpoint receives this frame
+#
+# H: HEADERS frame (with implied CONTINUATIONs)
+# PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
+# ES: END_STREAM flag
+# R: RST_STREAM frame
+#
+# For the purposes of this state machine we treat HEADERS and their
+# associated CONTINUATION frames as a single jumbo frame. The protocol
# allows/requires this by preventing other frames from being interleaved in
+# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
+# received without a prior HEADERS frame, it *will* be passed to this state
+# machine. The state machine should always reject that frame, either as an
+# invalid transition or because the stream is closed.
+#
+# There is a confusing relationship around PUSH_PROMISE frames. The state
+# machine above considers them to be frames belonging to the new stream,
+# which is *somewhat* true. However, they are sent with the stream ID of
+# their related stream, and are only sendable in some cases.
+# For this reason, our state machine implementation below allows for
+# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
+# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
+# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
+# two streams.
+#
+# The _transitions dictionary contains a mapping of tuples of
+# (state, input) to tuples of (side_effect_function, end_state). This
+# map contains all allowed transitions: anything not in this map is
+# invalid and immediately causes a transition to ``closed``.
_transitions = {
    # State: idle
    (StreamState.IDLE, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.request_sent, StreamState.OPEN),
    (StreamState.IDLE, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.request_received, StreamState.OPEN),
    (StreamState.IDLE, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
        (H2StreamStateMachine.send_new_pushed_stream,
            StreamState.RESERVED_LOCAL),
    (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.recv_new_pushed_stream,
            StreamState.RESERVED_REMOTE),
    (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (None, StreamState.IDLE),
    (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
        (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
        (H2StreamStateMachine.request_received,
            StreamState.HALF_CLOSED_REMOTE),

    # State: reserved local
    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.RESERVED_LOCAL),
    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
    (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (None, StreamState.RESERVED_LOCAL),

    # State: reserved remote
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.response_received,
            StreamState.HALF_CLOSED_LOCAL),
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.RESERVED_REMOTE),
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
    (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),

    # State: open
    (StreamState.OPEN, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.response_sent, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.response_received, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_DATA):
        (None, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.data_received, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
        (None, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
        (H2StreamStateMachine.stream_half_closed,
            StreamState.HALF_CLOSED_REMOTE),
    (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
        (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
        (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
        (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
    (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),

    # State: half-closed remote
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
        (None, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
        (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
        (H2StreamStateMachine.send_push_promise,
            StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
        (H2StreamStateMachine.send_informational_response,
            StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
    (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),

    # State: half-closed local
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.response_received,
            StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
        (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
        (None, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
        (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
        (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.recv_push_promise,
            StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
        (H2StreamStateMachine.recv_informational_response,
            StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
    (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),

    # State: closed
    (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
        (None, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
        (None, StreamState.CLOSED),

    # RFC 7540 Section 5.1 defines how the end point should react when
    # receiving a frame on a closed stream with the following statements:
    #
    # > An endpoint that receives any frame other than PRIORITY after receiving
    # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
    # > An endpoint that receives any frames after receiving a frame with the
    # > END_STREAM flag set MUST treat that as a connection error of type
    # > STREAM_CLOSED.
    (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.RECV_DATA):
        (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),

    # RFC 7540 Section 5.1 notes that WINDOW_UPDATE or RST_STREAM frames can
    # legitimately be received in this state for a short period after a DATA
    # or HEADERS frame carrying an END_STREAM flag is sent. We don't have
    # access to a clock, so we simply always allow them.
    (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
        (None, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
        (None, StreamState.CLOSED),

    # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
    # > neither "open" nor "half-closed (local)" as a connection error of type
    # > PROTOCOL_ERROR.
    (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
        (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),

    # Also, users should be forbidden from sending on closed streams.
    (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
        (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_DATA):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
    (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
        (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
}
+
+
+class H2Stream(object):
+ """
+ A low-level HTTP/2 stream object. This handles building and receiving
+ frames and maintains per-stream state.
+
+ This wraps a HTTP/2 Stream state machine implementation, ensuring that
+ frames can only be sent/received when the stream is in a valid state.
+ Attempts to create frames that cannot be sent will raise a
+ ``ProtocolError``.
+ """
+ def __init__(self,
+ stream_id,
+ config,
+ inbound_window_size,
+ outbound_window_size):
+ self.state_machine = H2StreamStateMachine(stream_id)
+ self.stream_id = stream_id
+ self.max_outbound_frame_size = None
+ self.request_method = None
+
+ # The current value of the outbound stream flow control window
+ self.outbound_flow_control_window = outbound_window_size
+
+ # The flow control manager.
+ self._inbound_window_manager = WindowManager(inbound_window_size)
+
+ # The expected content length, if any.
+ self._expected_content_length = None
+
+ # The actual received content length. Always tracked.
+ self._actual_content_length = 0
+
+ # The authority we believe this stream belongs to.
+ self._authority = None
+
+ # The configuration for this stream.
+ self.config = config
+
+ def __repr__(self):
+ return "<%s id:%d state:%r>" % (
+ type(self).__name__,
+ self.stream_id,
+ self.state_machine.state
+ )
+
    @property
    def inbound_flow_control_window(self):
        """
        The size of the inbound flow control window for the stream. This is
        rarely publicly useful: instead, use :meth:`remote_flow_control_window
        <h2.stream.H2Stream.remote_flow_control_window>`. This property is
        largely present to provide a shortcut to this data.
        """
        return self._inbound_window_manager.current_window_size
+
    @property
    def open(self):
        """
        Whether the stream is 'open' in any sense: that is, whether it counts
        against the number of concurrent streams.
        """
        # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
        # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
        # this excludes the reserved states.
        # For more detail on why we're doing this in this slightly weird way,
        # see the comment on ``STREAM_OPEN`` at the top of the file.
        return STREAM_OPEN[self.state_machine.state]
+
    @property
    def closed(self):
        """
        Whether the stream is closed: that is, its state machine has reached
        the terminal CLOSED state.
        """
        return self.state_machine.state == StreamState.CLOSED
+
    @property
    def closed_by(self):
        """
        Returns how the stream was closed, as one of StreamClosedBy (set by
        the close-handling transitions of the stream state machine).
        """
        return self.state_machine.stream_closed_by
+
+ def upgrade(self, client_side):
+ """
+ Called by the connection to indicate that this stream is the initial
+ request/response of an upgraded connection. Places the stream into an
+ appropriate state.
+ """
+ self.config.logger.debug("Upgrading %r", self)
+
+ assert self.stream_id == 1
+ input_ = (
+ StreamInputs.UPGRADE_CLIENT if client_side
+ else StreamInputs.UPGRADE_SERVER
+ )
+
+ # This may return events, we deliberately don't want them.
+ self.state_machine.process_input(input_)
+ return
+
    def send_headers(self, headers, encoder, end_stream=False):
        """
        Returns a list of HEADERS/CONTINUATION frames to emit as either headers
        or trailers.

        :param headers: the header block to send, as a sequence of pairs.
        :param encoder: the connection's HPACK encoder.
        :param end_stream: whether to set END_STREAM on the first frame.
        :raises ProtocolError: for END_STREAM on an informational response,
            or trailers sent without END_STREAM.
        """
        self.config.logger.debug("Send headers %s on %r", headers, self)

        # Because encoding headers makes an irreversible change to the header
        # compression context, we make the state transition before we encode
        # them.

        # First, check if we're a client. If we are, no problem: if we aren't,
        # we need to scan the header block to see if this is an informational
        # response.
        input_ = StreamInputs.SEND_HEADERS
        if ((not self.state_machine.client) and
                is_informational_response(headers)):
            if end_stream:
                raise ProtocolError(
                    "Cannot set END_STREAM on informational responses."
                )

            input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS

        events = self.state_machine.process_input(input_)

        hf = HeadersFrame(self.stream_id)
        hdr_validation_flags = self._build_hdr_validation_flags(events)
        frames = self._build_headers_frames(
            headers, encoder, hf, hdr_validation_flags
        )

        if end_stream:
            # Not a bug: the END_STREAM flag is valid on the initial HEADERS
            # frame, not the CONTINUATION frames that follow.
            self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
            frames[0].flags.add('END_STREAM')

        if self.state_machine.trailers_sent and not end_stream:
            raise ProtocolError("Trailers must have END_STREAM set.")

        # Remember the authority from the first client header block sent.
        if self.state_machine.client and self._authority is None:
            self._authority = authority_from_headers(headers)

        # store request method for _initialize_content_length
        self.request_method = extract_method_header(headers)

        return frames
+
+ def push_stream_in_band(self, related_stream_id, headers, encoder):
+ """
+ Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
+ stream header. Called on the stream that has the PUSH_PROMISE frame
+ sent on it.
+ """
+ self.config.logger.debug("Push stream %r", self)
+
+ # Because encoding headers makes an irreversible change to the header
+ # compression context, we make the state transition *first*.
+
+ events = self.state_machine.process_input(
+ StreamInputs.SEND_PUSH_PROMISE
+ )
+
+ ppf = PushPromiseFrame(self.stream_id)
+ ppf.promised_stream_id = related_stream_id
+ hdr_validation_flags = self._build_hdr_validation_flags(events)
+ frames = self._build_headers_frames(
+ headers, encoder, ppf, hdr_validation_flags
+ )
+
+ return frames
+
+ def locally_pushed(self):
+ """
+ Mark this stream as one that was pushed by this peer. Must be called
+ immediately after initialization. Sends no frames, simply updates the
+ state machine.
+ """
+ # This does not trigger any events.
+ events = self.state_machine.process_input(
+ StreamInputs.SEND_PUSH_PROMISE
+ )
+ assert not events
+ return []
+
+ def send_data(self, data, end_stream=False, pad_length=None):
+ """
+ Prepare some data frames. Optionally end the stream.
+
+ .. warning:: Does not perform flow control checks.
+ """
+ self.config.logger.debug(
+ "Send data on %r with end stream set to %s", self, end_stream
+ )
+
+ self.state_machine.process_input(StreamInputs.SEND_DATA)
+
+ df = DataFrame(self.stream_id)
+ df.data = data
+ if end_stream:
+ self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
+ df.flags.add('END_STREAM')
+ if pad_length is not None:
+ df.flags.add('PADDED')
+ df.pad_length = pad_length
+
+ # Subtract flow_controlled_length to account for possible padding
+ self.outbound_flow_control_window -= df.flow_controlled_length
+ assert self.outbound_flow_control_window >= 0
+
+ return [df]
+
+ def end_stream(self):
+ """
+ End a stream without sending data.
+ """
+ self.config.logger.debug("End stream %r", self)
+
+ self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
+ df = DataFrame(self.stream_id)
+ df.flags.add('END_STREAM')
+ return [df]
+
+ def advertise_alternative_service(self, field_value):
+ """
+ Advertise an RFC 7838 alternative service. The semantics of this are
+ better documented in the ``H2Connection`` class.
+ """
+ self.config.logger.debug(
+ "Advertise alternative service of %r for %r", field_value, self
+ )
+ self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
+ asf = AltSvcFrame(self.stream_id)
+ asf.field = field_value
+ return [asf]
+
+ def increase_flow_control_window(self, increment):
+ """
+ Increase the size of the flow control window for the remote side.
+ """
+ self.config.logger.debug(
+ "Increase flow control window for %r by %d",
+ self, increment
+ )
+ self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
+ self._inbound_window_manager.window_opened(increment)
+
+ wuf = WindowUpdateFrame(self.stream_id)
+ wuf.window_increment = increment
+ return [wuf]
+
+ def receive_push_promise_in_band(self,
+ promised_stream_id,
+ headers,
+ header_encoding):
+ """
+ Receives a push promise frame sent on this stream, pushing a remote
+ stream. This is called on the stream that has the PUSH_PROMISE sent
+ on it.
+ """
+ self.config.logger.debug(
+ "Receive Push Promise on %r for remote stream %d",
+ self, promised_stream_id
+ )
+ events = self.state_machine.process_input(
+ StreamInputs.RECV_PUSH_PROMISE
+ )
+ events[0].pushed_stream_id = promised_stream_id
+
+ hdr_validation_flags = self._build_hdr_validation_flags(events)
+ events[0].headers = self._process_received_headers(
+ headers, hdr_validation_flags, header_encoding
+ )
+ return [], events
+
+ def remotely_pushed(self, pushed_headers):
+ """
+ Mark this stream as one that was pushed by the remote peer. Must be
+ called immediately after initialization. Sends no frames, simply
+ updates the state machine.
+ """
+ self.config.logger.debug("%r pushed by remote peer", self)
+ events = self.state_machine.process_input(
+ StreamInputs.RECV_PUSH_PROMISE
+ )
+ self._authority = authority_from_headers(pushed_headers)
+ return [], events
+
    def receive_headers(self, headers, end_stream, header_encoding):
        """
        Receive a set of headers (or trailers).

        :param headers: the decoded header block, as a sequence of pairs.
        :param end_stream: whether END_STREAM was set on the HEADERS frame.
        :param header_encoding: passed through to header processing
            (presumably a text-encoding name or falsy — confirm against
            ``_process_received_headers``).
        :returns: a ``(frames, events)`` tuple; no frames are ever emitted.
        :raises ProtocolError: for END_STREAM on an informational response,
            or trailers received without END_STREAM.
        """
        if is_informational_response(headers):
            if end_stream:
                raise ProtocolError(
                    "Cannot set END_STREAM on informational responses"
                )
            input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
        else:
            input_ = StreamInputs.RECV_HEADERS

        events = self.state_machine.process_input(input_)

        if end_stream:
            es_events = self.state_machine.process_input(
                StreamInputs.RECV_END_STREAM
            )
            # Attach the StreamEnded event to the headers event so callers
            # can see both from the first event alone.
            events[0].stream_ended = es_events[0]
            events += es_events

        self._initialize_content_length(headers)

        if isinstance(events[0], TrailersReceived):
            if not end_stream:
                raise ProtocolError("Trailers must have END_STREAM set")

        hdr_validation_flags = self._build_hdr_validation_flags(events)
        events[0].headers = self._process_received_headers(
            headers, hdr_validation_flags, header_encoding
        )
        return [], events
+
    def receive_data(self, data, end_stream, flow_control_len):
        """
        Receive some data.

        :param data: The data payload of the DATA frame.
        :param end_stream: Whether the frame carried END_STREAM.
        :param flow_control_len: The flow-controlled length of the frame,
            which may exceed ``len(data)`` (e.g. due to padding).
        :returns: A ``(frames, events)`` tuple; no frames are ever sent.
        """
        self.config.logger.debug(
            "Receive data on %r with end stream %s and flow control length "
            "set to %d", self, end_stream, flow_control_len
        )
        events = self.state_machine.process_input(StreamInputs.RECV_DATA)
        # The full flow-controlled length counts against the window...
        self._inbound_window_manager.window_consumed(flow_control_len)
        # ...but only the actual payload counts toward content-length.
        self._track_content_length(len(data), end_stream)

        if end_stream:
            # END_STREAM is a separate state-machine input; its event is
            # both attached to the data event and emitted on its own.
            es_events = self.state_machine.process_input(
                StreamInputs.RECV_END_STREAM
            )
            events[0].stream_ended = es_events[0]
            events.extend(es_events)

        events[0].data = data
        events[0].flow_controlled_length = flow_control_len
        return [], events
+
    def receive_window_update(self, increment):
        """
        Handle a WINDOW_UPDATE increment.

        :param increment: The window increment carried by the frame.
        :returns: A ``(frames, events)`` tuple. Frames are only produced
            when the increment overflows the window and the stream must be
            locally reset.
        """
        self.config.logger.debug(
            "Receive Window Update on %r for increment of %d",
            self, increment
        )
        events = self.state_machine.process_input(
            StreamInputs.RECV_WINDOW_UPDATE
        )
        frames = []

        # If we encounter a problem with incrementing the flow control window,
        # this should be treated as a *stream* error, not a *connection* error.
        # That means we need to catch the error and forcibly close the stream.
        if events:
            events[0].delta = increment
            try:
                self.outbound_flow_control_window = guard_increment_window(
                    self.outbound_flow_control_window,
                    increment
                )
            except FlowControlError:
                # Ok, this is bad. We're going to need to perform a local
                # reset.
                event = StreamReset()
                event.stream_id = self.stream_id
                event.error_code = ErrorCodes.FLOW_CONTROL_ERROR
                event.remote_reset = False

                # Replace the window-update event entirely: the user sees a
                # StreamReset, and we emit the RST_STREAM frame.
                events = [event]
                frames = self.reset_stream(event.error_code)

        return frames, events
+
+ def receive_continuation(self):
+ """
+ A naked CONTINUATION frame has been received. This is always an error,
+ but the type of error it is depends on the state of the stream and must
+ transition the state of the stream, so we need to handle it.
+ """
+ self.config.logger.debug("Receive Continuation frame on %r", self)
+ self.state_machine.process_input(
+ StreamInputs.RECV_CONTINUATION
+ )
+ assert False, "Should not be reachable"
+
+ def receive_alt_svc(self, frame):
+ """
+ An Alternative Service frame was received on the stream. This frame
+ inherits the origin associated with this stream.
+ """
+ self.config.logger.debug(
+ "Receive Alternative Service frame on stream %r", self
+ )
+
+ # If the origin is present, RFC 7838 says we have to ignore it.
+ if frame.origin:
+ return [], []
+
+ events = self.state_machine.process_input(
+ StreamInputs.RECV_ALTERNATIVE_SERVICE
+ )
+
+ # There are lots of situations where we want to ignore the ALTSVC
+ # frame. If we need to pay attention, we'll have an event and should
+ # fill it out.
+ if events:
+ assert isinstance(events[0], AlternativeServiceAvailable)
+ events[0].origin = self._authority
+ events[0].field_value = frame.field
+
+ return [], events
+
+ def reset_stream(self, error_code=0):
+ """
+ Close the stream locally. Reset the stream with an error code.
+ """
+ self.config.logger.debug(
+ "Local reset %r with error code: %d", self, error_code
+ )
+ self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
+
+ rsf = RstStreamFrame(self.stream_id)
+ rsf.error_code = error_code
+ return [rsf]
+
+ def stream_reset(self, frame):
+ """
+ Handle a stream being reset remotely.
+ """
+ self.config.logger.debug(
+ "Remote reset %r with error code: %d", self, frame.error_code
+ )
+ events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
+
+ if events:
+ # We don't fire an event if this stream is already closed.
+ events[0].error_code = _error_code_from_int(frame.error_code)
+
+ return [], events
+
+ def acknowledge_received_data(self, acknowledged_size):
+ """
+ The user has informed us that they've processed some amount of data
+ that was received on this stream. Pass that to the window manager and
+ potentially return some WindowUpdate frames.
+ """
+ self.config.logger.debug(
+ "Acknowledge received data with size %d on %r",
+ acknowledged_size, self
+ )
+ increment = self._inbound_window_manager.process_bytes(
+ acknowledged_size
+ )
+ if increment:
+ f = WindowUpdateFrame(self.stream_id)
+ f.window_increment = increment
+ return [f]
+
+ return []
+
+ def _build_hdr_validation_flags(self, events):
+ """
+ Constructs a set of header validation flags for use when normalizing
+ and validating header blocks.
+ """
+ is_trailer = isinstance(
+ events[0], (_TrailersSent, TrailersReceived)
+ )
+ is_response_header = isinstance(
+ events[0],
+ (
+ _ResponseSent,
+ ResponseReceived,
+ InformationalResponseReceived
+ )
+ )
+ is_push_promise = isinstance(
+ events[0], (PushedStreamReceived, _PushedRequestSent)
+ )
+
+ return HeaderValidationFlags(
+ is_client=self.state_machine.client,
+ is_trailer=is_trailer,
+ is_response_header=is_response_header,
+ is_push_promise=is_push_promise,
+ )
+
    def _build_headers_frames(self,
                              headers,
                              encoder,
                              first_frame,
                              hdr_validation_flags):
        """
        Helper method to build headers or push promise frames.

        :param headers: The header block to send.
        :param encoder: The connection's HPACK encoder.
        :param first_frame: A HEADERS or PUSH_PROMISE frame that will carry
            the first chunk of the encoded block.
        :param hdr_validation_flags: An instance of HeaderValidationFlags.
        :returns: ``first_frame`` followed by any CONTINUATION frames, with
            END_HEADERS set on the last frame.
        """
        # We need to lowercase the header names, and to ensure that secure
        # header fields are kept out of compression contexts.
        if self.config.normalize_outbound_headers:
            headers = normalize_outbound_headers(
                headers, hdr_validation_flags
            )
        if self.config.validate_outbound_headers:
            headers = validate_outbound_headers(
                headers, hdr_validation_flags
            )

        encoded_headers = encoder.encode(headers)

        # Slice into blocks of max_outbound_frame_size. Be careful with this:
        # it only works right because we never send padded frames or priority
        # information on the frames. Revisit this if we do.
        header_blocks = [
            encoded_headers[i:i+self.max_outbound_frame_size]
            for i in range(
                0, len(encoded_headers), self.max_outbound_frame_size
            )
        ]

        frames = []
        first_frame.data = header_blocks[0]
        frames.append(first_frame)

        # Any remaining chunks ride in CONTINUATION frames.
        for block in header_blocks[1:]:
            cf = ContinuationFrame(self.stream_id)
            cf.data = block
            frames.append(cf)

        frames[-1].flags.add('END_HEADERS')
        return frames
+
+ def _process_received_headers(self,
+ headers,
+ header_validation_flags,
+ header_encoding):
+ """
+ When headers have been received from the remote peer, run a processing
+ pipeline on them to transform them into the appropriate form for
+ attaching to an event.
+ """
+ if self.config.normalize_inbound_headers:
+ headers = normalize_inbound_headers(
+ headers, header_validation_flags
+ )
+
+ if self.config.validate_inbound_headers:
+ headers = validate_headers(headers, header_validation_flags)
+
+ if header_encoding:
+ headers = _decode_headers(headers, header_encoding)
+
+ # The above steps are all generators, so we need to concretize the
+ # headers now.
+ return list(headers)
+
+ def _initialize_content_length(self, headers):
+ """
+ Checks the headers for a content-length header and initializes the
+ _expected_content_length field from it. It's not an error for no
+ Content-Length header to be present.
+ """
+ if self.request_method == b'HEAD':
+ self._expected_content_length = 0
+ return
+
+ for n, v in headers:
+ if n == b'content-length':
+ try:
+ self._expected_content_length = int(v, 10)
+ except ValueError:
+ raise ProtocolError(
+ "Invalid content-length header: %s" % v
+ )
+
+ return
+
+ def _track_content_length(self, length, end_stream):
+ """
+ Update the expected content length in response to data being received.
+ Validates that the appropriate amount of data is sent. Always updates
+ the received data, but only validates the length against the
+ content-length header if one was sent.
+
+ :param length: The length of the body chunk received.
+ :param end_stream: If this is the last body chunk received.
+ """
+ self._actual_content_length += length
+ actual = self._actual_content_length
+ expected = self._expected_content_length
+
+ if expected is not None:
+ if expected < actual:
+ raise InvalidBodyLengthError(expected, actual)
+
+ if end_stream and expected != actual:
+ raise InvalidBodyLengthError(expected, actual)
+
+ def _inbound_flow_control_change_from_settings(self, delta):
+ """
+ We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
+ update the target window size for flow control. For our flow control
+ strategy, this means we need to do two things: we need to adjust the
+ current window size, but we also need to set the target maximum window
+ size to the new value.
+ """
+ new_max_size = self._inbound_window_manager.max_window_size + delta
+ self._inbound_window_manager.window_opened(delta)
+ self._inbound_window_manager.max_window_size = new_max_size
+
+
def _decode_headers(headers, encoding):
    """
    Decode an iterable of header two-tuples with ``encoding``, preserving
    the concrete ``HeaderTuple`` subclass of each header so that the
    never-indexed property survives decoding.
    """
    for header in headers:
        # By this point every header is a HeaderTuple produced by HPACK
        # decoding.
        assert isinstance(header, HeaderTuple)
        klass = header.__class__
        yield klass(header[0].decode(encoding), header[1].decode(encoding))
diff --git a/src/h2/utilities.py b/src/h2/utilities.py
new file mode 100644
index 0000000..06c916e
--- /dev/null
+++ b/src/h2/utilities.py
@@ -0,0 +1,660 @@
+# -*- coding: utf-8 -*-
+"""
+h2/utilities
+~~~~~~~~~~~~
+
+Utility functions that do not belong in a separate module.
+"""
+import collections
+import re
+from string import whitespace
+import sys
+
+from hpack import HeaderTuple, NeverIndexedHeaderTuple
+
+from .exceptions import ProtocolError, FlowControlError
+
# Matches any uppercase ASCII letter in a (bytes) header name.
UPPER_RE = re.compile(b"[A-Z]")

# A set of headers that are hop-by-hop or connection-specific and thus
# forbidden in HTTP/2. This list comes from RFC 7540 § 8.1.2.2.
# Each set below carries both the bytes and unicode form of every name,
# because headers may arrive in either representation.
CONNECTION_HEADERS = frozenset([
    b'connection', u'connection',
    b'proxy-connection', u'proxy-connection',
    b'keep-alive', u'keep-alive',
    b'transfer-encoding', u'transfer-encoding',
    b'upgrade', u'upgrade',
])


# The complete set of pseudo-header fields recognised by this
# implementation; any other field starting with ':' is rejected.
_ALLOWED_PSEUDO_HEADER_FIELDS = frozenset([
    b':method', u':method',
    b':scheme', u':scheme',
    b':authority', u':authority',
    b':path', u':path',
    b':status', u':status',
    b':protocol', u':protocol',
])


# Headers kept out of HPACK compression contexts (emitted as
# never-indexed fields); see _secure_headers below.
_SECURE_HEADERS = frozenset([
    # May have basic credentials which are vulnerable to dictionary attacks.
    b'authorization', u'authorization',
    b'proxy-authorization', u'proxy-authorization',
])


# Pseudo-headers that may only appear on requests, never on responses.
_REQUEST_ONLY_HEADERS = frozenset([
    b':scheme', u':scheme',
    b':path', u':path',
    b':authority', u':authority',
    b':method', u':method',
    b':protocol', u':protocol',
])


# Pseudo-headers that may only appear on responses, never on requests.
_RESPONSE_ONLY_HEADERS = frozenset([b':status', u':status'])


# A Set of pseudo headers that are only valid if the method is
# CONNECT, see RFC 8441 § 5
_CONNECT_REQUEST_ONLY_HEADERS = frozenset([b':protocol', u':protocol'])


# Whitespace characters for the surrounding-whitespace checks. On Python 3
# indexing bytes yields ints, so the set must hold code points there.
if sys.version_info[0] == 2:  # Python 2.X
    _WHITESPACE = frozenset(whitespace)
else:  # Python 3.3+
    _WHITESPACE = frozenset(map(ord, whitespace))
+
+
def _secure_headers(headers, hdr_validation_flags):
    """
    Certain headers are at risk of being attacked during the header
    compression phase, so they must be kept out of header compression
    contexts. This generator turns those specific headers into HPACK
    never-indexed fields.

    Two rules, inspired by Firefox and nghttp2:

    - 'authorization' and 'proxy-authorization' fields are always made
      never-indexed.
    - Any 'cookie' field shorter than 20 bytes is made never-indexed.
    """
    for header in headers:
        name = header[0]
        at_risk = (
            name in _SECURE_HEADERS or
            (name in (b'cookie', u'cookie') and len(header[1]) < 20)
        )
        yield NeverIndexedHeaderTuple(*header) if at_risk else header
+
+
def extract_method_header(headers):
    """
    Extract the value of the :method pseudo-header from a header block,
    always returning it as bytes. Returns ``None`` if no :method header is
    present.
    """
    for name, value in headers:
        if name not in (b':method', u':method'):
            continue
        return value if isinstance(value, bytes) else value.encode('utf-8')
+
+
def is_informational_response(headers):
    """
    Determine whether a header block is an informational (1xx) response by
    inspecting its :status pseudo-header. Assumes the block is well
    formed — pseudo-headers come first — so scanning stops at the first
    header name that does not begin with a colon.

    :param headers: The HTTP/2 header block.
    :returns: A boolean indicating if this is an informational response.
    """
    for name, value in headers:
        if isinstance(name, bytes):
            sigil, status, one = b':', b':status', b'1'
        else:
            sigil, status, one = u':', u':status', u'1'

        # Past the pseudo-header section: no :status was found.
        if not name.startswith(sigil):
            return False

        # 1xx statuses start with the digit 1.
        if name == status:
            return value.startswith(one)
+
+
def guard_increment_window(current, increment):
    """
    Apply ``increment`` to a flow control window, refusing to let the
    window grow beyond the protocol maximum.

    :param current: The current value of the flow control window.
    :param increment: The increment to apply to that window.
    :returns: The new value of the window.
    :raises: ``FlowControlError``
    """
    # The largest value the flow control window may take.
    LARGEST_FLOW_CONTROL_WINDOW = 2**31 - 1

    new_size = current + increment
    if new_size <= LARGEST_FLOW_CONTROL_WINDOW:
        return new_size

    raise FlowControlError(
        "May not increment flow control window past %d" %
        LARGEST_FLOW_CONTROL_WINDOW
    )
+
+
def authority_from_headers(headers):
    """
    Search a header block for the :authority pseudo-header and return its
    value as bytes, or ``None`` if it is absent.

    Note that this doesn't terminate early, so should only be called if the
    headers are for a client request. Otherwise, will loop over the entire
    header set, which is potentially unwise.

    :param headers: The HTTP header set.
    :returns: The value of the authority header, or ``None``.
    :rtype: ``bytes`` or ``None``.
    """
    for name, value in headers:
        if name in (b':authority', u':authority'):
            # Headers may come from HPACK or from the user, so unicode may
            # appear here; normalize the result to bytes.
            if isinstance(value, bytes):
                return value
            return value.encode('utf-8')

    return None
+
+
# Flags used by the validate_headers pipeline to determine which checks
# should be applied to a given set of headers.
#   is_client:          whether this peer is acting as an HTTP/2 client.
#   is_trailer:         the block is a trailer block (no pseudo-headers).
#   is_response_header: the block is a response (":status" rules apply).
#   is_push_promise:    the block was carried by a PUSH_PROMISE frame.
HeaderValidationFlags = collections.namedtuple(
    'HeaderValidationFlags',
    ['is_client', 'is_trailer', 'is_response_header', 'is_push_promise']
)
+
+
def validate_headers(headers, hdr_validation_flags):
    """
    Validates a header sequence against a set of constraints from RFC 7540.

    :param headers: The HTTP header set.
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    """
    # Each check is a generator wrapped around the header iterable, so
    # nothing runs until the result is consumed. That keeps overhead down,
    # though validation remains relatively expensive; the individual checks
    # index into the header tuples instead of unpacking them to avoid that
    # fixed per-header cost.
    checks = (
        _reject_uppercase_header_fields,
        _reject_surrounding_whitespace,
        _reject_te,
        _reject_connection_header,
        _reject_pseudo_header_fields,
        _check_host_authority_header,
        _check_path_header,
    )
    for check in checks:
        headers = check(headers, hdr_validation_flags)

    return headers
+
+
def _reject_uppercase_header_fields(headers, hdr_validation_flags):
    """
    Raises a ProtocolError if any uppercase character is found in a header
    block.
    """
    for hdr in headers:
        name = hdr[0]
        if UPPER_RE.search(name) is not None:
            raise ProtocolError(
                "Received uppercase header name %s." % name)
        yield hdr
+
+
def _reject_surrounding_whitespace(headers, hdr_validation_flags):
    """
    Raises a ProtocolError if any header name or value is surrounded by
    whitespace characters.
    """
    # RFC 7230 compatibility: a field value may legitimately be the empty
    # string, so only inspect the value's edges when it is non-empty. The
    # field name may never be empty, so its edges are always inspected.
    for header in headers:
        name, value = header[0], header[1]
        if name[0] in _WHITESPACE or name[-1] in _WHITESPACE:
            raise ProtocolError(
                "Received header name surrounded by whitespace %r" % name)
        if value and ((value[0] in _WHITESPACE) or
                      (value[-1] in _WHITESPACE)):
            raise ProtocolError(
                "Received header value surrounded by whitespace %r" % value
            )
        yield header
+
+
+def _reject_te(headers, hdr_validation_flags):
+ """
+ Raises a ProtocolError if the TE header is present in a header block and
+ its value is anything other than "trailers".
+ """
+ for header in headers:
+ if header[0] in (b'te', u'te'):
+ if header[1].lower() not in (b'trailers', u'trailers'):
+ raise ProtocolError(
+ "Invalid value for Transfer-Encoding header: %s" %
+ header[1]
+ )
+
+ yield header
+
+
def _reject_connection_header(headers, hdr_validation_flags):
    """
    Raises a ProtocolError if any connection-specific (hop-by-hop) header
    field is present in a header block.
    """
    for header in headers:
        if header[0] not in CONNECTION_HEADERS:
            yield header
        else:
            raise ProtocolError(
                "Connection-specific header field present: %s." % header[0]
            )
+
+
+def _custom_startswith(test_string, bytes_prefix, unicode_prefix):
+ """
+ Given a string that might be a bytestring or a Unicode string,
+ return True if it starts with the appropriate prefix.
+ """
+ if isinstance(test_string, bytes):
+ return test_string.startswith(bytes_prefix)
+ else:
+ return test_string.startswith(unicode_prefix)
+
+
+def _assert_header_in_set(string_header, bytes_header, header_set):
+ """
+ Given a set of header names, checks whether the string or byte version of
+ the header name is present. Raises a Protocol error with the appropriate
+ error if it's missing.
+ """
+ if not (string_header in header_set or bytes_header in header_set):
+ raise ProtocolError(
+ "Header block missing mandatory %s header" % string_header
+ )
+
+
def _reject_pseudo_header_fields(headers, hdr_validation_flags):
    """
    Raises a ProtocolError if duplicate pseudo-header fields are found in a
    header block or if a pseudo-header field appears in a block after an
    ordinary header field.

    Raises a ProtocolError if pseudo-header fields are found in trailers.

    This is a generator: the final acceptability check only runs once the
    caller has consumed the whole header iterable.

    :param headers: An iterable of header two-tuples.
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    """
    seen_pseudo_header_fields = set()
    seen_regular_header = False
    # The :method value is remembered (normalized to bytes) so the
    # post-loop check can apply the CONNECT-only rules.
    method = None

    for header in headers:
        if _custom_startswith(header[0], b':', u':'):
            if header[0] in seen_pseudo_header_fields:
                raise ProtocolError(
                    "Received duplicate pseudo-header field %s" % header[0]
                )

            seen_pseudo_header_fields.add(header[0])

            # Pseudo-headers must precede all regular header fields.
            if seen_regular_header:
                raise ProtocolError(
                    "Received pseudo-header field out of sequence: %s" %
                    header[0]
                )

            if header[0] not in _ALLOWED_PSEUDO_HEADER_FIELDS:
                raise ProtocolError(
                    "Received custom pseudo-header field %s" % header[0]
                )

            if header[0] in (b':method', u':method'):
                if not isinstance(header[1], bytes):
                    method = header[1].encode('utf-8')
                else:
                    method = header[1]

        else:
            seen_regular_header = True

        yield header

    # Check the pseudo-headers we got to confirm they're acceptable.
    _check_pseudo_header_field_acceptability(
        seen_pseudo_header_fields, method, hdr_validation_flags
    )
+
+
def _check_pseudo_header_field_acceptability(pseudo_headers,
                                             method,
                                             hdr_validation_flags):
    """
    Given the set of pseudo-headers present in a header block and the
    validation flags, confirms that RFC 7540 allows them.

    :param pseudo_headers: The set of pseudo-header names that were seen.
    :param method: The value of the :method pseudo-header (as bytes), or
        ``None`` if it was absent.
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    :raises ProtocolError: On any rule violation.
    """
    # Pseudo-header fields MUST NOT appear in trailers - RFC 7540 § 8.1.2.1
    if hdr_validation_flags.is_trailer and pseudo_headers:
        raise ProtocolError(
            "Received pseudo-header in trailer %s" % pseudo_headers
        )

    # If ':status' pseudo-header is not there in a response header, reject it.
    # Similarly, if ':path', ':method', or ':scheme' are not there in a request
    # header, reject it. Additionally, if a response contains any request-only
    # headers or vice-versa, reject it.
    # Relevant RFC section: RFC 7540 § 8.1.2.4
    # https://tools.ietf.org/html/rfc7540#section-8.1.2.4
    if hdr_validation_flags.is_response_header:
        _assert_header_in_set(u':status', b':status', pseudo_headers)
        invalid_response_headers = pseudo_headers & _REQUEST_ONLY_HEADERS
        if invalid_response_headers:
            raise ProtocolError(
                "Encountered request-only headers %s" %
                invalid_response_headers
            )
    elif (not hdr_validation_flags.is_response_header and
          not hdr_validation_flags.is_trailer):
        # This is a request, so we need to have seen :path, :method, and
        # :scheme.
        _assert_header_in_set(u':path', b':path', pseudo_headers)
        _assert_header_in_set(u':method', b':method', pseudo_headers)
        _assert_header_in_set(u':scheme', b':scheme', pseudo_headers)
        invalid_request_headers = pseudo_headers & _RESPONSE_ONLY_HEADERS
        if invalid_request_headers:
            raise ProtocolError(
                "Encountered response-only headers %s" %
                invalid_request_headers
            )
        if method != b'CONNECT':
            # :protocol (RFC 8441) is only valid on CONNECT requests.
            invalid_headers = pseudo_headers & _CONNECT_REQUEST_ONLY_HEADERS
            if invalid_headers:
                raise ProtocolError(
                    "Encountered connect-request-only headers %s" %
                    invalid_headers
                )
+
+
def _validate_host_authority_header(headers):
    """
    Given the :authority and Host headers from a request block that isn't
    a trailer, check that:
     1. At least one of these headers is set.
     2. If both headers are set, they match.

    This is a generator: the two checks only run after the caller has
    consumed the entire header iterable.

    :param headers: The HTTP header set.
    :raises: ``ProtocolError``
    """
    # We use None as a sentinel value. Iterate over the list of headers,
    # and record the value of these headers (if present). We don't need
    # to worry about receiving duplicate :authority headers, as this is
    # enforced by the _reject_pseudo_header_fields() pipeline.
    #
    # TODO: We should also guard against receiving duplicate Host headers,
    # and against sending duplicate headers.
    authority_header_val = None
    host_header_val = None

    for header in headers:
        if header[0] in (b':authority', u':authority'):
            authority_header_val = header[1]
        elif header[0] in (b'host', u'host'):
            host_header_val = header[1]

        yield header

    # If we have not-None values for these variables, then we know we saw
    # the corresponding header.
    authority_present = (authority_header_val is not None)
    host_present = (host_header_val is not None)

    # It is an error for a request header block to contain neither
    # an :authority header nor a Host header.
    if not authority_present and not host_present:
        raise ProtocolError(
            "Request header block does not have an :authority or Host header."
        )

    # If we receive both headers, they should definitely match.
    if authority_present and host_present:
        if authority_header_val != host_header_val:
            raise ProtocolError(
                "Request header block has mismatched :authority and "
                "Host headers: %r / %r"
                % (authority_header_val, host_header_val)
            )
+
+
+def _check_host_authority_header(headers, hdr_validation_flags):
+ """
+ Raises a ProtocolError if a header block arrives that does not contain an
+ :authority or a Host header, or if a header block contains both fields,
+ but their values do not match.
+ """
+ # We only expect to see :authority and Host headers on request header
+ # blocks that aren't trailers, so skip this validation if this is a
+ # response header or we're looking at trailer blocks.
+ skip_validation = (
+ hdr_validation_flags.is_response_header or
+ hdr_validation_flags.is_trailer
+ )
+ if skip_validation:
+ return headers
+
+ return _validate_host_authority_header(headers)
+
+
+def _check_path_header(headers, hdr_validation_flags):
+ """
+ Raise a ProtocolError if a header block arrives or is sent that contains an
+ empty :path header.
+ """
+ def inner():
+ for header in headers:
+ if header[0] in (b':path', u':path'):
+ if not header[1]:
+ raise ProtocolError("An empty :path header is forbidden")
+
+ yield header
+
+ # We only expect to see :authority and Host headers on request header
+ # blocks that aren't trailers, so skip this validation if this is a
+ # response header or we're looking at trailer blocks.
+ skip_validation = (
+ hdr_validation_flags.is_response_header or
+ hdr_validation_flags.is_trailer
+ )
+ if skip_validation:
+ return headers
+ else:
+ return inner()
+
+
def _lowercase_header_names(headers, hdr_validation_flags):
    """
    Rebuild an iterable of header two-tuples with every header name
    lowercased, preserving the concrete ``HeaderTuple`` subclass where one
    is used.
    """
    for header in headers:
        lowered = (header[0].lower(), header[1])
        if isinstance(header, HeaderTuple):
            yield header.__class__(*lowered)
        else:
            yield lowered
+
+
def _strip_surrounding_whitespace(headers, hdr_validation_flags):
    """
    Strip leading and trailing whitespace from both header names and
    values, preserving the concrete ``HeaderTuple`` subclass where one is
    used.
    """
    for header in headers:
        stripped = (header[0].strip(), header[1].strip())
        if isinstance(header, HeaderTuple):
            yield header.__class__(*stripped)
        else:
            yield stripped
+
+
def _strip_connection_headers(headers, hdr_validation_flags):
    """
    Strip any connection headers as per RFC7540 § 8.1.2.2.
    """
    for header in headers:
        if header[0] in CONNECTION_HEADERS:
            continue
        yield header
+
+
+def _check_sent_host_authority_header(headers, hdr_validation_flags):
+ """
+ Raises an InvalidHeaderBlockError if we try to send a header block
+ that does not contain an :authority or a Host header, or if
+ the header block contains both fields, but their values do not match.
+ """
+ # We only expect to see :authority and Host headers on request header
+ # blocks that aren't trailers, so skip this validation if this is a
+ # response header or we're looking at trailer blocks.
+ skip_validation = (
+ hdr_validation_flags.is_response_header or
+ hdr_validation_flags.is_trailer
+ )
+ if skip_validation:
+ return headers
+
+ return _validate_host_authority_header(headers)
+
+
+def _combine_cookie_fields(headers, hdr_validation_flags):
+ """
+ RFC 7540 § 8.1.2.5 allows HTTP/2 clients to split the Cookie header field,
+ which must normally appear only once, into multiple fields for better
+ compression. However, they MUST be joined back up again when received.
+ This normalization step applies that transform. The side-effect is that
+ all cookie fields now appear *last* in the header block.
+ """
+ # There is a problem here about header indexing. Specifically, it's
+ # possible that all these cookies are sent with different header indexing
+ # values. At this point it shouldn't matter too much, so we apply our own
+ # logic and make them never-indexed.
+ cookies = []
+ for header in headers:
+ if header[0] == b'cookie':
+ cookies.append(header[1])
+ else:
+ yield header
+ if cookies:
+ cookie_val = b'; '.join(cookies)
+ yield NeverIndexedHeaderTuple(b'cookie', cookie_val)
+
+
def normalize_outbound_headers(headers, hdr_validation_flags):
    """
    Normalizes a header sequence that we are about to send.

    :param headers: The HTTP header set.
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    """
    # Each step wraps the previous one as a lazy generator; order matters.
    steps = (
        _lowercase_header_names,
        _strip_surrounding_whitespace,
        _strip_connection_headers,
        _secure_headers,
    )
    for step in steps:
        headers = step(headers, hdr_validation_flags)

    return headers
+
+
def normalize_inbound_headers(headers, hdr_validation_flags):
    """
    Normalizes a header sequence that we have received.

    :param headers: The HTTP header set.
    :param hdr_validation_flags: An instance of HeaderValidationFlags
    """
    return _combine_cookie_fields(headers, hdr_validation_flags)
+
+
def validate_outbound_headers(headers, hdr_validation_flags):
    """
    Validates and normalizes a header sequence that we are about to send.

    :param headers: The HTTP header set.
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    """
    # Each check wraps the previous one as a lazy generator; order matters.
    checks = (
        _reject_te,
        _reject_connection_header,
        _reject_pseudo_header_fields,
        _check_sent_host_authority_header,
        _check_path_header,
    )
    for check in checks:
        headers = check(headers, hdr_validation_flags)

    return headers
+
+
class SizeLimitDict(collections.OrderedDict):
    """
    An ``OrderedDict`` that, when constructed with a ``size_limit`` keyword
    argument, evicts its oldest entries whenever its length would exceed
    that limit. With no limit (or ``size_limit=None``) it behaves exactly
    like an ``OrderedDict``.
    """

    def __init__(self, *args, **kwargs):
        # Pull our private keyword out before handing the rest to
        # OrderedDict.
        self._size_limit = kwargs.pop("size_limit", None)
        super(SizeLimitDict, self).__init__(*args, **kwargs)
        self._check_size_limit()

    def __setitem__(self, key, value):
        super(SizeLimitDict, self).__setitem__(key, value)
        self._check_size_limit()

    def _check_size_limit(self):
        # Drop entries in insertion order (oldest first) until we fit.
        limit = self._size_limit
        if limit is None:
            return
        while len(self) > limit:
            self.popitem(last=False)
diff --git a/src/h2/windows.py b/src/h2/windows.py
new file mode 100644
index 0000000..6656975
--- /dev/null
+++ b/src/h2/windows.py
@@ -0,0 +1,139 @@
+# -*- coding: utf-8 -*-
+"""
+h2/windows
+~~~~~~~~~~
+
+Defines tools for managing HTTP/2 flow control windows.
+
+The objects defined in this module are used to automatically manage HTTP/2
+flow control windows. Specifically, they keep track of what the size of the
+window is, how much data has been consumed from that window, and how much data
+the user has already used. It then implements a basic algorithm that attempts
+to manage the flow control window without user input, trying to ensure that it
+does not emit too many WINDOW_UPDATE frames.
+"""
+from __future__ import division
+
+from .exceptions import FlowControlError
+
+
# The largest acceptable value for a HTTP/2 flow control window.
LARGEST_FLOW_CONTROL_WINDOW = 2**31 - 1


class WindowManager(object):
    """
    A basic HTTP/2 window manager.

    Tracks the size of a flow control window and how many flow controlled
    bytes the application has reported as processed, and decides when a
    WINDOW_UPDATE increment is worth emitting.

    :param max_window_size: The maximum size of the flow control window.
    :type max_window_size: ``int``
    """
    def __init__(self, max_window_size):
        assert max_window_size <= LARGEST_FLOW_CONTROL_WINDOW
        self.max_window_size = max_window_size
        self.current_window_size = max_window_size
        # Bytes the application has processed since the last emitted
        # increment.
        self._bytes_processed = 0

    def window_consumed(self, size):
        """
        Record receipt of flow controlled bytes from the remote peer. This
        necessarily shrinks the flow control window!

        :param size: The number of flow controlled bytes we received from the
            remote peer.
        :type size: ``int``
        :returns: Nothing.
        :rtype: ``None``
        :raises FlowControlError: If the peer sent more bytes than the
            window allowed.
        """
        # Deliberately shrink first: if the peer overran the window, the
        # (now negative) size is recorded before we raise.
        self.current_window_size -= size
        if self.current_window_size < 0:
            raise FlowControlError("Flow control window shrunk below 0")

    def window_opened(self, size):
        """
        The flow control window has been incremented, either because of manual
        flow control management or because of the user changing the flow
        control settings. This can have the effect of increasing what we
        consider to be the "maximum" flow control window size.

        This does not increase our view of how many bytes have been processed,
        only of how much space is in the window.

        :param size: The increment to the flow control window we received.
        :type size: ``int``
        :returns: Nothing
        :rtype: ``None``
        :raises FlowControlError: If the increment pushes the window past
            the protocol's hard limit.
        """
        self.current_window_size += size

        if self.current_window_size > LARGEST_FLOW_CONTROL_WINDOW:
            raise FlowControlError(
                "Flow control window mustn't exceed %d" %
                LARGEST_FLOW_CONTROL_WINDOW
            )

        # A window larger than any we have seen becomes the new maximum.
        self.max_window_size = max(
            self.max_window_size, self.current_window_size
        )

    def process_bytes(self, size):
        """
        The application has informed us that it has processed a certain number
        of bytes. This may cause us to want to emit a window update frame. If
        we do want to emit a window update frame, this method will return the
        number of bytes that we should increment the window by.

        :param size: The number of flow controlled bytes that the application
            has processed.
        :type size: ``int``
        :returns: The number of bytes to increment the flow control window by,
            or ``None``.
        :rtype: ``int`` or ``None``
        """
        self._bytes_processed += size
        return self._maybe_update_window()

    def _maybe_update_window(self):
        """
        Run the increment-decision algorithm.

        The policy:

        1. If no bytes have been processed since the last increment, return
           ``None`` immediately: there is no space to hand back to the
           remote peer.
        2. If the window is empty and we have processed more than 1024 bytes
           (or a quarter of the window, whichever is smaller), emit an
           increment. This avoids blocking a stream altogether.
        3. If we have processed at least half a window's worth of bytes,
           emit an increment. This keeps the number of WINDOW_UPDATE frames
           low.

        Otherwise return 0 and keep accumulating. In a healthy system with
        large windows this emits WINDOW_UPDATE frames only occasionally,
        even when the peer sends many small DATA frames.
        """
        # TODO: Can the window be smaller than 1024 bytes? If not, we can
        # streamline this algorithm.
        processed = self._bytes_processed
        if not processed:
            return None

        # We can never hand back more space than the gap between the current
        # window and its maximum size.
        headroom = self.max_window_size - self.current_window_size

        starved = (
            self.current_window_size == 0 and
            processed > min(1024, self.max_window_size // 4)
        )
        if starved or processed >= (self.max_window_size // 2):
            # Even when headroom caps the increment below ``processed``, the
            # counter is reset: we always increment as far as we can.
            delta = min(processed, headroom)
            self._bytes_processed = 0
        else:
            delta = 0

        self.current_window_size += delta
        return delta