diff options
author | Thomas Kriechbaumer <thomas@kriechbaumer.name> | 2020-08-01 12:44:05 +0200 |
---|---|---|
committer | Thomas Kriechbaumer <Kriechi@users.noreply.github.com> | 2020-08-02 13:34:58 +0200 |
commit | e3ab38a6440a3544431c92a9fb3e3e1307ef3f8c (patch) | |
tree | d0444c19825830ee6a05c08ad39e215f58f1c4f2 /src | |
parent | 32f868d4a0b5e97243f86c3f90edf4b2932a8fb5 (diff) |
move all code under src/
Diffstat (limited to 'src')
-rw-r--r-- | src/h2/__init__.py | 8 | ||||
-rw-r--r-- | src/h2/config.py | 170 | ||||
-rw-r--r-- | src/h2/connection.py | 2051 | ||||
-rw-r--r-- | src/h2/errors.py | 75 | ||||
-rw-r--r-- | src/h2/events.py | 648 | ||||
-rw-r--r-- | src/h2/exceptions.py | 186 | ||||
-rw-r--r-- | src/h2/frame_buffer.py | 175 | ||||
-rw-r--r-- | src/h2/settings.py | 339 | ||||
-rw-r--r-- | src/h2/stream.py | 1371 | ||||
-rw-r--r-- | src/h2/utilities.py | 660 | ||||
-rw-r--r-- | src/h2/windows.py | 139 |
11 files changed, 5822 insertions, 0 deletions
diff --git a/src/h2/__init__.py b/src/h2/__init__.py new file mode 100644 index 0000000..17ee91b --- /dev/null +++ b/src/h2/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +""" +hyper-h2 +~~ + +A HTTP/2 implementation. +""" +__version__ = '4.0.0dev0' diff --git a/src/h2/config.py b/src/h2/config.py new file mode 100644 index 0000000..1c437ee --- /dev/null +++ b/src/h2/config.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +""" +h2/config +~~~~~~~~~ + +Objects for controlling the configuration of the HTTP/2 stack. +""" + + +class _BooleanConfigOption(object): + """ + Descriptor for handling a boolean config option. This will block + attempts to set boolean config options to non-bools. + """ + def __init__(self, name): + self.name = name + self.attr_name = '_%s' % self.name + + def __get__(self, instance, owner): + return getattr(instance, self.attr_name) + + def __set__(self, instance, value): + if not isinstance(value, bool): + raise ValueError("%s must be a bool" % self.name) + setattr(instance, self.attr_name, value) + + +class DummyLogger(object): + """ + An Logger object that does not actual logging, hence a DummyLogger. + + For the class the log operation is merely a no-op. The intent is to avoid + conditionals being sprinkled throughout the hyper-h2 code for calls to + logging functions when no logger is passed into the corresponding object. + """ + def __init__(self, *vargs): + pass + + def debug(self, *vargs, **kwargs): + """ + No-op logging. Only level needed for now. + """ + pass + + def trace(self, *vargs, **kwargs): + """ + No-op logging. Only level needed for now. + """ + pass + + +class H2Configuration(object): + """ + An object that controls the way a single HTTP/2 connection behaves. + + This object allows the users to customize behaviour. In particular, it + allows users to enable or disable optional features, or to otherwise handle + various unusual behaviours. + + This object has very little behaviour of its own: it mostly just ensures + that configuration is self-consistent. + + :param client_side: Whether this object is to be used on the client side of + a connection, or on the server side. Affects the logic used by the + state machine, the default settings values, the allowable stream IDs, + and several other properties. Defaults to ``True``. + :type client_side: ``bool`` + + :param header_encoding: Controls whether the headers emitted by this object + in events are transparently decoded to ``unicode`` strings, and what + encoding is used to do that decoding. This defaults to ``None``, + meaning that headers will be returned as bytes. To automatically + decode headers (that is, to return them as unicode strings), this can + be set to the string name of any encoding, e.g. ``'utf-8'``. + + .. versionchanged:: 3.0.0 + Changed default value from ``'utf-8'`` to ``None`` + + :type header_encoding: ``str``, ``False``, or ``None`` + + :param validate_outbound_headers: Controls whether the headers emitted + by this object are validated against the rules in RFC 7540. + Disabling this setting will cause outbound header validation to + be skipped, and allow the object to emit headers that may be illegal + according to RFC 7540. Defaults to ``True``. + :type validate_outbound_headers: ``bool`` + + :param normalize_outbound_headers: Controls whether the headers emitted + by this object are normalized before sending. Disabling this setting + will cause outbound header normalization to be skipped, and allow + the object to emit headers that may be illegal according to + RFC 7540. Defaults to ``True``. + :type normalize_outbound_headers: ``bool`` + + :param validate_inbound_headers: Controls whether the headers received + by this object are validated against the rules in RFC 7540. + Disabling this setting will cause inbound header validation to + be skipped, and allow the object to receive headers that may be illegal + according to RFC 7540. Defaults to ``True``. + :type validate_inbound_headers: ``bool`` + + :param normalize_inbound_headers: Controls whether the headers received by + this object are normalized according to the rules of RFC 7540. + Disabling this setting may lead to hyper-h2 emitting header blocks that + some RFCs forbid, e.g. with multiple cookie fields. + + .. versionadded:: 3.0.0 + + :type normalize_inbound_headers: ``bool`` + + :param logger: A logger that conforms to the requirements for this module, + those being no I/O and no context switches, which is needed in order + to run in asynchronous operation. + + .. versionadded:: 2.6.0 + + :type logger: ``logging.Logger`` + """ + client_side = _BooleanConfigOption('client_side') + validate_outbound_headers = _BooleanConfigOption( + 'validate_outbound_headers' + ) + normalize_outbound_headers = _BooleanConfigOption( + 'normalize_outbound_headers' + ) + validate_inbound_headers = _BooleanConfigOption( + 'validate_inbound_headers' + ) + normalize_inbound_headers = _BooleanConfigOption( + 'normalize_inbound_headers' + ) + + def __init__(self, + client_side=True, + header_encoding=None, + validate_outbound_headers=True, + normalize_outbound_headers=True, + validate_inbound_headers=True, + normalize_inbound_headers=True, + logger=None): + self.client_side = client_side + self.header_encoding = header_encoding + self.validate_outbound_headers = validate_outbound_headers + self.normalize_outbound_headers = normalize_outbound_headers + self.validate_inbound_headers = validate_inbound_headers + self.normalize_inbound_headers = normalize_inbound_headers + self.logger = logger or DummyLogger(__name__) + + @property + def header_encoding(self): + """ + Controls whether the headers emitted by this object in events are + transparently decoded to ``unicode`` strings, and what encoding is used + to do that decoding. This defaults to ``None``, meaning that headers + will be returned as bytes. To automatically decode headers (that is, to + return them as unicode strings), this can be set to the string name of + any encoding, e.g. ``'utf-8'``. + """ + return self._header_encoding + + @header_encoding.setter + def header_encoding(self, value): + """ + Enforces constraints on the value of header encoding. + """ + if not isinstance(value, (bool, str, type(None))): + raise ValueError("header_encoding must be bool, string, or None") + if value is True: + raise ValueError("header_encoding cannot be True") + self._header_encoding = value diff --git a/src/h2/connection.py b/src/h2/connection.py new file mode 100644 index 0000000..881304b --- /dev/null +++ b/src/h2/connection.py @@ -0,0 +1,2051 @@ +# -*- coding: utf-8 -*- +""" +h2/connection +~~~~~~~~~~~~~ + +An implementation of a HTTP/2 connection. +""" +import base64 + +from enum import Enum, IntEnum + +from hyperframe.exceptions import InvalidPaddingError +from hyperframe.frame import ( + GoAwayFrame, WindowUpdateFrame, HeadersFrame, DataFrame, PingFrame, + PushPromiseFrame, SettingsFrame, RstStreamFrame, PriorityFrame, + ContinuationFrame, AltSvcFrame, ExtensionFrame +) +from hpack.hpack import Encoder, Decoder +from hpack.exceptions import HPACKError, OversizedHeaderListError + +from .config import H2Configuration +from .errors import ErrorCodes, _error_code_from_int +from .events import ( + WindowUpdated, RemoteSettingsChanged, PingReceived, PingAckReceived, + SettingsAcknowledged, ConnectionTerminated, PriorityUpdated, + AlternativeServiceAvailable, UnknownFrameReceived +) +from .exceptions import ( + ProtocolError, NoSuchStreamError, FlowControlError, FrameTooLargeError, + TooManyStreamsError, StreamClosedError, StreamIDTooLowError, + NoAvailableStreamIDError, RFC1122Error, DenialOfServiceError +) +from .frame_buffer import FrameBuffer +from .settings import Settings, SettingCodes +from .stream import H2Stream, StreamClosedBy +from .utilities import SizeLimitDict, guard_increment_window +from .windows import WindowManager + + +class ConnectionState(Enum): + IDLE = 0 + CLIENT_OPEN = 1 + SERVER_OPEN = 2 + CLOSED = 3 + + +class ConnectionInputs(Enum): + SEND_HEADERS = 0 + SEND_PUSH_PROMISE = 1 + SEND_DATA = 2 + SEND_GOAWAY = 3 + SEND_WINDOW_UPDATE = 4 + SEND_PING = 5 + SEND_SETTINGS = 6 + SEND_RST_STREAM = 7 + SEND_PRIORITY = 8 + RECV_HEADERS = 9 + RECV_PUSH_PROMISE = 10 + RECV_DATA = 11 + RECV_GOAWAY = 12 + RECV_WINDOW_UPDATE = 13 + RECV_PING = 14 + RECV_SETTINGS = 15 + RECV_RST_STREAM = 16 + RECV_PRIORITY = 17 + SEND_ALTERNATIVE_SERVICE = 18 # Added in 2.3.0 + RECV_ALTERNATIVE_SERVICE = 19 # Added in 2.3.0 + + +class AllowedStreamIDs(IntEnum): + EVEN = 0 + ODD = 1 + + +class H2ConnectionStateMachine(object): + """ + A single HTTP/2 connection state machine. + + This state machine, while defined in its own class, is logically part of + the H2Connection class also defined in this file. The state machine itself + maintains very little state directly, instead focusing entirely on managing + state transitions. + """ + # For the purposes of this state machine we treat HEADERS and their + # associated CONTINUATION frames as a single jumbo frame. The protocol + # allows/requires this by preventing other frames from being interleved in + # between HEADERS/CONTINUATION frames. + # + # The _transitions dictionary contains a mapping of tuples of + # (state, input) to tuples of (side_effect_function, end_state). This map + # contains all allowed transitions: anything not in this map is invalid + # and immediately causes a transition to ``closed``. + + _transitions = { + # State: idle + (ConnectionState.IDLE, ConnectionInputs.SEND_HEADERS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.IDLE, ConnectionInputs.RECV_HEADERS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.IDLE, ConnectionInputs.SEND_SETTINGS): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.RECV_SETTINGS): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.SEND_WINDOW_UPDATE): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.RECV_WINDOW_UPDATE): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.SEND_PING): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.RECV_PING): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.SEND_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.IDLE, ConnectionInputs.RECV_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.IDLE, ConnectionInputs.SEND_PRIORITY): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.RECV_PRIORITY): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.SEND_ALTERNATIVE_SERVICE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.IDLE, ConnectionInputs.RECV_ALTERNATIVE_SERVICE): + (None, ConnectionState.CLIENT_OPEN), + + # State: open, client side. + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_HEADERS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_DATA): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PING): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_SETTINGS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PRIORITY): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_HEADERS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PUSH_PROMISE): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_DATA): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PING): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_SETTINGS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_RST_STREAM): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_RST_STREAM): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PRIORITY): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, + ConnectionInputs.RECV_ALTERNATIVE_SERVICE): + (None, ConnectionState.CLIENT_OPEN), + + # State: open, server side. + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_HEADERS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PUSH_PROMISE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_DATA): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PING): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_SETTINGS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PRIORITY): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_HEADERS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_DATA): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PING): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_SETTINGS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PRIORITY): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_RST_STREAM): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_RST_STREAM): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, + ConnectionInputs.SEND_ALTERNATIVE_SERVICE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, + ConnectionInputs.RECV_ALTERNATIVE_SERVICE): + (None, ConnectionState.SERVER_OPEN), + + # State: closed + (ConnectionState.CLOSED, ConnectionInputs.SEND_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.CLOSED, ConnectionInputs.RECV_GOAWAY): + (None, ConnectionState.CLOSED), + } + + def __init__(self): + self.state = ConnectionState.IDLE + + def process_input(self, input_): + """ + Process a specific input in the state machine. + """ + if not isinstance(input_, ConnectionInputs): + raise ValueError("Input must be an instance of ConnectionInputs") + + try: + func, target_state = self._transitions[(self.state, input_)] + except KeyError: + old_state = self.state + self.state = ConnectionState.CLOSED + raise ProtocolError( + "Invalid input %s in state %s" % (input_, old_state) + ) + else: + self.state = target_state + if func is not None: # pragma: no cover + return func() + + return [] + + +class H2Connection(object): + """ + A low-level HTTP/2 connection object. This handles building and receiving + frames and maintains both connection and per-stream state for all streams + on this connection. + + This wraps a HTTP/2 Connection state machine implementation, ensuring that + frames can only be sent/received when the connection is in a valid state. + It also builds stream state machines on demand to ensure that the + constraints of those state machines are met as well. Attempts to create + frames that cannot be sent will raise a ``ProtocolError``. + + .. versionchanged:: 2.3.0 + Added the ``header_encoding`` keyword argument. + + .. versionchanged:: 2.5.0 + Added the ``config`` keyword argument. Deprecated the ``client_side`` + and ``header_encoding`` parameters. + + .. versionchanged:: 3.0.0 + Removed deprecated parameters and properties. + + :param config: The configuration for the HTTP/2 connection. + + .. versionadded:: 2.5.0 + + :type config: :class:`H2Configuration <h2.config.H2Configuration>` + """ + # The initial maximum outbound frame size. This can be changed by receiving + # a settings frame. + DEFAULT_MAX_OUTBOUND_FRAME_SIZE = 65535 + + # The initial maximum inbound frame size. This is somewhat arbitrarily + # chosen. + DEFAULT_MAX_INBOUND_FRAME_SIZE = 2**24 + + # The highest acceptable stream ID. + HIGHEST_ALLOWED_STREAM_ID = 2**31 - 1 + + # The largest acceptable window increment. + MAX_WINDOW_INCREMENT = 2**31 - 1 + + # The initial default value of SETTINGS_MAX_HEADER_LIST_SIZE. + DEFAULT_MAX_HEADER_LIST_SIZE = 2**16 + + # Keep in memory limited amount of results for streams closes + MAX_CLOSED_STREAMS = 2**16 + + def __init__(self, config=None): + self.state_machine = H2ConnectionStateMachine() + self.streams = {} + self.highest_inbound_stream_id = 0 + self.highest_outbound_stream_id = 0 + self.encoder = Encoder() + self.decoder = Decoder() + + # This won't always actually do anything: for versions of HPACK older + # than 2.3.0 it does nothing. However, we have to try! + self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE + + #: The configuration for this HTTP/2 connection object. + #: + #: .. versionadded:: 2.5.0 + self.config = config + if self.config is None: + self.config = H2Configuration( + client_side=True, + ) + + # Objects that store settings, including defaults. + # + # We set the MAX_CONCURRENT_STREAMS value to 100 because its default is + # unbounded, and that's a dangerous default because it allows + # essentially unbounded resources to be allocated regardless of how + # they will be used. 100 should be suitable for the average + # application. This default obviously does not apply to the remote + # peer's settings: the remote peer controls them! + # + # We also set MAX_HEADER_LIST_SIZE to a reasonable value. This is to + # advertise our defence against CVE-2016-6581. However, not all + # versions of HPACK will let us do it. That's ok: we should at least + # suggest that we're not vulnerable. + self.local_settings = Settings( + client=self.config.client_side, + initial_values={ + SettingCodes.MAX_CONCURRENT_STREAMS: 100, + SettingCodes.MAX_HEADER_LIST_SIZE: + self.DEFAULT_MAX_HEADER_LIST_SIZE, + } + ) + self.remote_settings = Settings(client=not self.config.client_side) + + # The current value of the connection flow control windows on the + # connection. + self.outbound_flow_control_window = ( + self.remote_settings.initial_window_size + ) + + #: The maximum size of a frame that can be emitted by this peer, in + #: bytes. + self.max_outbound_frame_size = self.remote_settings.max_frame_size + + #: The maximum size of a frame that can be received by this peer, in + #: bytes. + self.max_inbound_frame_size = self.local_settings.max_frame_size + + # Buffer for incoming data. + self.incoming_buffer = FrameBuffer(server=not self.config.client_side) + + # A private variable to store a sequence of received header frames + # until completion. + self._header_frames = [] + + # Data that needs to be sent. + self._data_to_send = bytearray() + + # Keeps track of how streams are closed. + # Used to ensure that we don't blow up in the face of frames that were + # in flight when a RST_STREAM was sent. + # Also used to determine whether we should consider a frame received + # while a stream is closed as either a stream error or a connection + # error. + self._closed_streams = SizeLimitDict( + size_limit=self.MAX_CLOSED_STREAMS + ) + + # The flow control window manager for the connection. + self._inbound_flow_control_window_manager = WindowManager( + max_window_size=self.local_settings.initial_window_size + ) + + # When in doubt use dict-dispatch. + self._frame_dispatch_table = { + HeadersFrame: self._receive_headers_frame, + PushPromiseFrame: self._receive_push_promise_frame, + SettingsFrame: self._receive_settings_frame, + DataFrame: self._receive_data_frame, + WindowUpdateFrame: self._receive_window_update_frame, + PingFrame: self._receive_ping_frame, + RstStreamFrame: self._receive_rst_stream_frame, + PriorityFrame: self._receive_priority_frame, + GoAwayFrame: self._receive_goaway_frame, + ContinuationFrame: self._receive_naked_continuation, + AltSvcFrame: self._receive_alt_svc_frame, + ExtensionFrame: self._receive_unknown_frame + } + + def _prepare_for_sending(self, frames): + if not frames: + return + self._data_to_send += b''.join(f.serialize() for f in frames) + assert all(f.body_len <= self.max_outbound_frame_size for f in frames) + + def _open_streams(self, remainder): + """ + A common method of counting number of open streams. Returns the number + of streams that are open *and* that have (stream ID % 2) == remainder. + While it iterates, also deletes any closed streams. + """ + count = 0 + to_delete = [] + + for stream_id, stream in self.streams.items(): + if stream.open and (stream_id % 2 == remainder): + count += 1 + elif stream.closed: + to_delete.append(stream_id) + + for stream_id in to_delete: + stream = self.streams.pop(stream_id) + self._closed_streams[stream_id] = stream.closed_by + + return count + + @property + def open_outbound_streams(self): + """ + The current number of open outbound streams. + """ + outbound_numbers = int(self.config.client_side) + return self._open_streams(outbound_numbers) + + @property + def open_inbound_streams(self): + """ + The current number of open inbound streams. + """ + inbound_numbers = int(not self.config.client_side) + return self._open_streams(inbound_numbers) + + @property + def inbound_flow_control_window(self): + """ + The size of the inbound flow control window for the connection. This is + rarely publicly useful: instead, use :meth:`remote_flow_control_window + <h2.connection.H2Connection.remote_flow_control_window>`. This + shortcut is largely present to provide a shortcut to this data. + """ + return self._inbound_flow_control_window_manager.current_window_size + + def _begin_new_stream(self, stream_id, allowed_ids): + """ + Initiate a new stream. + + .. versionchanged:: 2.0.0 + Removed this function from the public API. + + :param stream_id: The ID of the stream to open. + :param allowed_ids: What kind of stream ID is allowed. + """ + self.config.logger.debug( + "Attempting to initiate stream ID %d", stream_id + ) + outbound = self._stream_id_is_outbound(stream_id) + highest_stream_id = ( + self.highest_outbound_stream_id if outbound else + self.highest_inbound_stream_id + ) + + if stream_id <= highest_stream_id: + raise StreamIDTooLowError(stream_id, highest_stream_id) + + if (stream_id % 2) != int(allowed_ids): + raise ProtocolError( + "Invalid stream ID for peer." + ) + + s = H2Stream( + stream_id, + config=self.config, + inbound_window_size=self.local_settings.initial_window_size, + outbound_window_size=self.remote_settings.initial_window_size + ) + self.config.logger.debug("Stream ID %d created", stream_id) + s.max_inbound_frame_size = self.max_inbound_frame_size + s.max_outbound_frame_size = self.max_outbound_frame_size + + self.streams[stream_id] = s + self.config.logger.debug("Current streams: %s", self.streams.keys()) + + if outbound: + self.highest_outbound_stream_id = stream_id + else: + self.highest_inbound_stream_id = stream_id + + return s + + def initiate_connection(self): + """ + Provides any data that needs to be sent at the start of the connection. + Must be called for both clients and servers. + """ + self.config.logger.debug("Initializing connection") + self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) + if self.config.client_side: + preamble = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n' + else: + preamble = b'' + + f = SettingsFrame(0) + for setting, value in self.local_settings.items(): + f.settings[setting] = value + self.config.logger.debug( + "Send Settings frame: %s", self.local_settings + ) + + self._data_to_send += preamble + f.serialize() + + def initiate_upgrade_connection(self, settings_header=None): + """ + Call to initialise the connection object for use with an upgraded + HTTP/2 connection (i.e. a connection negotiated using the + ``Upgrade: h2c`` HTTP header). + + This method differs from :meth:`initiate_connection + <h2.connection.H2Connection.initiate_connection>` in several ways. + Firstly, it handles the additional SETTINGS frame that is sent in the + ``HTTP2-Settings`` header field. When called on a client connection, + this method will return a bytestring that the caller can put in the + ``HTTP2-Settings`` field they send on their initial request. When + called on a server connection, the user **must** provide the value they + received from the client in the ``HTTP2-Settings`` header field to the + ``settings_header`` argument, which will be used appropriately. + + Additionally, this method sets up stream 1 in a half-closed state + appropriate for this side of the connection, to reflect the fact that + the request is already complete. + + Finally, this method also prepares the appropriate preamble to be sent + after the upgrade. + + .. versionadded:: 2.3.0 + + :param settings_header: (optional, server-only): The value of the + ``HTTP2-Settings`` header field received from the client. + :type settings_header: ``bytes`` + + :returns: For clients, a bytestring to put in the ``HTTP2-Settings``. + For servers, returns nothing. + :rtype: ``bytes`` or ``None`` + """ + self.config.logger.debug( + "Upgrade connection. Current settings: %s", self.local_settings + ) + + frame_data = None + # Begin by getting the preamble in place. + self.initiate_connection() + + if self.config.client_side: + f = SettingsFrame(0) + for setting, value in self.local_settings.items(): + f.settings[setting] = value + + frame_data = f.serialize_body() + frame_data = base64.urlsafe_b64encode(frame_data) + elif settings_header: + # We have a settings header from the client. This needs to be + # applied, but we want to throw away the ACK. We do this by + # inserting the data into a Settings frame and then passing it to + # the state machine, but ignoring the return value. + settings_header = base64.urlsafe_b64decode(settings_header) + f = SettingsFrame(0) + f.parse_body(settings_header) + self._receive_settings_frame(f) + + # Set up appropriate state. Stream 1 in a half-closed state: + # half-closed(local) for clients, half-closed(remote) for servers. + # Additionally, we need to set up the Connection state machine. + connection_input = ( + ConnectionInputs.SEND_HEADERS if self.config.client_side + else ConnectionInputs.RECV_HEADERS + ) + self.config.logger.debug("Process input %s", connection_input) + self.state_machine.process_input(connection_input) + + # Set up stream 1. + self._begin_new_stream(stream_id=1, allowed_ids=AllowedStreamIDs.ODD) + self.streams[1].upgrade(self.config.client_side) + return frame_data + + def _get_or_create_stream(self, stream_id, allowed_ids): + """ + Gets a stream by its stream ID. Will create one if one does not already + exist. Use allowed_ids to circumvent the usual stream ID rules for + clients and servers. + + .. versionchanged:: 2.0.0 + Removed this function from the public API. + """ + try: + return self.streams[stream_id] + except KeyError: + return self._begin_new_stream(stream_id, allowed_ids) + + def _get_stream_by_id(self, stream_id): + """ + Gets a stream by its stream ID. Raises NoSuchStreamError if the stream + ID does not correspond to a known stream and is higher than the current + maximum: raises if it is lower than the current maximum. + + .. versionchanged:: 2.0.0 + Removed this function from the public API. + """ + try: + return self.streams[stream_id] + except KeyError: + outbound = self._stream_id_is_outbound(stream_id) + highest_stream_id = ( + self.highest_outbound_stream_id if outbound else + self.highest_inbound_stream_id + ) + + if stream_id > highest_stream_id: + raise NoSuchStreamError(stream_id) + else: + raise StreamClosedError(stream_id) + + def get_next_available_stream_id(self): + """ + Returns an integer suitable for use as the stream ID for the next + stream created by this endpoint. For server endpoints, this stream ID + will be even. For client endpoints, this stream ID will be odd. If no + stream IDs are available, raises :class:`NoAvailableStreamIDError + <h2.exceptions.NoAvailableStreamIDError>`. + + .. warning:: The return value from this function does not change until + the stream ID has actually been used by sending or pushing + headers on that stream. For that reason, it should be + called as close as possible to the actual use of the + stream ID. + + .. versionadded:: 2.0.0 + + :raises: :class:`NoAvailableStreamIDError + <h2.exceptions.NoAvailableStreamIDError>` + :returns: The next free stream ID this peer can use to initiate a + stream. + :rtype: ``int`` + """ + # No streams have been opened yet, so return the lowest allowed stream + # ID. + if not self.highest_outbound_stream_id: + next_stream_id = 1 if self.config.client_side else 2 + else: + next_stream_id = self.highest_outbound_stream_id + 2 + self.config.logger.debug( + "Next available stream ID %d", next_stream_id + ) + if next_stream_id > self.HIGHEST_ALLOWED_STREAM_ID: + raise NoAvailableStreamIDError("Exhausted allowed stream IDs") + + return next_stream_id + + def send_headers(self, stream_id, headers, end_stream=False, + priority_weight=None, priority_depends_on=None, + priority_exclusive=None): + """ + Send headers on a given stream. + + This function can be used to send request or response headers: the kind + that are sent depends on whether this connection has been opened as a + client or server connection, and whether the stream was opened by the + remote peer or not. + + If this is a client connection, calling ``send_headers`` will send the + headers as a request. It will also implicitly open the stream being + used. If this is a client connection and ``send_headers`` has *already* + been called, this will send trailers instead. + + If this is a server connection, calling ``send_headers`` will send the + headers as a response. It is a protocol error for a server to open a + stream by sending headers. If this is a server connection and + ``send_headers`` has *already* been called, this will send trailers + instead. + + When acting as a server, you may call ``send_headers`` any number of + times allowed by the following rules, in this order: + + - zero or more times with ``(':status', '1XX')`` (where ``1XX`` is a + placeholder for any 100-level status code). + - once with any other status header. + - zero or one time for trailers. + + That is, you are allowed to send as many informational responses as you + like, followed by one complete response and zero or one HTTP trailer + blocks. + + Clients may send one or two header blocks: one request block, and + optionally one trailer block. + + If it is important to send HPACK "never indexed" header fields (as + defined in `RFC 7451 Section 7.1.3 + <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may + instead provide headers using the HPACK library's :class:`HeaderTuple + <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple + <hpack:hpack.NeverIndexedHeaderTuple>` objects. + + This method also allows users to prioritize the stream immediately, + by sending priority information on the HEADERS frame directly. To do + this, any one of ``priority_weight``, ``priority_depends_on``, or + ``priority_exclusive`` must be set to a value that is not ``None``. For + more information on the priority fields, see :meth:`prioritize + <h2.connection.H2Connection.prioritize>`. + + .. warning:: In HTTP/2, it is mandatory that all the HTTP/2 special + headers (that is, ones whose header keys begin with ``:``) appear + at the start of the header block, before any normal headers. + + .. versionchanged:: 2.3.0 + Added support for using :class:`HeaderTuple + <hpack:hpack.HeaderTuple>` objects to store headers. + + .. versionchanged:: 2.4.0 + Added the ability to provide priority keyword arguments: + ``priority_weight``, ``priority_depends_on``, and + ``priority_exclusive``. + + :param stream_id: The stream ID to send the headers on. If this stream + does not currently exist, it will be created. + :type stream_id: ``int`` + + :param headers: The request/response headers to send. + :type headers: An iterable of two tuples of bytestrings or + :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects. + + :param end_stream: Whether this headers frame should end the stream + immediately (that is, whether no more data will be sent after this + frame). Defaults to ``False``. + :type end_stream: ``bool`` + + :param priority_weight: Sets the priority weight of the stream. See + :meth:`prioritize <h2.connection.H2Connection.prioritize>` for more + about how this field works. Defaults to ``None``, which means that + no priority information will be sent. + :type priority_weight: ``int`` or ``None`` + + :param priority_depends_on: Sets which stream this one depends on for + priority purposes. See :meth:`prioritize + <h2.connection.H2Connection.prioritize>` for more about how this + field works. Defaults to ``None``, which means that no priority + information will be sent. + :type priority_depends_on: ``int`` or ``None`` + + :param priority_exclusive: Sets whether this stream exclusively depends + on the stream given in ``priority_depends_on`` for priority + purposes. See :meth:`prioritize + <h2.connection.H2Connection.prioritize>` for more about how this + field workds. Defaults to ``None``, which means that no priority + information will be sent. + :type priority_depends_on: ``bool`` or ``None`` + + :returns: Nothing + """ + self.config.logger.debug( + "Send headers on stream ID %d", stream_id + ) + + # Check we can open the stream. + if stream_id not in self.streams: + max_open_streams = self.remote_settings.max_concurrent_streams + if (self.open_outbound_streams + 1) > max_open_streams: + raise TooManyStreamsError( + "Max outbound streams is %d, %d open" % + (max_open_streams, self.open_outbound_streams) + ) + + self.state_machine.process_input(ConnectionInputs.SEND_HEADERS) + stream = self._get_or_create_stream( + stream_id, AllowedStreamIDs(self.config.client_side) + ) + frames = stream.send_headers( + headers, self.encoder, end_stream + ) + + # We may need to send priority information. + priority_present = ( + (priority_weight is not None) or + (priority_depends_on is not None) or + (priority_exclusive is not None) + ) + + if priority_present: + if not self.config.client_side: + raise RFC1122Error("Servers SHOULD NOT prioritize streams.") + + headers_frame = frames[0] + headers_frame.flags.add('PRIORITY') + frames[0] = _add_frame_priority( + headers_frame, + priority_weight, + priority_depends_on, + priority_exclusive + ) + + self._prepare_for_sending(frames) + + def send_data(self, stream_id, data, end_stream=False, pad_length=None): + """ + Send data on a given stream. + + This method does no breaking up of data: if the data is larger than the + value returned by :meth:`local_flow_control_window + <h2.connection.H2Connection.local_flow_control_window>` for this stream + then a :class:`FlowControlError <h2.exceptions.FlowControlError>` will + be raised. If the data is larger than :data:`max_outbound_frame_size + <h2.connection.H2Connection.max_outbound_frame_size>` then a + :class:`FrameTooLargeError <h2.exceptions.FrameTooLargeError>` will be + raised. + + Hyper-h2 does this to avoid buffering the data internally. If the user + has more data to send than hyper-h2 will allow, consider breaking it up + and buffering it externally. + + :param stream_id: The ID of the stream on which to send the data. + :type stream_id: ``int`` + :param data: The data to send on the stream. + :type data: ``bytes`` + :param end_stream: (optional) Whether this is the last data to be sent + on the stream. Defaults to ``False``. + :type end_stream: ``bool`` + :param pad_length: (optional) Length of the padding to apply to the + data frame. Defaults to ``None`` for no use of padding. Note that + a value of ``0`` results in padding of length ``0`` + (with the "padding" flag set on the frame). + + .. versionadded:: 2.6.0 + + :type pad_length: ``int`` + :returns: Nothing + """ + self.config.logger.debug( + "Send data on stream ID %d with len %d", stream_id, len(data) + ) + frame_size = len(data) + if pad_length is not None: + if not isinstance(pad_length, int): + raise TypeError("pad_length must be an int") + if pad_length < 0 or pad_length > 255: + raise ValueError("pad_length must be within range: [0, 255]") + # Account for padding bytes plus the 1-byte padding length field. + frame_size += pad_length + 1 + self.config.logger.debug( + "Frame size on stream ID %d is %d", stream_id, frame_size + ) + + if frame_size > self.local_flow_control_window(stream_id): + raise FlowControlError( + "Cannot send %d bytes, flow control window is %d." % + (frame_size, self.local_flow_control_window(stream_id)) + ) + elif frame_size > self.max_outbound_frame_size: + raise FrameTooLargeError( + "Cannot send frame size %d, max frame size is %d" % + (frame_size, self.max_outbound_frame_size) + ) + + self.state_machine.process_input(ConnectionInputs.SEND_DATA) + frames = self.streams[stream_id].send_data( + data, end_stream, pad_length=pad_length + ) + + self._prepare_for_sending(frames) + + self.outbound_flow_control_window -= frame_size + self.config.logger.debug( + "Outbound flow control window size is %d", + self.outbound_flow_control_window + ) + assert self.outbound_flow_control_window >= 0 + + def end_stream(self, stream_id): + """ + Cleanly end a given stream. + + This method ends a stream by sending an empty DATA frame on that stream + with the ``END_STREAM`` flag set. + + :param stream_id: The ID of the stream to end. + :type stream_id: ``int`` + :returns: Nothing + """ + self.config.logger.debug("End stream ID %d", stream_id) + self.state_machine.process_input(ConnectionInputs.SEND_DATA) + frames = self.streams[stream_id].end_stream() + self._prepare_for_sending(frames) + + def increment_flow_control_window(self, increment, stream_id=None): + """ + Increment a flow control window, optionally for a single stream. Allows + the remote peer to send more data. + + .. versionchanged:: 2.0.0 + Rejects attempts to increment the flow control window by out of + range values with a ``ValueError``. + + :param increment: The amount to increment the flow control window by. + :type increment: ``int`` + :param stream_id: (optional) The ID of the stream that should have its + flow control window opened. If not present or ``None``, the + connection flow control window will be opened instead. + :type stream_id: ``int`` or ``None`` + :returns: Nothing + :raises: ``ValueError`` + """ + if not (1 <= increment <= self.MAX_WINDOW_INCREMENT): + raise ValueError( + "Flow control increment must be between 1 and %d" % + self.MAX_WINDOW_INCREMENT + ) + + self.state_machine.process_input(ConnectionInputs.SEND_WINDOW_UPDATE) + + if stream_id is not None: + stream = self.streams[stream_id] + frames = stream.increase_flow_control_window( + increment + ) + + self.config.logger.debug( + "Increase stream ID %d flow control window by %d", + stream_id, increment + ) + else: + self._inbound_flow_control_window_manager.window_opened(increment) + f = WindowUpdateFrame(0) + f.window_increment = increment + frames = [f] + + self.config.logger.debug( + "Increase connection flow control window by %d", increment + ) + + self._prepare_for_sending(frames) + + def push_stream(self, stream_id, promised_stream_id, request_headers): + """ + Push a response to the client by sending a PUSH_PROMISE frame. + + If it is important to send HPACK "never indexed" header fields (as + defined in `RFC 7451 Section 7.1.3 + <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may + instead provide headers using the HPACK library's :class:`HeaderTuple + <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple + <hpack:hpack.NeverIndexedHeaderTuple>` objects. + + :param stream_id: The ID of the stream that this push is a response to. + :type stream_id: ``int`` + :param promised_stream_id: The ID of the stream that the pushed + response will be sent on. + :type promised_stream_id: ``int`` + :param request_headers: The headers of the request that the pushed + response will be responding to. + :type request_headers: An iterable of two tuples of bytestrings or + :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects. + :returns: Nothing + """ + self.config.logger.debug( + "Send Push Promise frame on stream ID %d", stream_id + ) + + if not self.remote_settings.enable_push: + raise ProtocolError("Remote peer has disabled stream push") + + self.state_machine.process_input(ConnectionInputs.SEND_PUSH_PROMISE) + stream = self._get_stream_by_id(stream_id) + + # We need to prevent users pushing streams in response to streams that + # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The + # easiest way to do that is to assert that the stream_id is not even: + # this shortcut works because only servers can push and the state + # machine will enforce this. + if (stream_id % 2) == 0: + raise ProtocolError("Cannot recursively push streams.") + + new_stream = self._begin_new_stream( + promised_stream_id, AllowedStreamIDs.EVEN + ) + self.streams[promised_stream_id] = new_stream + + frames = stream.push_stream_in_band( + promised_stream_id, request_headers, self.encoder + ) + new_frames = new_stream.locally_pushed() + self._prepare_for_sending(frames + new_frames) + + def ping(self, opaque_data): + """ + Send a PING frame. + + :param opaque_data: A bytestring of length 8 that will be sent in the + PING frame. + :returns: Nothing + """ + self.config.logger.debug("Send Ping frame") + + if not isinstance(opaque_data, bytes) or len(opaque_data) != 8: + raise ValueError("Invalid value for ping data: %r" % opaque_data) + + self.state_machine.process_input(ConnectionInputs.SEND_PING) + f = PingFrame(0) + f.opaque_data = opaque_data + self._prepare_for_sending([f]) + + def reset_stream(self, stream_id, error_code=0): + """ + Reset a stream. + + This method forcibly closes a stream by sending a RST_STREAM frame for + a given stream. This is not a graceful closure. To gracefully end a + stream, try the :meth:`end_stream + <h2.connection.H2Connection.end_stream>` method. + + :param stream_id: The ID of the stream to reset. + :type stream_id: ``int`` + :param error_code: (optional) The error code to use to reset the + stream. Defaults to :data:`ErrorCodes.NO_ERROR + <h2.errors.ErrorCodes.NO_ERROR>`. + :type error_code: ``int`` + :returns: Nothing + """ + self.config.logger.debug("Reset stream ID %d", stream_id) + self.state_machine.process_input(ConnectionInputs.SEND_RST_STREAM) + stream = self._get_stream_by_id(stream_id) + frames = stream.reset_stream(error_code) + self._prepare_for_sending(frames) + + def close_connection(self, error_code=0, additional_data=None, + last_stream_id=None): + + """ + Close a connection, emitting a GOAWAY frame. + + .. versionchanged:: 2.4.0 + Added ``additional_data`` and ``last_stream_id`` arguments. + + :param error_code: (optional) The error code to send in the GOAWAY + frame. + :param additional_data: (optional) Additional debug data indicating + a reason for closing the connection. Must be a bytestring. + :param last_stream_id: (optional) The last stream which was processed + by the sender. Defaults to ``highest_inbound_stream_id``. + :returns: Nothing + """ + self.config.logger.debug("Close connection") + self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY) + + # Additional_data must be bytes + if additional_data is not None: + assert isinstance(additional_data, bytes) + + if last_stream_id is None: + last_stream_id = self.highest_inbound_stream_id + + f = GoAwayFrame( + stream_id=0, + last_stream_id=last_stream_id, + error_code=error_code, + additional_data=(additional_data or b'') + ) + self._prepare_for_sending([f]) + + def update_settings(self, new_settings): + """ + Update the local settings. This will prepare and emit the appropriate + SETTINGS frame. + + :param new_settings: A dictionary of {setting: new value} + """ + self.config.logger.debug( + "Update connection settings to %s", new_settings + ) + self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) + self.local_settings.update(new_settings) + s = SettingsFrame(0) + s.settings = new_settings + self._prepare_for_sending([s]) + + def advertise_alternative_service(self, + field_value, + origin=None, + stream_id=None): + """ + Notify a client about an available Alternative Service. + + An Alternative Service is defined in `RFC 7838 + <https://tools.ietf.org/html/rfc7838>`_. An Alternative Service + notification informs a client that a given origin is also available + elsewhere. + + Alternative Services can be advertised in two ways. Firstly, they can + be advertised explicitly: that is, a server can say "origin X is also + available at Y". To advertise like this, set the ``origin`` argument + and not the ``stream_id`` argument. Alternatively, they can be + advertised implicitly: that is, a server can say "the origin you're + contacting on stream X is also available at Y". To advertise like this, + set the ``stream_id`` argument and not the ``origin`` argument. + + The explicit method of advertising can be done as long as the + connection is active. The implicit method can only be done after the + client has sent the request headers and before the server has sent the + response headers: outside of those points, Hyper-h2 will forbid sending + the Alternative Service advertisement by raising a ProtocolError. + + The ``field_value`` parameter is specified in RFC 7838. Hyper-h2 does + not validate or introspect this argument: the user is required to + ensure that it's well-formed. ``field_value`` corresponds to RFC 7838's + "Alternative Service Field Value". + + .. note:: It is strongly preferred to use the explicit method of + advertising Alternative Services. The implicit method of + advertising Alternative Services has a number of subtleties + and can lead to inconsistencies between the server and + client. Hyper-h2 allows both mechanisms, but caution is + strongly advised. + + .. versionadded:: 2.3.0 + + :param field_value: The RFC 7838 Alternative Service Field Value. This + argument is not introspected by Hyper-h2: the user is responsible + for ensuring that it is well-formed. + :type field_value: ``bytes`` + + :param origin: The origin/authority to which the Alternative Service + being advertised applies. Must not be provided at the same time as + ``stream_id``. + :type origin: ``bytes`` or ``None`` + + :param stream_id: The ID of the stream which was sent to the authority + for which this Alternative Service advertisement applies. Must not + be provided at the same time as ``origin``. + :type stream_id: ``int`` or ``None`` + + :returns: Nothing. + """ + if not isinstance(field_value, bytes): + raise ValueError("Field must be bytestring.") + + if origin is not None and stream_id is not None: + raise ValueError("Must not provide both origin and stream_id") + + self.state_machine.process_input( + ConnectionInputs.SEND_ALTERNATIVE_SERVICE + ) + + if origin is not None: + # This ALTSVC is sent on stream zero. + f = AltSvcFrame(stream_id=0) + f.origin = origin + f.field = field_value + frames = [f] + else: + stream = self._get_stream_by_id(stream_id) + frames = stream.advertise_alternative_service(field_value) + + self._prepare_for_sending(frames) + + def prioritize(self, stream_id, weight=None, depends_on=None, + exclusive=None): + """ + Notify a server about the priority of a stream. + + Stream priorities are a form of guidance to a remote server: they + inform the server about how important a given response is, so that the + server may allocate its resources (e.g. bandwidth, CPU time, etc.) + accordingly. This exists to allow clients to ensure that the most + important data arrives earlier, while less important data does not + starve out the more important data. + + Stream priorities are explained in depth in `RFC 7540 Section 5.3 + <https://tools.ietf.org/html/rfc7540#section-5.3>`_. + + This method updates the priority information of a single stream. It may + be called well before a stream is actively in use, or well after a + stream is closed. + + .. warning:: RFC 7540 allows for servers to change the priority of + streams. However, hyper-h2 **does not** allow server + stacks to do this. This is because most clients do not + adequately know how to respond when provided conflicting + priority information, and relatively little utility is + provided by making that functionality available. + + .. note:: hyper-h2 **does not** maintain any information about the + RFC 7540 priority tree. That means that hyper-h2 does not + prevent incautious users from creating invalid priority + trees, particularly by creating priority loops. While some + basic error checking is provided by hyper-h2, users are + strongly recommended to understand their prioritisation + strategies before using the priority tools here. + + .. note:: Priority information is strictly advisory. Servers are + allowed to disregard it entirely. Avoid relying on the idea + that your priority signaling will definitely be obeyed. + + .. versionadded:: 2.4.0 + + :param stream_id: The ID of the stream to prioritize. + :type stream_id: ``int`` + + :param weight: The weight to give the stream. Defaults to ``16``, the + default weight of any stream. May be any value between ``1`` and + ``256`` inclusive. The relative weight of a stream indicates what + proportion of available resources will be allocated to that + stream. + :type weight: ``int`` + + :param depends_on: The ID of the stream on which this stream depends. + This stream will only be progressed if it is impossible to + progress the parent stream (the one on which this one depends). + Passing the value ``0`` means that this stream does not depend on + any other. Defaults to ``0``. + :type depends_on: ``int`` + + :param exclusive: Whether this stream is an exclusive dependency of its + "parent" stream (i.e. the stream given by ``depends_on``). If a + stream is an exclusive dependency of another, that means that all + previously-set children of the parent are moved to become children + of the new exclusively-dependent stream. Defaults to ``False``. + :type exclusive: ``bool`` + """ + if not self.config.client_side: + raise RFC1122Error("Servers SHOULD NOT prioritize streams.") + + self.state_machine.process_input( + ConnectionInputs.SEND_PRIORITY + ) + + frame = PriorityFrame(stream_id) + frame = _add_frame_priority(frame, weight, depends_on, exclusive) + + self._prepare_for_sending([frame]) + + def local_flow_control_window(self, stream_id): + """ + Returns the maximum amount of data that can be sent on stream + ``stream_id``. + + This value will never be larger than the total data that can be sent on + the connection: even if the given stream allows more data, the + connection window provides a logical maximum to the amount of data that + can be sent. + + The maximum data that can be sent in a single data frame on a stream + is either this value, or the maximum frame size, whichever is + *smaller*. + + :param stream_id: The ID of the stream whose flow control window is + being queried. + :type stream_id: ``int`` + :returns: The amount of data in bytes that can be sent on the stream + before the flow control window is exhausted. + :rtype: ``int`` + """ + stream = self._get_stream_by_id(stream_id) + return min( + self.outbound_flow_control_window, + stream.outbound_flow_control_window + ) + + def remote_flow_control_window(self, stream_id): + """ + Returns the maximum amount of data the remote peer can send on stream + ``stream_id``. + + This value will never be larger than the total data that can be sent on + the connection: even if the given stream allows more data, the + connection window provides a logical maximum to the amount of data that + can be sent. + + The maximum data that can be sent in a single data frame on a stream + is either this value, or the maximum frame size, whichever is + *smaller*. + + :param stream_id: The ID of the stream whose flow control window is + being queried. + :type stream_id: ``int`` + :returns: The amount of data in bytes that can be received on the + stream before the flow control window is exhausted. + :rtype: ``int`` + """ + stream = self._get_stream_by_id(stream_id) + return min( + self.inbound_flow_control_window, + stream.inbound_flow_control_window + ) + + def acknowledge_received_data(self, acknowledged_size, stream_id): + """ + Inform the :class:`H2Connection <h2.connection.H2Connection>` that a + certain number of flow-controlled bytes have been processed, and that + the space should be handed back to the remote peer at an opportune + time. + + .. versionadded:: 2.5.0 + + :param acknowledged_size: The total *flow-controlled size* of the data + that has been processed. Note that this must include the amount of + padding that was sent with that data. + :type acknowledged_size: ``int`` + :param stream_id: The ID of the stream on which this data was received. + :type stream_id: ``int`` + :returns: Nothing + :rtype: ``None`` + """ + self.config.logger.debug( + "Ack received data on stream ID %d with size %d", + stream_id, acknowledged_size + ) + if stream_id <= 0: + raise ValueError( + "Stream ID %d is not valid for acknowledge_received_data" % + stream_id + ) + if acknowledged_size < 0: + raise ValueError("Cannot acknowledge negative data") + + frames = [] + + conn_manager = self._inbound_flow_control_window_manager + conn_increment = conn_manager.process_bytes(acknowledged_size) + if conn_increment: + f = WindowUpdateFrame(0) + f.window_increment = conn_increment + frames.append(f) + + try: + stream = self._get_stream_by_id(stream_id) + except StreamClosedError: + # The stream is already gone. We're not worried about incrementing + # the window in this case. + pass + else: + # No point incrementing the windows of closed streams. + if stream.open: + frames.extend( + stream.acknowledge_received_data(acknowledged_size) + ) + + self._prepare_for_sending(frames) + + def data_to_send(self, amount=None): + """ + Returns some data for sending out of the internal data buffer. + + This method is analogous to ``read`` on a file-like object, but it + doesn't block. Instead, it returns as much data as the user asks for, + or less if that much data is not available. It does not perform any + I/O, and so uses a different name. + + :param amount: (optional) The maximum amount of data to return. If not + set, or set to ``None``, will return as much data as possible. + :type amount: ``int`` + :returns: A bytestring containing the data to send on the wire. + :rtype: ``bytes`` + """ + if amount is None: + data = bytes(self._data_to_send) + self._data_to_send = bytearray() + return data + else: + data = bytes(self._data_to_send[:amount]) + self._data_to_send = self._data_to_send[amount:] + return data + + def clear_outbound_data_buffer(self): + """ + Clears the outbound data buffer, such that if this call was immediately + followed by a call to + :meth:`data_to_send <h2.connection.H2Connection.data_to_send>`, that + call would return no data. + + This method should not normally be used, but is made available to avoid + exposing implementation details. + """ + self._data_to_send = bytearray() + + def _acknowledge_settings(self): + """ + Acknowledge settings that have been received. + + .. versionchanged:: 2.0.0 + Removed from public API, removed useless ``event`` parameter, made + automatic. + + :returns: Nothing + """ + self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) + + changes = self.remote_settings.acknowledge() + + if SettingCodes.INITIAL_WINDOW_SIZE in changes: + setting = changes[SettingCodes.INITIAL_WINDOW_SIZE] + self._flow_control_change_from_settings( + setting.original_value, + setting.new_value, + ) + + # HEADER_TABLE_SIZE changes by the remote part affect our encoder: cf. + # RFC 7540 Section 6.5.2. + if SettingCodes.HEADER_TABLE_SIZE in changes: + setting = changes[SettingCodes.HEADER_TABLE_SIZE] + self.encoder.header_table_size = setting.new_value + + if SettingCodes.MAX_FRAME_SIZE in changes: + setting = changes[SettingCodes.MAX_FRAME_SIZE] + self.max_outbound_frame_size = setting.new_value + for stream in self.streams.values(): + stream.max_outbound_frame_size = setting.new_value + + f = SettingsFrame(0) + f.flags.add('ACK') + return [f] + + def _flow_control_change_from_settings(self, old_value, new_value): + """ + Update flow control windows in response to a change in the value of + SETTINGS_INITIAL_WINDOW_SIZE. + + When this setting is changed, it automatically updates all flow control + windows by the delta in the settings values. Note that it does not + increment the *connection* flow control window, per section 6.9.2 of + RFC 7540. + """ + delta = new_value - old_value + + for stream in self.streams.values(): + stream.outbound_flow_control_window = guard_increment_window( + stream.outbound_flow_control_window, + delta + ) + + def _inbound_flow_control_change_from_settings(self, old_value, new_value): + """ + Update remote flow control windows in response to a change in the value + of SETTINGS_INITIAL_WINDOW_SIZE. + + When this setting is changed, it automatically updates all remote flow + control windows by the delta in the settings values. + """ + delta = new_value - old_value + + for stream in self.streams.values(): + stream._inbound_flow_control_change_from_settings(delta) + + def receive_data(self, data): + """ + Pass some received HTTP/2 data to the connection for handling. + + :param data: The data received from the remote peer on the network. + :type data: ``bytes`` + :returns: A list of events that the remote peer triggered by sending + this data. + """ + self.config.logger.trace( + "Process received data on connection. Received data: %r", data + ) + + events = [] + self.incoming_buffer.add_data(data) + self.incoming_buffer.max_frame_size = self.max_inbound_frame_size + + try: + for frame in self.incoming_buffer: + events.extend(self._receive_frame(frame)) + except InvalidPaddingError: + self._terminate_connection(ErrorCodes.PROTOCOL_ERROR) + raise ProtocolError("Received frame with invalid padding.") + except ProtocolError as e: + # For whatever reason, receiving the frame caused a protocol error. + # We should prepare to emit a GoAway frame before throwing the + # exception up further. No need for an event: the exception will + # do fine. + self._terminate_connection(e.error_code) + raise + + return events + + def _receive_frame(self, frame): + """ + Handle a frame received on the connection. + + .. versionchanged:: 2.0.0 + Removed from the public API. + """ + try: + # I don't love using __class__ here, maybe reconsider it. + frames, events = self._frame_dispatch_table[frame.__class__](frame) + except StreamClosedError as e: + # If the stream was closed by RST_STREAM, we just send a RST_STREAM + # to the remote peer. Otherwise, this is a connection error, and so + # we will re-raise to trigger one. + if self._stream_is_closed_by_reset(e.stream_id): + f = RstStreamFrame(e.stream_id) + f.error_code = e.error_code + self._prepare_for_sending([f]) + events = e._events + else: + raise + except StreamIDTooLowError as e: + # The stream ID seems invalid. This may happen when the closed + # stream has been cleaned up, or when the remote peer has opened a + # new stream with a higher stream ID than this one, forcing it + # closed implicitly. + # + # Check how the stream was closed: depending on the mechanism, it + # is either a stream error or a connection error. + if self._stream_is_closed_by_reset(e.stream_id): + # Closed by RST_STREAM is a stream error. + f = RstStreamFrame(e.stream_id) + f.error_code = ErrorCodes.STREAM_CLOSED + self._prepare_for_sending([f]) + events = [] + elif self._stream_is_closed_by_end(e.stream_id): + # Closed by END_STREAM is a connection error. + raise StreamClosedError(e.stream_id) + else: + # Closed implicitly, also a connection error, but of type + # PROTOCOL_ERROR. + raise + else: + self._prepare_for_sending(frames) + + return events + + def _terminate_connection(self, error_code): + """ + Terminate the connection early. Used in error handling blocks to send + GOAWAY frames. + """ + f = GoAwayFrame(0) + f.last_stream_id = self.highest_inbound_stream_id + f.error_code = error_code + self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY) + self._prepare_for_sending([f]) + + def _receive_headers_frame(self, frame): + """ + Receive a headers frame on the connection. + """ + # If necessary, check we can open the stream. Also validate that the + # stream ID is valid. + if frame.stream_id not in self.streams: + max_open_streams = self.local_settings.max_concurrent_streams + if (self.open_inbound_streams + 1) > max_open_streams: + raise TooManyStreamsError( + "Max outbound streams is %d, %d open" % + (max_open_streams, self.open_outbound_streams) + ) + + # Let's decode the headers. We handle headers as bytes internally up + # until we hang them off the event, at which point we may optionally + # convert them to unicode. + headers = _decode_headers(self.decoder, frame.data) + + events = self.state_machine.process_input( + ConnectionInputs.RECV_HEADERS + ) + stream = self._get_or_create_stream( + frame.stream_id, AllowedStreamIDs(not self.config.client_side) + ) + frames, stream_events = stream.receive_headers( + headers, + 'END_STREAM' in frame.flags, + self.config.header_encoding + ) + + if 'PRIORITY' in frame.flags: + p_frames, p_events = self._receive_priority_frame(frame) + stream_events[0].priority_updated = p_events[0] + stream_events.extend(p_events) + assert not p_frames + + return frames, events + stream_events + + def _receive_push_promise_frame(self, frame): + """ + Receive a push-promise frame on the connection. + """ + if not self.local_settings.enable_push: + raise ProtocolError("Received pushed stream") + + pushed_headers = _decode_headers(self.decoder, frame.data) + + events = self.state_machine.process_input( + ConnectionInputs.RECV_PUSH_PROMISE + ) + + try: + stream = self._get_stream_by_id(frame.stream_id) + except NoSuchStreamError: + # We need to check if the parent stream was reset by us. If it was + # then we presume that the PUSH_PROMISE was in flight when we reset + # the parent stream. Rather than accept the new stream, just reset + # it. + # + # If this was closed naturally, however, we should call this a + # PROTOCOL_ERROR: pushing a stream on a naturally closed stream is + # a real problem because it creates a brand new stream that the + # remote peer now believes exists. + if (self._stream_closed_by(frame.stream_id) == + StreamClosedBy.SEND_RST_STREAM): + f = RstStreamFrame(frame.promised_stream_id) + f.error_code = ErrorCodes.REFUSED_STREAM + return [f], events + + raise ProtocolError("Attempted to push on closed stream.") + + # We need to prevent peers pushing streams in response to streams that + # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The + # easiest way to do that is to assert that the stream_id is not even: + # this shortcut works because only servers can push and the state + # machine will enforce this. + if (frame.stream_id % 2) == 0: + raise ProtocolError("Cannot recursively push streams.") + + try: + frames, stream_events = stream.receive_push_promise_in_band( + frame.promised_stream_id, + pushed_headers, + self.config.header_encoding, + ) + except StreamClosedError: + # The parent stream was reset by us, so we presume that + # PUSH_PROMISE was in flight when we reset the parent stream. + # So we just reset the new stream. + f = RstStreamFrame(frame.promised_stream_id) + f.error_code = ErrorCodes.REFUSED_STREAM + return [f], events + + new_stream = self._begin_new_stream( + frame.promised_stream_id, AllowedStreamIDs.EVEN + ) + self.streams[frame.promised_stream_id] = new_stream + new_stream.remotely_pushed(pushed_headers) + + return frames, events + stream_events + + def _handle_data_on_closed_stream(self, events, exc, frame): + # This stream is already closed - and yet we received a DATA frame. + # The received DATA frame counts towards the connection flow window. + # We need to manually to acknowledge the DATA frame to update the flow + # window of the connection. Otherwise the whole connection stalls due + # the inbound flow window being 0. + frames = [] + conn_manager = self._inbound_flow_control_window_manager + conn_increment = conn_manager.process_bytes( + frame.flow_controlled_length + ) + if conn_increment: + f = WindowUpdateFrame(0) + f.window_increment = conn_increment + frames.append(f) + self.config.logger.debug( + "Received DATA frame on closed stream %d - " + "auto-emitted a WINDOW_UPDATE by %d", + frame.stream_id, conn_increment + ) + f = RstStreamFrame(exc.stream_id) + f.error_code = exc.error_code + frames.append(f) + self.config.logger.debug( + "Stream %d already CLOSED or cleaned up - " + "auto-emitted a RST_FRAME" % frame.stream_id + ) + return frames, events + exc._events + + def _receive_data_frame(self, frame): + """ + Receive a data frame on the connection. + """ + flow_controlled_length = frame.flow_controlled_length + + events = self.state_machine.process_input( + ConnectionInputs.RECV_DATA + ) + self._inbound_flow_control_window_manager.window_consumed( + flow_controlled_length + ) + + try: + stream = self._get_stream_by_id(frame.stream_id) + frames, stream_events = stream.receive_data( + frame.data, + 'END_STREAM' in frame.flags, + flow_controlled_length + ) + except StreamClosedError as e: + # This stream is either marked as CLOSED or already gone from our + # internal state. + return self._handle_data_on_closed_stream(events, e, frame) + + return frames, events + stream_events + + def _receive_settings_frame(self, frame): + """ + Receive a SETTINGS frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_SETTINGS + ) + + # This is an ack of the local settings. + if 'ACK' in frame.flags: + changed_settings = self._local_settings_acked() + ack_event = SettingsAcknowledged() + ack_event.changed_settings = changed_settings + events.append(ack_event) + return [], events + + # Add the new settings. + self.remote_settings.update(frame.settings) + events.append( + RemoteSettingsChanged.from_settings( + self.remote_settings, frame.settings + ) + ) + frames = self._acknowledge_settings() + + return frames, events + + def _receive_window_update_frame(self, frame): + """ + Receive a WINDOW_UPDATE frame on the connection. + """ + # Validate the frame. + if not (1 <= frame.window_increment <= self.MAX_WINDOW_INCREMENT): + raise ProtocolError( + "Flow control increment must be between 1 and %d, received %d" + % (self.MAX_WINDOW_INCREMENT, frame.window_increment) + ) + + events = self.state_machine.process_input( + ConnectionInputs.RECV_WINDOW_UPDATE + ) + + if frame.stream_id: + try: + stream = self._get_stream_by_id(frame.stream_id) + frames, stream_events = stream.receive_window_update( + frame.window_increment + ) + except StreamClosedError: + return [], events + else: + # Increment our local flow control window. + self.outbound_flow_control_window = guard_increment_window( + self.outbound_flow_control_window, + frame.window_increment + ) + + # FIXME: Should we split this into one event per active stream? + window_updated_event = WindowUpdated() + window_updated_event.stream_id = 0 + window_updated_event.delta = frame.window_increment + stream_events = [window_updated_event] + frames = [] + + return frames, events + stream_events + + def _receive_ping_frame(self, frame): + """ + Receive a PING frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_PING + ) + flags = [] + + if 'ACK' in frame.flags: + evt = PingAckReceived() + else: + evt = PingReceived() + + # automatically ACK the PING with the same 'opaque data' + f = PingFrame(0) + f.flags = {'ACK'} + f.opaque_data = frame.opaque_data + flags.append(f) + + evt.ping_data = frame.opaque_data + events.append(evt) + + return flags, events + + def _receive_rst_stream_frame(self, frame): + """ + Receive a RST_STREAM frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_RST_STREAM + ) + try: + stream = self._get_stream_by_id(frame.stream_id) + except NoSuchStreamError: + # The stream is missing. That's ok, we just do nothing here. + stream_frames = [] + stream_events = [] + else: + stream_frames, stream_events = stream.stream_reset(frame) + + return stream_frames, events + stream_events + + def _receive_priority_frame(self, frame): + """ + Receive a PRIORITY frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_PRIORITY + ) + + event = PriorityUpdated() + event.stream_id = frame.stream_id + event.depends_on = frame.depends_on + event.exclusive = frame.exclusive + + # Weight is an integer between 1 and 256, but the byte only allows + # 0 to 255: add one. + event.weight = frame.stream_weight + 1 + + # A stream may not depend on itself. + if event.depends_on == frame.stream_id: + raise ProtocolError( + "Stream %d may not depend on itself" % frame.stream_id + ) + events.append(event) + + return [], events + + def _receive_goaway_frame(self, frame): + """ + Receive a GOAWAY frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_GOAWAY + ) + + # Clear the outbound data buffer: we cannot send further data now. + self.clear_outbound_data_buffer() + + # Fire an appropriate ConnectionTerminated event. + new_event = ConnectionTerminated() + new_event.error_code = _error_code_from_int(frame.error_code) + new_event.last_stream_id = frame.last_stream_id + new_event.additional_data = (frame.additional_data + if frame.additional_data else None) + events.append(new_event) + + return [], events + + def _receive_naked_continuation(self, frame): + """ + A naked CONTINUATION frame has been received. This is always an error, + but the type of error it is depends on the state of the stream and must + transition the state of the stream, so we need to pass it to the + appropriate stream. + """ + stream = self._get_stream_by_id(frame.stream_id) + stream.receive_continuation() + assert False, "Should not be reachable" + + def _receive_alt_svc_frame(self, frame): + """ + An ALTSVC frame has been received. This frame, specified in RFC 7838, + is used to advertise alternative places where the same service can be + reached. + + This frame can optionally be received either on a stream or on stream + 0, and its semantics are different in each case. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_ALTERNATIVE_SERVICE + ) + frames = [] + + if frame.stream_id: + # Given that it makes no sense to receive ALTSVC on a stream + # before that stream has been opened with a HEADERS frame, the + # ALTSVC frame cannot create a stream. If the stream is not + # present, we simply ignore the frame. + try: + stream = self._get_stream_by_id(frame.stream_id) + except (NoSuchStreamError, StreamClosedError): + pass + else: + stream_frames, stream_events = stream.receive_alt_svc(frame) + frames.extend(stream_frames) + events.extend(stream_events) + else: + # This frame is sent on stream 0. The origin field on the frame + # must be present, though if it isn't it's not a ProtocolError + # (annoyingly), we just need to ignore it. + if not frame.origin: + return frames, events + + # If we're a server, we want to ignore this (RFC 7838 says so). + if not self.config.client_side: + return frames, events + + event = AlternativeServiceAvailable() + event.origin = frame.origin + event.field_value = frame.field + events.append(event) + + return frames, events + + def _receive_unknown_frame(self, frame): + """ + We have received a frame that we do not understand. This is almost + certainly an extension frame, though it's impossible to be entirely + sure. + + RFC 7540 § 5.5 says that we MUST ignore unknown frame types: so we + do. We do notify the user that we received one, however. + """ + # All we do here is log. + self.config.logger.debug( + "Received unknown extension frame (ID %d)", frame.stream_id + ) + event = UnknownFrameReceived() + event.frame = frame + return [], [event] + + def _local_settings_acked(self): + """ + Handle the local settings being ACKed, update internal state. + """ + changes = self.local_settings.acknowledge() + + if SettingCodes.INITIAL_WINDOW_SIZE in changes: + setting = changes[SettingCodes.INITIAL_WINDOW_SIZE] + self._inbound_flow_control_change_from_settings( + setting.original_value, + setting.new_value, + ) + + if SettingCodes.MAX_HEADER_LIST_SIZE in changes: + setting = changes[SettingCodes.MAX_HEADER_LIST_SIZE] + self.decoder.max_header_list_size = setting.new_value + + if SettingCodes.MAX_FRAME_SIZE in changes: + setting = changes[SettingCodes.MAX_FRAME_SIZE] + self.max_inbound_frame_size = setting.new_value + + if SettingCodes.HEADER_TABLE_SIZE in changes: + setting = changes[SettingCodes.HEADER_TABLE_SIZE] + # This is safe across all hpack versions: some versions just won't + # respect it. + self.decoder.max_allowed_table_size = setting.new_value + + return changes + + def _stream_id_is_outbound(self, stream_id): + """ + Returns ``True`` if the stream ID corresponds to an outbound stream + (one initiated by this peer), returns ``False`` otherwise. + """ + return (stream_id % 2 == int(self.config.client_side)) + + def _stream_closed_by(self, stream_id): + """ + Returns how the stream was closed. + + The return value will be either a member of + ``h2.stream.StreamClosedBy`` or ``None``. If ``None``, the stream was + closed implicitly by the peer opening a stream with a higher stream ID + before opening this one. + """ + if stream_id in self.streams: + return self.streams[stream_id].closed_by + if stream_id in self._closed_streams: + return self._closed_streams[stream_id] + return None + + def _stream_is_closed_by_reset(self, stream_id): + """ + Returns ``True`` if the stream was closed by sending or receiving a + RST_STREAM frame. Returns ``False`` otherwise. + """ + return self._stream_closed_by(stream_id) in ( + StreamClosedBy.RECV_RST_STREAM, StreamClosedBy.SEND_RST_STREAM + ) + + def _stream_is_closed_by_end(self, stream_id): + """ + Returns ``True`` if the stream was closed by sending or receiving an + END_STREAM flag in a HEADERS or DATA frame. Returns ``False`` + otherwise. + """ + return self._stream_closed_by(stream_id) in ( + StreamClosedBy.RECV_END_STREAM, StreamClosedBy.SEND_END_STREAM + ) + + +def _add_frame_priority(frame, weight=None, depends_on=None, exclusive=None): + """ + Adds priority data to a given frame. Does not change any flags set on that + frame: if the caller is adding priority information to a HEADERS frame they + must set that themselves. + + This method also deliberately sets defaults for anything missing. + + This method validates the input values. + """ + # A stream may not depend on itself. + if depends_on == frame.stream_id: + raise ProtocolError( + "Stream %d may not depend on itself" % frame.stream_id + ) + + # Weight must be between 1 and 256. + if weight is not None: + if weight > 256 or weight < 1: + raise ProtocolError( + "Weight must be between 1 and 256, not %d" % weight + ) + else: + # Weight is an integer between 1 and 256, but the byte only allows + # 0 to 255: subtract one. + weight -= 1 + + # Set defaults for anything not provided. + weight = weight if weight is not None else 15 + depends_on = depends_on if depends_on is not None else 0 + exclusive = exclusive if exclusive is not None else False + + frame.stream_weight = weight + frame.depends_on = depends_on + frame.exclusive = exclusive + + return frame + + +def _decode_headers(decoder, encoded_header_block): + """ + Decode a HPACK-encoded header block, translating HPACK exceptions into + sensible hyper-h2 errors. + + This only ever returns bytestring headers: hyper-h2 may emit them as + unicode later, but internally it processes them as bytestrings only. + """ + try: + return decoder.decode(encoded_header_block, raw=True) + except OversizedHeaderListError as e: + # This is a symptom of a HPACK bomb attack: the user has + # disregarded our requirements on how large a header block we'll + # accept. + raise DenialOfServiceError("Oversized header block: %s" % e) + except (HPACKError, IndexError, TypeError, UnicodeDecodeError) as e: + # We should only need HPACKError here, but versions of HPACK older + # than 2.1.0 throw all three others as well. For maximum + # compatibility, catch all of them. + raise ProtocolError("Error decoding header block: %s" % e) diff --git a/src/h2/errors.py b/src/h2/errors.py new file mode 100644 index 0000000..303df59 --- /dev/null +++ b/src/h2/errors.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +""" +h2/errors +~~~~~~~~~ + +Global error code registry containing the established HTTP/2 error codes. + +The current registry is available at: +https://tools.ietf.org/html/rfc7540#section-11.4 +""" +import enum + + +class ErrorCodes(enum.IntEnum): + """ + All known HTTP/2 error codes. + + .. versionadded:: 2.5.0 + """ + #: Graceful shutdown. + NO_ERROR = 0x0 + + #: Protocol error detected. + PROTOCOL_ERROR = 0x1 + + #: Implementation fault. + INTERNAL_ERROR = 0x2 + + #: Flow-control limits exceeded. + FLOW_CONTROL_ERROR = 0x3 + + #: Settings not acknowledged. + SETTINGS_TIMEOUT = 0x4 + + #: Frame received for closed stream. + STREAM_CLOSED = 0x5 + + #: Frame size incorrect. + FRAME_SIZE_ERROR = 0x6 + + #: Stream not processed. + REFUSED_STREAM = 0x7 + + #: Stream cancelled. + CANCEL = 0x8 + + #: Compression state not updated. + COMPRESSION_ERROR = 0x9 + + #: TCP connection error for CONNECT method. + CONNECT_ERROR = 0xa + + #: Processing capacity exceeded. + ENHANCE_YOUR_CALM = 0xb + + #: Negotiated TLS parameters not acceptable. + INADEQUATE_SECURITY = 0xc + + #: Use HTTP/1.1 for the request. + HTTP_1_1_REQUIRED = 0xd + + +def _error_code_from_int(code): + """ + Given an integer error code, returns either one of :class:`ErrorCodes + <h2.errors.ErrorCodes>` or, if not present in the known set of codes, + returns the integer directly. + """ + try: + return ErrorCodes(code) + except ValueError: + return code + + +__all__ = ['ErrorCodes'] diff --git a/src/h2/events.py b/src/h2/events.py new file mode 100644 index 0000000..a06c990 --- /dev/null +++ b/src/h2/events.py @@ -0,0 +1,648 @@ +# -*- coding: utf-8 -*- +""" +h2/events +~~~~~~~~~ + +Defines Event types for HTTP/2. + +Events are returned by the H2 state machine to allow implementations to keep +track of events triggered by receiving data. Each time data is provided to the +H2 state machine it processes the data and returns a list of Event objects. +""" +import binascii + +from .settings import ChangedSetting, _setting_code_from_int + + +class Event(object): + """ + Base class for h2 events. + """ + pass + + +class RequestReceived(Event): + """ + The RequestReceived event is fired whenever request headers are received. + This event carries the HTTP headers for the given request and the stream ID + of the new stream. + + .. versionchanged:: 2.3.0 + Changed the type of ``headers`` to :class:`HeaderTuple + <hpack:hpack.HeaderTuple>`. This has no effect on current users. + + .. versionchanged:: 2.4.0 + Added ``stream_ended`` and ``priority_updated`` properties. + """ + def __init__(self): + #: The Stream ID for the stream this request was made on. + self.stream_id = None + + #: The request headers. + self.headers = None + + #: If this request also ended the stream, the associated + #: :class:`StreamEnded <h2.events.StreamEnded>` event will be available + #: here. + #: + #: .. versionadded:: 2.4.0 + self.stream_ended = None + + #: If this request also had associated priority information, the + #: associated :class:`PriorityUpdated <h2.events.PriorityUpdated>` + #: event will be available here. + #: + #: .. versionadded:: 2.4.0 + self.priority_updated = None + + def __repr__(self): + return "<RequestReceived stream_id:%s, headers:%s>" % ( + self.stream_id, self.headers + ) + + +class ResponseReceived(Event): + """ + The ResponseReceived event is fired whenever response headers are received. + This event carries the HTTP headers for the given response and the stream + ID of the new stream. + + .. versionchanged:: 2.3.0 + Changed the type of ``headers`` to :class:`HeaderTuple + <hpack:hpack.HeaderTuple>`. This has no effect on current users. + + .. versionchanged:: 2.4.0 + Added ``stream_ended`` and ``priority_updated`` properties. + """ + def __init__(self): + #: The Stream ID for the stream this response was made on. + self.stream_id = None + + #: The response headers. + self.headers = None + + #: If this response also ended the stream, the associated + #: :class:`StreamEnded <h2.events.StreamEnded>` event will be available + #: here. + #: + #: .. versionadded:: 2.4.0 + self.stream_ended = None + + #: If this response also had associated priority information, the + #: associated :class:`PriorityUpdated <h2.events.PriorityUpdated>` + #: event will be available here. + #: + #: .. versionadded:: 2.4.0 + self.priority_updated = None + + def __repr__(self): + return "<ResponseReceived stream_id:%s, headers:%s>" % ( + self.stream_id, self.headers + ) + + +class TrailersReceived(Event): + """ + The TrailersReceived event is fired whenever trailers are received on a + stream. Trailers are a set of headers sent after the body of the + request/response, and are used to provide information that wasn't known + ahead of time (e.g. content-length). This event carries the HTTP header + fields that form the trailers and the stream ID of the stream on which they + were received. + + .. versionchanged:: 2.3.0 + Changed the type of ``headers`` to :class:`HeaderTuple + <hpack:hpack.HeaderTuple>`. This has no effect on current users. + + .. versionchanged:: 2.4.0 + Added ``stream_ended`` and ``priority_updated`` properties. + """ + def __init__(self): + #: The Stream ID for the stream on which these trailers were received. + self.stream_id = None + + #: The trailers themselves. + self.headers = None + + #: Trailers always end streams. This property has the associated + #: :class:`StreamEnded <h2.events.StreamEnded>` in it. + #: + #: .. versionadded:: 2.4.0 + self.stream_ended = None + + #: If the trailers also set associated priority information, the + #: associated :class:`PriorityUpdated <h2.events.PriorityUpdated>` + #: event will be available here. + #: + #: .. versionadded:: 2.4.0 + self.priority_updated = None + + def __repr__(self): + return "<TrailersReceived stream_id:%s, headers:%s>" % ( + self.stream_id, self.headers + ) + + +class _HeadersSent(Event): + """ + The _HeadersSent event is fired whenever headers are sent. + + This is an internal event, used to determine validation steps on + outgoing header blocks. + """ + pass + + +class _ResponseSent(_HeadersSent): + """ + The _ResponseSent event is fired whenever response headers are sent + on a stream. + + This is an internal event, used to determine validation steps on + outgoing header blocks. + """ + pass + + +class _RequestSent(_HeadersSent): + """ + The _RequestSent event is fired whenever request headers are sent + on a stream. + + This is an internal event, used to determine validation steps on + outgoing header blocks. + """ + pass + + +class _TrailersSent(_HeadersSent): + """ + The _TrailersSent event is fired whenever trailers are sent on a + stream. Trailers are a set of headers sent after the body of the + request/response, and are used to provide information that wasn't known + ahead of time (e.g. content-length). + + This is an internal event, used to determine validation steps on + outgoing header blocks. + """ + pass + + +class _PushedRequestSent(_HeadersSent): + """ + The _PushedRequestSent event is fired whenever pushed request headers are + sent. + + This is an internal event, used to determine validation steps on outgoing + header blocks. + """ + pass + + +class InformationalResponseReceived(Event): + """ + The InformationalResponseReceived event is fired when an informational + response (that is, one whose status code is a 1XX code) is received from + the remote peer. + + The remote peer may send any number of these, from zero upwards. These + responses are most commonly sent in response to requests that have the + ``expect: 100-continue`` header field present. Most users can safely + ignore this event unless you are intending to use the + ``expect: 100-continue`` flow, or are for any reason expecting a different + 1XX status code. + + .. versionadded:: 2.2.0 + + .. versionchanged:: 2.3.0 + Changed the type of ``headers`` to :class:`HeaderTuple + <hpack:hpack.HeaderTuple>`. This has no effect on current users. + + .. versionchanged:: 2.4.0 + Added ``priority_updated`` property. + """ + def __init__(self): + #: The Stream ID for the stream this informational response was made + #: on. + self.stream_id = None + + #: The headers for this informational response. + self.headers = None + + #: If this response also had associated priority information, the + #: associated :class:`PriorityUpdated <h2.events.PriorityUpdated>` + #: event will be available here. + #: + #: .. versionadded:: 2.4.0 + self.priority_updated = None + + def __repr__(self): + return "<InformationalResponseReceived stream_id:%s, headers:%s>" % ( + self.stream_id, self.headers + ) + + +class DataReceived(Event): + """ + The DataReceived event is fired whenever data is received on a stream from + the remote peer. The event carries the data itself, and the stream ID on + which the data was received. + + .. versionchanged:: 2.4.0 + Added ``stream_ended`` property. + """ + def __init__(self): + #: The Stream ID for the stream this data was received on. + self.stream_id = None + + #: The data itself. + self.data = None + + #: The amount of data received that counts against the flow control + #: window. Note that padding counts against the flow control window, so + #: when adjusting flow control you should always use this field rather + #: than ``len(data)``. + self.flow_controlled_length = None + + #: If this data chunk also completed the stream, the associated + #: :class:`StreamEnded <h2.events.StreamEnded>` event will be available + #: here. + #: + #: .. versionadded:: 2.4.0 + self.stream_ended = None + + def __repr__(self): + return ( + "<DataReceived stream_id:%s, " + "flow_controlled_length:%s, " + "data:%s>" % ( + self.stream_id, + self.flow_controlled_length, + _bytes_representation(self.data[:20]), + ) + ) + + +class WindowUpdated(Event): + """ + The WindowUpdated event is fired whenever a flow control window changes + size. HTTP/2 defines flow control windows for connections and streams: this + event fires for both connections and streams. The event carries the ID of + the stream to which it applies (set to zero if the window update applies to + the connection), and the delta in the window size. + """ + def __init__(self): + #: The Stream ID of the stream whose flow control window was changed. + #: May be ``0`` if the connection window was changed. + self.stream_id = None + + #: The window delta. + self.delta = None + + def __repr__(self): + return "<WindowUpdated stream_id:%s, delta:%s>" % ( + self.stream_id, self.delta + ) + + +class RemoteSettingsChanged(Event): + """ + The RemoteSettingsChanged event is fired whenever the remote peer changes + its settings. It contains a complete inventory of changed settings, + including their previous values. + + In HTTP/2, settings changes need to be acknowledged. hyper-h2 automatically + acknowledges settings changes for efficiency. However, it is possible that + the caller may not be happy with the changed setting. + + When this event is received, the caller should confirm that the new + settings are acceptable. If they are not acceptable, the user should close + the connection with the error code :data:`PROTOCOL_ERROR + <h2.errors.ErrorCodes.PROTOCOL_ERROR>`. + + .. versionchanged:: 2.0.0 + Prior to this version the user needed to acknowledge settings changes. + This is no longer the case: hyper-h2 now automatically acknowledges + them. + """ + def __init__(self): + #: A dictionary of setting byte to + #: :class:`ChangedSetting <h2.settings.ChangedSetting>`, representing + #: the changed settings. + self.changed_settings = {} + + @classmethod + def from_settings(cls, old_settings, new_settings): + """ + Build a RemoteSettingsChanged event from a set of changed settings. + + :param old_settings: A complete collection of old settings, in the form + of a dictionary of ``{setting: value}``. + :param new_settings: All the changed settings and their new values, in + the form of a dictionary of ``{setting: value}``. + """ + e = cls() + for setting, new_value in new_settings.items(): + setting = _setting_code_from_int(setting) + original_value = old_settings.get(setting) + change = ChangedSetting(setting, original_value, new_value) + e.changed_settings[setting] = change + + return e + + def __repr__(self): + return "<RemoteSettingsChanged changed_settings:{%s}>" % ( + ", ".join(repr(cs) for cs in self.changed_settings.values()), + ) + + +class PingReceived(Event): + """ + The PingReceived event is fired whenever a PING is received. It contains + the 'opaque data' of the PING frame. A ping acknowledgment with the same + 'opaque data' is automatically emitted after receiving a ping. + + .. versionadded:: 3.1.0 + """ + def __init__(self): + #: The data included on the ping. + self.ping_data = None + + def __repr__(self): + return "<PingReceived ping_data:%s>" % ( + _bytes_representation(self.ping_data), + ) + + +class PingAcknowledged(Event): + """ + Same as PingAckReceived. + + .. deprecated:: 3.1.0 + """ + def __init__(self): + #: The data included on the ping. + self.ping_data = None + + def __repr__(self): + return "<PingAckReceived ping_data:%s>" % ( + _bytes_representation(self.ping_data), + ) + + +class PingAckReceived(PingAcknowledged): + """ + The PingAckReceived event is fired whenever a PING acknowledgment is + received. It contains the 'opaque data' of the PING+ACK frame, allowing the + user to correlate PINGs and calculate RTT. + + .. versionadded:: 3.1.0 + """ + pass + + +class StreamEnded(Event): + """ + The StreamEnded event is fired whenever a stream is ended by a remote + party. The stream may not be fully closed if it has not been closed + locally, but no further data or headers should be expected on that stream. + """ + def __init__(self): + #: The Stream ID of the stream that was closed. + self.stream_id = None + + def __repr__(self): + return "<StreamEnded stream_id:%s>" % self.stream_id + + +class StreamReset(Event): + """ + The StreamReset event is fired in two situations. The first is when the + remote party forcefully resets the stream. The second is when the remote + party has made a protocol error which only affects a single stream. In this + case, Hyper-h2 will terminate the stream early and return this event. + + .. versionchanged:: 2.0.0 + This event is now fired when Hyper-h2 automatically resets a stream. + """ + def __init__(self): + #: The Stream ID of the stream that was reset. + self.stream_id = None + + #: The error code given. Either one of :class:`ErrorCodes + #: <h2.errors.ErrorCodes>` or ``int`` + self.error_code = None + + #: Whether the remote peer sent a RST_STREAM or we did. + self.remote_reset = True + + def __repr__(self): + return "<StreamReset stream_id:%s, error_code:%s, remote_reset:%s>" % ( + self.stream_id, self.error_code, self.remote_reset + ) + + +class PushedStreamReceived(Event): + """ + The PushedStreamReceived event is fired whenever a pushed stream has been + received from a remote peer. The event carries on it the new stream ID, the + ID of the parent stream, and the request headers pushed by the remote peer. + """ + def __init__(self): + #: The Stream ID of the stream created by the push. + self.pushed_stream_id = None + + #: The Stream ID of the stream that the push is related to. + self.parent_stream_id = None + + #: The request headers, sent by the remote party in the push. + self.headers = None + + def __repr__(self): + return ( + "<PushedStreamReceived pushed_stream_id:%s, parent_stream_id:%s, " + "headers:%s>" % ( + self.pushed_stream_id, + self.parent_stream_id, + self.headers, + ) + ) + + +class SettingsAcknowledged(Event): + """ + The SettingsAcknowledged event is fired whenever a settings ACK is received + from the remote peer. The event carries on it the settings that were + acknowedged, in the same format as + :class:`h2.events.RemoteSettingsChanged`. + """ + def __init__(self): + #: A dictionary of setting byte to + #: :class:`ChangedSetting <h2.settings.ChangedSetting>`, representing + #: the changed settings. + self.changed_settings = {} + + def __repr__(self): + return "<SettingsAcknowledged changed_settings:{%s}>" % ( + ", ".join(repr(cs) for cs in self.changed_settings.values()), + ) + + +class PriorityUpdated(Event): + """ + The PriorityUpdated event is fired whenever a stream sends updated priority + information. This can occur when the stream is opened, or at any time + during the stream lifetime. + + This event is purely advisory, and does not need to be acted on. + + .. versionadded:: 2.0.0 + """ + def __init__(self): + #: The ID of the stream whose priority information is being updated. + self.stream_id = None + + #: The new stream weight. May be the same as the original stream + #: weight. An integer between 1 and 256. + self.weight = None + + #: The stream ID this stream now depends on. May be ``0``. + self.depends_on = None + + #: Whether the stream *exclusively* depends on the parent stream. If it + #: does, this stream should inherit the current children of its new + #: parent. + self.exclusive = None + + def __repr__(self): + return ( + "<PriorityUpdated stream_id:%s, weight:%s, depends_on:%s, " + "exclusive:%s>" % ( + self.stream_id, + self.weight, + self.depends_on, + self.exclusive + ) + ) + + +class ConnectionTerminated(Event): + """ + The ConnectionTerminated event is fired when a connection is torn down by + the remote peer using a GOAWAY frame. Once received, no further action may + be taken on the connection: a new connection must be established. + """ + def __init__(self): + #: The error code cited when tearing down the connection. Should be + #: one of :class:`ErrorCodes <h2.errors.ErrorCodes>`, but may not be if + #: unknown HTTP/2 extensions are being used. + self.error_code = None + + #: The stream ID of the last stream the remote peer saw. This can + #: provide an indication of what data, if any, never reached the remote + #: peer and so can safely be resent. + self.last_stream_id = None + + #: Additional debug data that can be appended to GOAWAY frame. + self.additional_data = None + + def __repr__(self): + return ( + "<ConnectionTerminated error_code:%s, last_stream_id:%s, " + "additional_data:%s>" % ( + self.error_code, + self.last_stream_id, + _bytes_representation( + self.additional_data[:20] + if self.additional_data else None) + ) + ) + + +class AlternativeServiceAvailable(Event): + """ + The AlternativeServiceAvailable event is fired when the remote peer + advertises an `RFC 7838 <https://tools.ietf.org/html/rfc7838>`_ Alternative + Service using an ALTSVC frame. + + This event always carries the origin to which the ALTSVC information + applies. That origin is either supplied by the server directly, or inferred + by hyper-h2 from the ``:authority`` pseudo-header field that was sent by + the user when initiating a given stream. + + This event also carries what RFC 7838 calls the "Alternative Service Field + Value", which is formatted like a HTTP header field and contains the + relevant alternative service information. Hyper-h2 does not parse or in any + way modify that information: the user is required to do that. + + This event can only be fired on the client end of a connection. + + .. versionadded:: 2.3.0 + """ + def __init__(self): + #: The origin to which the alternative service field value applies. + #: This field is either supplied by the server directly, or inferred by + #: hyper-h2 from the ``:authority`` pseudo-header field that was sent + #: by the user when initiating the stream on which the frame was + #: received. + self.origin = None + + #: The ALTSVC field value. This contains information about the HTTP + #: alternative service being advertised by the server. Hyper-h2 does + #: not parse this field: it is left exactly as sent by the server. The + #: structure of the data in this field is given by `RFC 7838 Section 3 + #: <https://tools.ietf.org/html/rfc7838#section-3>`_. + self.field_value = None + + def __repr__(self): + return ( + "<AlternativeServiceAvailable origin:%s, field_value:%s>" % ( + self.origin.decode('utf-8', 'ignore'), + self.field_value.decode('utf-8', 'ignore'), + ) + ) + + +class UnknownFrameReceived(Event): + """ + The UnknownFrameReceived event is fired when the remote peer sends a frame + that hyper-h2 does not understand. This occurs primarily when the remote + peer is employing HTTP/2 extensions that hyper-h2 doesn't know anything + about. + + RFC 7540 requires that HTTP/2 implementations ignore these frames. hyper-h2 + does so. However, this event is fired to allow implementations to perform + special processing on those frames if needed (e.g. if the implementation + is capable of handling the frame itself). + + .. versionadded:: 2.7.0 + """ + def __init__(self): + #: The hyperframe Frame object that encapsulates the received frame. + self.frame = None + + def __repr__(self): + return "<UnknownFrameReceived>" + + +def _bytes_representation(data): + """ + Converts a bytestring into something that is safe to print on all Python + platforms. + + This function is relatively expensive, so it should not be called on the + mainline of the code. It's safe to use in things like object repr methods + though. + """ + if data is None: + return None + + hex = binascii.hexlify(data) + + # This is moderately clever: on all Python versions hexlify returns a byte + # string. On Python 3 we want an actual string, so we just check whether + # that's what we have. + if not isinstance(hex, str): # pragma: no cover + hex = hex.decode('ascii') + + return hex diff --git a/src/h2/exceptions.py b/src/h2/exceptions.py new file mode 100644 index 0000000..388f9e9 --- /dev/null +++ b/src/h2/exceptions.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +""" +h2/exceptions +~~~~~~~~~~~~~ + +Exceptions for the HTTP/2 module. +""" +import h2.errors + + +class H2Error(Exception): + """ + The base class for all exceptions for the HTTP/2 module. + """ + + +class ProtocolError(H2Error): + """ + An action was attempted in violation of the HTTP/2 protocol. + """ + #: The error code corresponds to this kind of Protocol Error. + error_code = h2.errors.ErrorCodes.PROTOCOL_ERROR + + +class FrameTooLargeError(ProtocolError): + """ + The frame that we tried to send or that we received was too large. + """ + #: This error code that corresponds to this kind of Protocol Error. + error_code = h2.errors.ErrorCodes.FRAME_SIZE_ERROR + + +class FrameDataMissingError(ProtocolError): + """ + The frame that we received is missing some data. + + .. versionadded:: 2.0.0 + """ + #: The error code that corresponds to this kind of Protocol Error + error_code = h2.errors.ErrorCodes.FRAME_SIZE_ERROR + + +class TooManyStreamsError(ProtocolError): + """ + An attempt was made to open a stream that would lead to too many concurrent + streams. + """ + pass + + +class FlowControlError(ProtocolError): + """ + An attempted action violates flow control constraints. + """ + #: The error code that corresponds to this kind of + #: :class:`ProtocolError <h2.exceptions.ProtocolError>` + error_code = h2.errors.ErrorCodes.FLOW_CONTROL_ERROR + + +class StreamIDTooLowError(ProtocolError): + """ + An attempt was made to open a stream that had an ID that is lower than the + highest ID we have seen on this connection. + """ + def __init__(self, stream_id, max_stream_id): + #: The ID of the stream that we attempted to open. + self.stream_id = stream_id + + #: The current highest-seen stream ID. + self.max_stream_id = max_stream_id + + def __str__(self): + return "StreamIDTooLowError: %d is lower than %d" % ( + self.stream_id, self.max_stream_id + ) + + +class NoAvailableStreamIDError(ProtocolError): + """ + There are no available stream IDs left to the connection. All stream IDs + have been exhausted. + + .. versionadded:: 2.0.0 + """ + pass + + +class NoSuchStreamError(ProtocolError): + """ + A stream-specific action referenced a stream that does not exist. + + .. versionchanged:: 2.0.0 + Became a subclass of :class:`ProtocolError + <h2.exceptions.ProtocolError>` + """ + def __init__(self, stream_id): + #: The stream ID that corresponds to the non-existent stream. + self.stream_id = stream_id + + +class StreamClosedError(NoSuchStreamError): + """ + A more specific form of + :class:`NoSuchStreamError <h2.exceptions.NoSuchStreamError>`. Indicates + that the stream has since been closed, and that all state relating to that + stream has been removed. + """ + def __init__(self, stream_id): + #: The stream ID that corresponds to the nonexistent stream. + self.stream_id = stream_id + + #: The relevant HTTP/2 error code. + self.error_code = h2.errors.ErrorCodes.STREAM_CLOSED + + # Any events that internal code may need to fire. Not relevant to + # external users that may receive a StreamClosedError. + self._events = [] + + +class InvalidSettingsValueError(ProtocolError, ValueError): + """ + An attempt was made to set an invalid Settings value. + + .. versionadded:: 2.0.0 + """ + def __init__(self, msg, error_code): + super(InvalidSettingsValueError, self).__init__(msg) + self.error_code = error_code + + +class InvalidBodyLengthError(ProtocolError): + """ + The remote peer sent more or less data that the Content-Length header + indicated. + + .. versionadded:: 2.0.0 + """ + def __init__(self, expected, actual): + self.expected_length = expected + self.actual_length = actual + + def __str__(self): + return "InvalidBodyLengthError: Expected %d bytes, received %d" % ( + self.expected_length, self.actual_length + ) + + +class UnsupportedFrameError(ProtocolError, KeyError): + """ + The remote peer sent a frame that is unsupported in this context. + + .. versionadded:: 2.1.0 + """ + # TODO: Remove the KeyError in 3.0.0 + pass + + +class RFC1122Error(H2Error): + """ + Emitted when users attempt to do something that is literally allowed by the + relevant RFC, but is sufficiently ill-defined that it's unwise to allow + users to actually do it. + + While there is some disagreement about whether or not we should be liberal + in what accept, it is a truth universally acknowledged that we should be + conservative in what emit. + + .. versionadded:: 2.4.0 + """ + # shazow says I'm going to regret naming the exception this way. If that + # turns out to be true, TELL HIM NOTHING. + pass + + +class DenialOfServiceError(ProtocolError): + """ + Emitted when the remote peer exhibits a behaviour that is likely to be an + attempt to perform a Denial of Service attack on the implementation. This + is a form of ProtocolError that carries a different error code, and allows + more easy detection of this kind of behaviour. + + .. versionadded:: 2.5.0 + """ + #: The error code that corresponds to this kind of + #: :class:`ProtocolError <h2.exceptions.ProtocolError>` + error_code = h2.errors.ErrorCodes.ENHANCE_YOUR_CALM diff --git a/src/h2/frame_buffer.py b/src/h2/frame_buffer.py new file mode 100644 index 0000000..e79f6ec --- /dev/null +++ b/src/h2/frame_buffer.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +""" +h2/frame_buffer +~~~~~~~~~~~~~~~ + +A data structure that provides a way to iterate over a byte buffer in terms of +frames. +""" +from hyperframe.exceptions import InvalidFrameError +from hyperframe.frame import ( + Frame, HeadersFrame, ContinuationFrame, PushPromiseFrame +) + +from .exceptions import ( + ProtocolError, FrameTooLargeError, FrameDataMissingError +) + +# To avoid a DOS attack based on sending loads of continuation frames, we limit +# the maximum number we're perpared to receive. In this case, we'll set the +# limit to 64, which means the largest encoded header block we can receive by +# default is 262144 bytes long, and the largest possible *at all* is 1073741760 +# bytes long. +# +# This value seems reasonable for now, but in future we may want to evaluate +# making it configurable. +CONTINUATION_BACKLOG = 64 + + +class FrameBuffer(object): + """ + This is a data structure that expects to act as a buffer for HTTP/2 data + that allows iteraton in terms of H2 frames. + """ + def __init__(self, server=False): + self.data = b'' + self.max_frame_size = 0 + self._preamble = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n' if server else b'' + self._preamble_len = len(self._preamble) + self._headers_buffer = [] + + def add_data(self, data): + """ + Add more data to the frame buffer. + + :param data: A bytestring containing the byte buffer. + """ + if self._preamble_len: + data_len = len(data) + of_which_preamble = min(self._preamble_len, data_len) + + if self._preamble[:of_which_preamble] != data[:of_which_preamble]: + raise ProtocolError("Invalid HTTP/2 preamble.") + + data = data[of_which_preamble:] + self._preamble_len -= of_which_preamble + self._preamble = self._preamble[of_which_preamble:] + + self.data += data + + def _parse_frame_header(self, data): + """ + Parses the frame header from the data. Either returns a tuple of + (frame, length), or throws an exception. The returned frame may be None + if the frame is of unknown type. + """ + try: + frame, length = Frame.parse_frame_header(data[:9]) + except ValueError as e: + # The frame header is invalid. This is a ProtocolError + raise ProtocolError("Invalid frame header received: %s" % str(e)) + + return frame, length + + def _validate_frame_length(self, length): + """ + Confirm that the frame is an appropriate length. + """ + if length > self.max_frame_size: + raise FrameTooLargeError( + "Received overlong frame: length %d, max %d" % + (length, self.max_frame_size) + ) + + def _update_header_buffer(self, f): + """ + Updates the internal header buffer. Returns a frame that should replace + the current one. May throw exceptions if this frame is invalid. + """ + # Check if we're in the middle of a headers block. If we are, this + # frame *must* be a CONTINUATION frame with the same stream ID as the + # leading HEADERS or PUSH_PROMISE frame. Anything else is a + # ProtocolError. If the frame *is* valid, append it to the header + # buffer. + if self._headers_buffer: + stream_id = self._headers_buffer[0].stream_id + valid_frame = ( + f is not None and + isinstance(f, ContinuationFrame) and + f.stream_id == stream_id + ) + if not valid_frame: + raise ProtocolError("Invalid frame during header block.") + + # Append the frame to the buffer. + self._headers_buffer.append(f) + if len(self._headers_buffer) > CONTINUATION_BACKLOG: + raise ProtocolError("Too many continuation frames received.") + + # If this is the end of the header block, then we want to build a + # mutant HEADERS frame that's massive. Use the original one we got, + # then set END_HEADERS and set its data appopriately. If it's not + # the end of the block, lose the current frame: we can't yield it. + if 'END_HEADERS' in f.flags: + f = self._headers_buffer[0] + f.flags.add('END_HEADERS') + f.data = b''.join(x.data for x in self._headers_buffer) + self._headers_buffer = [] + else: + f = None + elif (isinstance(f, (HeadersFrame, PushPromiseFrame)) and + 'END_HEADERS' not in f.flags): + # This is the start of a headers block! Save the frame off and then + # act like we didn't receive one. + self._headers_buffer.append(f) + f = None + + return f + + # The methods below support the iterator protocol. + def __iter__(self): + return self + + def next(self): # Python 2 + # First, check that we have enough data to successfully parse the + # next frame header. If not, bail. Otherwise, parse it. + if len(self.data) < 9: + raise StopIteration() + + try: + f, length = self._parse_frame_header(self.data) + except InvalidFrameError: # pragma: no cover + raise ProtocolError("Received frame with invalid frame header.") + + # Next, check that we have enough length to parse the frame body. If + # not, bail, leaving the frame header data in the buffer for next time. + if len(self.data) < length + 9: + raise StopIteration() + + # Confirm the frame has an appropriate length. + self._validate_frame_length(length) + + # Don't try to parse the body if we didn't get a frame we know about: + # there's nothing we can do with it anyway. + if f is not None: + try: + f.parse_body(memoryview(self.data[9:9+length])) + except InvalidFrameError: + raise FrameDataMissingError("Frame data missing or invalid") + + # At this point, as we know we'll use or discard the entire frame, we + # can update the data. + self.data = self.data[9+length:] + + # Pass the frame through the header buffer. + f = self._update_header_buffer(f) + + # If we got a frame we didn't understand or shouldn't yield, rather + # than return None it'd be better if we just tried to get the next + # frame in the sequence instead. Recurse back into ourselves to do + # that. This is safe because the amount of work we have to do here is + # strictly bounded by the length of the buffer. + return f if f is not None else self.next() + + def __next__(self): # Python 3 + return self.next() diff --git a/src/h2/settings.py b/src/h2/settings.py new file mode 100644 index 0000000..bf87c94 --- /dev/null +++ b/src/h2/settings.py @@ -0,0 +1,339 @@ +# -*- coding: utf-8 -*- +""" +h2/settings +~~~~~~~~~~~ + +This module contains a HTTP/2 settings object. This object provides a simple +API for manipulating HTTP/2 settings, keeping track of both the current active +state of the settings and the unacknowledged future values of the settings. +""" +import collections +import enum + +from hyperframe.frame import SettingsFrame + +from h2.errors import ErrorCodes +from h2.exceptions import InvalidSettingsValueError + +try: + from collections.abc import MutableMapping +except ImportError: # pragma: no cover + # Python 2.7 compatibility + from collections import MutableMapping + + +class SettingCodes(enum.IntEnum): + """ + All known HTTP/2 setting codes. + + .. versionadded:: 2.6.0 + """ + + #: Allows the sender to inform the remote endpoint of the maximum size of + #: the header compression table used to decode header blocks, in octets. + HEADER_TABLE_SIZE = SettingsFrame.HEADER_TABLE_SIZE + + #: This setting can be used to disable server push. To disable server push + #: on a client, set this to 0. + ENABLE_PUSH = SettingsFrame.ENABLE_PUSH + + #: Indicates the maximum number of concurrent streams that the sender will + #: allow. + MAX_CONCURRENT_STREAMS = SettingsFrame.MAX_CONCURRENT_STREAMS + + #: Indicates the sender's initial window size (in octets) for stream-level + #: flow control. + INITIAL_WINDOW_SIZE = SettingsFrame.INITIAL_WINDOW_SIZE + + #: Indicates the size of the largest frame payload that the sender is + #: willing to receive, in octets. + MAX_FRAME_SIZE = SettingsFrame.MAX_FRAME_SIZE + + #: This advisory setting informs a peer of the maximum size of header list + #: that the sender is prepared to accept, in octets. The value is based on + #: the uncompressed size of header fields, including the length of the name + #: and value in octets plus an overhead of 32 octets for each header field. + MAX_HEADER_LIST_SIZE = SettingsFrame.MAX_HEADER_LIST_SIZE + + #: This setting can be used to enable the connect protocol. To enable on a + #: client set this to 1. + ENABLE_CONNECT_PROTOCOL = SettingsFrame.ENABLE_CONNECT_PROTOCOL + + +def _setting_code_from_int(code): + """ + Given an integer setting code, returns either one of :class:`SettingCodes + <h2.settings.SettingCodes>` or, if not present in the known set of codes, + returns the integer directly. + """ + try: + return SettingCodes(code) + except ValueError: + return code + + +class ChangedSetting: + + def __init__(self, setting, original_value, new_value): + #: The setting code given. Either one of :class:`SettingCodes + #: <h2.settings.SettingCodes>` or ``int`` + #: + #: .. versionchanged:: 2.6.0 + self.setting = setting + + #: The original value before being changed. + self.original_value = original_value + + #: The new value after being changed. + self.new_value = new_value + + def __repr__(self): + return ( + "ChangedSetting(setting=%s, original_value=%s, " + "new_value=%s)" + ) % ( + self.setting, + self.original_value, + self.new_value + ) + + +class Settings(MutableMapping): + """ + An object that encapsulates HTTP/2 settings state. + + HTTP/2 Settings are a complex beast. Each party, remote and local, has its + own settings and a view of the other party's settings. When a settings + frame is emitted by a peer it cannot assume that the new settings values + are in place until the remote peer acknowledges the setting. In principle, + multiple settings changes can be "in flight" at the same time, all with + different values. + + This object encapsulates this mess. It provides a dict-like interface to + settings, which return the *current* values of the settings in question. + Additionally, it keeps track of the stack of proposed values: each time an + acknowledgement is sent/received, it updates the current values with the + stack of proposed values. On top of all that, it validates the values to + make sure they're allowed, and raises :class:`InvalidSettingsValueError + <h2.exceptions.InvalidSettingsValueError>` if they are not. + + Finally, this object understands what the default values of the HTTP/2 + settings are, and sets those defaults appropriately. + + .. versionchanged:: 2.2.0 + Added the ``initial_values`` parameter. + + .. versionchanged:: 2.5.0 + Added the ``max_header_list_size`` property. + + :param client: (optional) Whether these settings should be defaulted for a + client implementation or a server implementation. Defaults to ``True``. + :type client: ``bool`` + :param initial_values: (optional) Any initial values the user would like + set, rather than RFC 7540's defaults. + :type initial_vales: ``MutableMapping`` + """ + def __init__(self, client=True, initial_values=None): + # Backing object for the settings. This is a dictionary of + # (setting: [list of values]), where the first value in the list is the + # current value of the setting. Strictly this doesn't use lists but + # instead uses collections.deque to avoid repeated memory allocations. + # + # This contains the default values for HTTP/2. + self._settings = { + SettingCodes.HEADER_TABLE_SIZE: collections.deque([4096]), + SettingCodes.ENABLE_PUSH: collections.deque([int(client)]), + SettingCodes.INITIAL_WINDOW_SIZE: collections.deque([65535]), + SettingCodes.MAX_FRAME_SIZE: collections.deque([16384]), + SettingCodes.ENABLE_CONNECT_PROTOCOL: collections.deque([0]), + } + if initial_values is not None: + for key, value in initial_values.items(): + invalid = _validate_setting(key, value) + if invalid: + raise InvalidSettingsValueError( + "Setting %d has invalid value %d" % (key, value), + error_code=invalid + ) + self._settings[key] = collections.deque([value]) + + def acknowledge(self): + """ + The settings have been acknowledged, either by the user (remote + settings) or by the remote peer (local settings). + + :returns: A dict of {setting: ChangedSetting} that were applied. + """ + changed_settings = {} + + # If there is more than one setting in the list, we have a setting + # value outstanding. Update them. + for k, v in self._settings.items(): + if len(v) > 1: + old_setting = v.popleft() + new_setting = v[0] + changed_settings[k] = ChangedSetting( + k, old_setting, new_setting + ) + + return changed_settings + + # Provide easy-access to well known settings. + @property + def header_table_size(self): + """ + The current value of the :data:`HEADER_TABLE_SIZE + <h2.settings.SettingCodes.HEADER_TABLE_SIZE>` setting. + """ + return self[SettingCodes.HEADER_TABLE_SIZE] + + @header_table_size.setter + def header_table_size(self, value): + self[SettingCodes.HEADER_TABLE_SIZE] = value + + @property + def enable_push(self): + """ + The current value of the :data:`ENABLE_PUSH + <h2.settings.SettingCodes.ENABLE_PUSH>` setting. + """ + return self[SettingCodes.ENABLE_PUSH] + + @enable_push.setter + def enable_push(self, value): + self[SettingCodes.ENABLE_PUSH] = value + + @property + def initial_window_size(self): + """ + The current value of the :data:`INITIAL_WINDOW_SIZE + <h2.settings.SettingCodes.INITIAL_WINDOW_SIZE>` setting. + """ + return self[SettingCodes.INITIAL_WINDOW_SIZE] + + @initial_window_size.setter + def initial_window_size(self, value): + self[SettingCodes.INITIAL_WINDOW_SIZE] = value + + @property + def max_frame_size(self): + """ + The current value of the :data:`MAX_FRAME_SIZE + <h2.settings.SettingCodes.MAX_FRAME_SIZE>` setting. + """ + return self[SettingCodes.MAX_FRAME_SIZE] + + @max_frame_size.setter + def max_frame_size(self, value): + self[SettingCodes.MAX_FRAME_SIZE] = value + + @property + def max_concurrent_streams(self): + """ + The current value of the :data:`MAX_CONCURRENT_STREAMS + <h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS>` setting. + """ + return self.get(SettingCodes.MAX_CONCURRENT_STREAMS, 2**32+1) + + @max_concurrent_streams.setter + def max_concurrent_streams(self, value): + self[SettingCodes.MAX_CONCURRENT_STREAMS] = value + + @property + def max_header_list_size(self): + """ + The current value of the :data:`MAX_HEADER_LIST_SIZE + <h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE>` setting. If not set, + returns ``None``, which means unlimited. + + .. versionadded:: 2.5.0 + """ + return self.get(SettingCodes.MAX_HEADER_LIST_SIZE, None) + + @max_header_list_size.setter + def max_header_list_size(self, value): + self[SettingCodes.MAX_HEADER_LIST_SIZE] = value + + @property + def enable_connect_protocol(self): + """ + The current value of the :data:`ENABLE_CONNECT_PROTOCOL + <h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL>` setting. + """ + return self[SettingCodes.ENABLE_CONNECT_PROTOCOL] + + @enable_connect_protocol.setter + def enable_connect_protocol(self, value): + self[SettingCodes.ENABLE_CONNECT_PROTOCOL] = value + + # Implement the MutableMapping API. + def __getitem__(self, key): + val = self._settings[key][0] + + # Things that were created when a setting was received should stay + # KeyError'd. + if val is None: + raise KeyError + + return val + + def __setitem__(self, key, value): + invalid = _validate_setting(key, value) + if invalid: + raise InvalidSettingsValueError( + "Setting %d has invalid value %d" % (key, value), + error_code=invalid + ) + + try: + items = self._settings[key] + except KeyError: + items = collections.deque([None]) + self._settings[key] = items + + items.append(value) + + def __delitem__(self, key): + del self._settings[key] + + def __iter__(self): + return self._settings.__iter__() + + def __len__(self): + return len(self._settings) + + def __eq__(self, other): + if isinstance(other, Settings): + return self._settings == other._settings + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, Settings): + return not self == other + else: + return NotImplemented + + +def _validate_setting(setting, value): # noqa: C901 + """ + Confirms that a specific setting has a well-formed value. If the setting is + invalid, returns an error code. Otherwise, returns 0 (NO_ERROR). + """ + if setting == SettingCodes.ENABLE_PUSH: + if value not in (0, 1): + return ErrorCodes.PROTOCOL_ERROR + elif setting == SettingCodes.INITIAL_WINDOW_SIZE: + if not 0 <= value <= 2147483647: # 2^31 - 1 + return ErrorCodes.FLOW_CONTROL_ERROR + elif setting == SettingCodes.MAX_FRAME_SIZE: + if not 16384 <= value <= 16777215: # 2^14 and 2^24 - 1 + return ErrorCodes.PROTOCOL_ERROR + elif setting == SettingCodes.MAX_HEADER_LIST_SIZE: + if value < 0: + return ErrorCodes.PROTOCOL_ERROR + elif setting == SettingCodes.ENABLE_CONNECT_PROTOCOL: + if value not in (0, 1): + return ErrorCodes.PROTOCOL_ERROR + + return 0 diff --git a/src/h2/stream.py b/src/h2/stream.py new file mode 100644 index 0000000..af3d1cc --- /dev/null +++ b/src/h2/stream.py @@ -0,0 +1,1371 @@ +# -*- coding: utf-8 -*- +""" +h2/stream +~~~~~~~~~ + +An implementation of a HTTP/2 stream. +""" +from enum import Enum, IntEnum +from hpack import HeaderTuple +from hyperframe.frame import ( + HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame, + RstStreamFrame, PushPromiseFrame, AltSvcFrame +) + +from .errors import ErrorCodes, _error_code_from_int +from .events import ( + RequestReceived, ResponseReceived, DataReceived, WindowUpdated, + StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived, + InformationalResponseReceived, AlternativeServiceAvailable, + _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent +) +from .exceptions import ( + ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError +) +from .utilities import ( + guard_increment_window, is_informational_response, authority_from_headers, + validate_headers, validate_outbound_headers, normalize_outbound_headers, + HeaderValidationFlags, extract_method_header, normalize_inbound_headers +) +from .windows import WindowManager + + +class StreamState(IntEnum): + IDLE = 0 + RESERVED_REMOTE = 1 + RESERVED_LOCAL = 2 + OPEN = 3 + HALF_CLOSED_REMOTE = 4 + HALF_CLOSED_LOCAL = 5 + CLOSED = 6 + + +class StreamInputs(Enum): + SEND_HEADERS = 0 + SEND_PUSH_PROMISE = 1 + SEND_RST_STREAM = 2 + SEND_DATA = 3 + SEND_WINDOW_UPDATE = 4 + SEND_END_STREAM = 5 + RECV_HEADERS = 6 + RECV_PUSH_PROMISE = 7 + RECV_RST_STREAM = 8 + RECV_DATA = 9 + RECV_WINDOW_UPDATE = 10 + RECV_END_STREAM = 11 + RECV_CONTINUATION = 12 # Added in 2.0.0 + SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0 + RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0 + SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0 + RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0 + UPGRADE_CLIENT = 17 # Added 2.3.0 + UPGRADE_SERVER = 18 # Added 2.3.0 + + +class StreamClosedBy(Enum): + SEND_END_STREAM = 0 + RECV_END_STREAM = 1 + SEND_RST_STREAM = 2 + RECV_RST_STREAM = 3 + + +# This array is initialized once, and is indexed by the stream states above. +# It indicates whether a stream in the given state is open. The reason we do +# this is that we potentially check whether a stream in a given state is open +# quite frequently: given that we check so often, we should do so in the +# fastest and most performant way possible. +STREAM_OPEN = [False for _ in range(0, len(StreamState))] +STREAM_OPEN[StreamState.OPEN] = True +STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True +STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True + + +class H2StreamStateMachine(object): + """ + A single HTTP/2 stream state machine. + + This stream object implements basically the state machine described in + RFC 7540 section 5.1. + + :param stream_id: The stream ID of this stream. This is stored primarily + for logging purposes. + """ + def __init__(self, stream_id): + self.state = StreamState.IDLE + self.stream_id = stream_id + + #: Whether this peer is the client side of this stream. + self.client = None + + # Whether trailers have been sent/received on this stream or not. + self.headers_sent = None + self.trailers_sent = None + self.headers_received = None + self.trailers_received = None + + # How the stream was closed. One of StreamClosedBy. + self.stream_closed_by = None + + def process_input(self, input_): + """ + Process a specific input in the state machine. + """ + if not isinstance(input_, StreamInputs): + raise ValueError("Input must be an instance of StreamInputs") + + try: + func, target_state = _transitions[(self.state, input_)] + except KeyError: + old_state = self.state + self.state = StreamState.CLOSED + raise ProtocolError( + "Invalid input %s in state %s" % (input_, old_state) + ) + else: + previous_state = self.state + self.state = target_state + if func is not None: + try: + return func(self, previous_state) + except ProtocolError: + self.state = StreamState.CLOSED + raise + except AssertionError as e: # pragma: no cover + self.state = StreamState.CLOSED + raise ProtocolError(e) + + return [] + + def request_sent(self, previous_state): + """ + Fires when a request is sent. + """ + self.client = True + self.headers_sent = True + event = _RequestSent() + + return [event] + + def response_sent(self, previous_state): + """ + Fires when something that should be a response is sent. This 'response' + may actually be trailers. + """ + if not self.headers_sent: + if self.client is True or self.client is None: + raise ProtocolError("Client cannot send responses.") + self.headers_sent = True + event = _ResponseSent() + else: + assert not self.trailers_sent + self.trailers_sent = True + event = _TrailersSent() + + return [event] + + def request_received(self, previous_state): + """ + Fires when a request is received. + """ + assert not self.headers_received + assert not self.trailers_received + + self.client = False + self.headers_received = True + event = RequestReceived() + + event.stream_id = self.stream_id + return [event] + + def response_received(self, previous_state): + """ + Fires when a response is received. Also disambiguates between responses + and trailers. + """ + if not self.headers_received: + assert self.client is True + self.headers_received = True + event = ResponseReceived() + else: + assert not self.trailers_received + self.trailers_received = True + event = TrailersReceived() + + event.stream_id = self.stream_id + return [event] + + def data_received(self, previous_state): + """ + Fires when data is received. + """ + if not self.headers_received: + raise ProtocolError("cannot receive data before headers") + event = DataReceived() + event.stream_id = self.stream_id + return [event] + + def window_updated(self, previous_state): + """ + Fires when a window update frame is received. + """ + event = WindowUpdated() + event.stream_id = self.stream_id + return [event] + + def stream_half_closed(self, previous_state): + """ + Fires when an END_STREAM flag is received in the OPEN state, + transitioning this stream to a HALF_CLOSED_REMOTE state. + """ + event = StreamEnded() + event.stream_id = self.stream_id + return [event] + + def stream_ended(self, previous_state): + """ + Fires when a stream is cleanly ended. + """ + self.stream_closed_by = StreamClosedBy.RECV_END_STREAM + event = StreamEnded() + event.stream_id = self.stream_id + return [event] + + def stream_reset(self, previous_state): + """ + Fired when a stream is forcefully reset. + """ + self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM + event = StreamReset() + event.stream_id = self.stream_id + return [event] + + def send_new_pushed_stream(self, previous_state): + """ + Fires on the newly pushed stream, when pushed by the local peer. + + No event here, but definitionally this peer must be a server. + """ + assert self.client is None + self.client = False + self.headers_received = True + return [] + + def recv_new_pushed_stream(self, previous_state): + """ + Fires on the newly pushed stream, when pushed by the remote peer. + + No event here, but definitionally this peer must be a client. + """ + assert self.client is None + self.client = True + self.headers_sent = True + return [] + + def send_push_promise(self, previous_state): + """ + Fires on the already-existing stream when a PUSH_PROMISE frame is sent. + We may only send PUSH_PROMISE frames if we're a server. + """ + if self.client is True: + raise ProtocolError("Cannot push streams from client peers.") + + event = _PushedRequestSent() + return [event] + + def recv_push_promise(self, previous_state): + """ + Fires on the already-existing stream when a PUSH_PROMISE frame is + received. We may only receive PUSH_PROMISE frames if we're a client. + + Fires a PushedStreamReceived event. + """ + if not self.client: + if self.client is None: # pragma: no cover + msg = "Idle streams cannot receive pushes" + else: # pragma: no cover + msg = "Cannot receive pushed streams as a server" + raise ProtocolError(msg) + + event = PushedStreamReceived() + event.parent_stream_id = self.stream_id + return [event] + + def send_end_stream(self, previous_state): + """ + Called when an attempt is made to send END_STREAM in the + HALF_CLOSED_REMOTE state. + """ + self.stream_closed_by = StreamClosedBy.SEND_END_STREAM + + def send_reset_stream(self, previous_state): + """ + Called when an attempt is made to send RST_STREAM in a non-closed + stream state. + """ + self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM + + def reset_stream_on_error(self, previous_state): + """ + Called when we need to forcefully emit another RST_STREAM frame on + behalf of the state machine. + + If this is the first time we've done this, we should also hang an event + off the StreamClosedError so that the user can be informed. We know + it's the first time we've done this if the stream is currently in a + state other than CLOSED. + """ + self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM + + error = StreamClosedError(self.stream_id) + + event = StreamReset() + event.stream_id = self.stream_id + event.error_code = ErrorCodes.STREAM_CLOSED + event.remote_reset = False + error._events = [event] + raise error + + def recv_on_closed_stream(self, previous_state): + """ + Called when an unexpected frame is received on an already-closed + stream. + + An endpoint that receives an unexpected frame should treat it as + a stream error or connection error with type STREAM_CLOSED, depending + on the specific frame. The error handling is done at a higher level: + this just raises the appropriate error. + """ + raise StreamClosedError(self.stream_id) + + def send_on_closed_stream(self, previous_state): + """ + Called when an attempt is made to send data on an already-closed + stream. + + This essentially overrides the standard logic by throwing a + more-specific error: StreamClosedError. This is a ProtocolError, so it + matches the standard API of the state machine, but provides more detail + to the user. + """ + raise StreamClosedError(self.stream_id) + + def recv_push_on_closed_stream(self, previous_state): + """ + Called when a PUSH_PROMISE frame is received on a full stop + stream. + + If the stream was closed by us sending a RST_STREAM frame, then we + presume that the PUSH_PROMISE was in flight when we reset the parent + stream. Rathen than accept the new stream, we just reset it. + Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a + naturally closed stream is a real problem because it creates a brand + new stream that the remote peer now believes exists. + """ + assert self.stream_closed_by is not None + + if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM: + raise StreamClosedError(self.stream_id) + else: + raise ProtocolError("Attempted to push on closed stream.") + + def send_push_on_closed_stream(self, previous_state): + """ + Called when an attempt is made to push on an already-closed stream. + + This essentially overrides the standard logic by providing a more + useful error message. It's necessary because simply indicating that the + stream is closed is not enough: there is now a new stream that is not + allowed to be there. The only recourse is to tear the whole connection + down. + """ + raise ProtocolError("Attempted to push on closed stream.") + + def send_informational_response(self, previous_state): + """ + Called when an informational header block is sent (that is, a block + where the :status header has a 1XX value). + + Only enforces that these are sent *before* final headers are sent. + """ + if self.headers_sent: + raise ProtocolError("Information response after final response") + + event = _ResponseSent() + return [event] + + def recv_informational_response(self, previous_state): + """ + Called when an informational header block is received (that is, a block + where the :status header has a 1XX value). + """ + if self.headers_received: + raise ProtocolError("Informational response after final response") + + event = InformationalResponseReceived() + event.stream_id = self.stream_id + return [event] + + def recv_alt_svc(self, previous_state): + """ + Called when receiving an ALTSVC frame. + + RFC 7838 allows us to receive ALTSVC frames at any stream state, which + is really absurdly overzealous. For that reason, we want to limit the + states in which we can actually receive it. It's really only sensible + to receive it after we've sent our own headers and before the server + has sent its header block: the server can't guarantee that we have any + state around after it completes its header block, and the server + doesn't know what origin we're talking about before we've sent ours. + + For that reason, this function applies a few extra checks on both state + and some of the little state variables we keep around. If those suggest + an unreasonable situation for the ALTSVC frame to have been sent in, + we quietly ignore it (as RFC 7838 suggests). + + This function is also *not* always called by the state machine. In some + states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it, + because we know the frame cannot be valid in that state (IDLE because + the server cannot know what origin the stream applies to, CLOSED + because the server cannot assume we still have state around, + RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL + state then *we* are the server). + """ + # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore + # them. + if self.client is False: + return [] + + # If we've received the response headers from the server they can't + # guarantee we still have any state around. Other implementations + # (like nghttp2) ignore ALTSVC in this state, so we will too. + if self.headers_received: + return [] + + # Otherwise, this is a sensible enough frame to have received. Return + # the event and let it get populated. + return [AlternativeServiceAvailable()] + + def send_alt_svc(self, previous_state): + """ + Called when sending an ALTSVC frame on this stream. + + For consistency with the restrictions we apply on receiving ALTSVC + frames in ``recv_alt_svc``, we want to restrict when users can send + ALTSVC frames to the situations when we ourselves would accept them. + + That means: when we are a server, when we have received the request + headers, and when we have not yet sent our own response headers. + """ + # We should not send ALTSVC after we've sent response headers, as the + # client may have disposed of its state. + if self.headers_sent: + raise ProtocolError( + "Cannot send ALTSVC after sending response headers." + ) + + return + + +# STATE MACHINE +# +# The stream state machine is defined here to avoid the need to allocate it +# repeatedly for each stream. It cannot be defined in the stream class because +# it needs to be able to reference the callbacks defined on the class, but +# because Python's scoping rules are weird the class object is not actually in +# scope during the body of the class object. +# +# For the sake of clarity, we reproduce the RFC 7540 state machine here: +# +# +--------+ +# send PP | | recv PP +# ,--------| idle |--------. +# / | | \ +# v +--------+ v +# +----------+ | +----------+ +# | | | send H / | | +# ,------| reserved | | recv H | reserved |------. +# | | (local) | | | (remote) | | +# | +----------+ v +----------+ | +# | | +--------+ | | +# | | recv ES | | send ES | | +# | send H | ,-------| open |-------. | recv H | +# | | / | | \ | | +# | v v +--------+ v v | +# | +----------+ | +----------+ | +# | | half | | | half | | +# | | closed | | send R / | closed | | +# | | (remote) | | recv R | (local) | | +# | +----------+ | +----------+ | +# | | | | | +# | | send ES / | recv ES / | | +# | | send R / v send R / | | +# | | recv R +--------+ recv R | | +# | send R / `----------->| |<-----------' send R / | +# | recv R | closed | recv R | +# `----------------------->| |<----------------------' +# +--------+ +# +# send: endpoint sends this frame +# recv: endpoint receives this frame +# +# H: HEADERS frame (with implied CONTINUATIONs) +# PP: PUSH_PROMISE frame (with implied CONTINUATIONs) +# ES: END_STREAM flag +# R: RST_STREAM frame +# +# For the purposes of this state machine we treat HEADERS and their +# associated CONTINUATION frames as a single jumbo frame. The protocol +# allows/requires this by preventing other frames from being interleved in +# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is +# received without a prior HEADERS frame, it *will* be passed to this state +# machine. The state machine should always reject that frame, either as an +# invalid transition or because the stream is closed. +# +# There is a confusing relationship around PUSH_PROMISE frames. The state +# machine above considers them to be frames belonging to the new stream, +# which is *somewhat* true. However, they are sent with the stream ID of +# their related stream, and are only sendable in some cases. +# For this reason, our state machine implementation below allows for +# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also +# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states. +# Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on +# two streams. +# +# The _transitions dictionary contains a mapping of tuples of +# (state, input) to tuples of (side_effect_function, end_state). This +# map contains all allowed transitions: anything not in this map is +# invalid and immediately causes a transition to ``closed``. +_transitions = { + # State: idle + (StreamState.IDLE, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.request_sent, StreamState.OPEN), + (StreamState.IDLE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.request_received, StreamState.OPEN), + (StreamState.IDLE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_new_pushed_stream, + StreamState.RESERVED_LOCAL), + (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_new_pushed_stream, + StreamState.RESERVED_REMOTE), + (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.IDLE), + (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT): + (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL), + (StreamState.IDLE, StreamInputs.UPGRADE_SERVER): + (H2StreamStateMachine.request_received, + StreamState.HALF_CLOSED_REMOTE), + + # State: reserved local + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.RESERVED_LOCAL), + + # State: reserved remote + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.RESERVED_REMOTE), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE), + (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE), + + # State: open + (StreamState.OPEN, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_DATA): + (None, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_DATA): + (H2StreamStateMachine.data_received, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_END_STREAM): + (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.OPEN, StreamInputs.RECV_END_STREAM): + (H2StreamStateMachine.stream_half_closed, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_promise, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_promise, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.send_informational_response, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.recv_informational_response, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN), + + # State: half-closed remote + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA): + (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): + (H2StreamStateMachine.send_end_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_promise, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.send_informational_response, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE), + + # State: half-closed local + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA): + (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): + (H2StreamStateMachine.stream_ended, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_promise, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.recv_informational_response, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL), + + # State: closed + (StreamState.CLOSED, StreamInputs.RECV_END_STREAM): + (None, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.CLOSED), + + # RFC 7540 Section 5.1 defines how the end point should react when + # receiving a frame on a closed stream with the following statements: + # + # > An endpoint that receives any frame other than PRIORITY after receiving + # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED. + # > An endpoint that receives any frames after receiving a frame with the + # > END_STREAM flag set MUST treat that as a connection error of type + # > STREAM_CLOSED. + (StreamState.CLOSED, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_DATA): + (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), + + # > WINDOW_UPDATE or RST_STREAM frames can be received in this state + # > for a short period after a DATA or HEADERS frame containing a + # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1. But we + # > don't have access to a clock so we just always allow it. + (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE): + (None, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM): + (None, StreamState.CLOSED), + + # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is + # > neither "open" nor "half-closed (local)" as a connection error of type + # > PROTOCOL_ERROR. + (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED), + + # Also, users should be forbidden from sending on closed streams. + (StreamState.CLOSED, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_DATA): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_END_STREAM): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), +} + + +class H2Stream(object): + """ + A low-level HTTP/2 stream object. This handles building and receiving + frames and maintains per-stream state. + + This wraps a HTTP/2 Stream state machine implementation, ensuring that + frames can only be sent/received when the stream is in a valid state. + Attempts to create frames that cannot be sent will raise a + ``ProtocolError``. + """ + def __init__(self, + stream_id, + config, + inbound_window_size, + outbound_window_size): + self.state_machine = H2StreamStateMachine(stream_id) + self.stream_id = stream_id + self.max_outbound_frame_size = None + self.request_method = None + + # The current value of the outbound stream flow control window + self.outbound_flow_control_window = outbound_window_size + + # The flow control manager. + self._inbound_window_manager = WindowManager(inbound_window_size) + + # The expected content length, if any. + self._expected_content_length = None + + # The actual received content length. Always tracked. + self._actual_content_length = 0 + + # The authority we believe this stream belongs to. + self._authority = None + + # The configuration for this stream. + self.config = config + + def __repr__(self): + return "<%s id:%d state:%r>" % ( + type(self).__name__, + self.stream_id, + self.state_machine.state + ) + + @property + def inbound_flow_control_window(self): + """ + The size of the inbound flow control window for the stream. This is + rarely publicly useful: instead, use :meth:`remote_flow_control_window + <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is + largely present to provide a shortcut to this data. + """ + return self._inbound_window_manager.current_window_size + + @property + def open(self): + """ + Whether the stream is 'open' in any sense: that is, whether it counts + against the number of concurrent streams. + """ + # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either + # the OPEN state or either of the HALF_CLOSED states. Perplexingly, + # this excludes the reserved states. + # For more detail on why we're doing this in this slightly weird way, + # see the comment on ``STREAM_OPEN`` at the top of the file. + return STREAM_OPEN[self.state_machine.state] + + @property + def closed(self): + """ + Whether the stream is closed. + """ + return self.state_machine.state == StreamState.CLOSED + + @property + def closed_by(self): + """ + Returns how the stream was closed, as one of StreamClosedBy. + """ + return self.state_machine.stream_closed_by + + def upgrade(self, client_side): + """ + Called by the connection to indicate that this stream is the initial + request/response of an upgraded connection. Places the stream into an + appropriate state. + """ + self.config.logger.debug("Upgrading %r", self) + + assert self.stream_id == 1 + input_ = ( + StreamInputs.UPGRADE_CLIENT if client_side + else StreamInputs.UPGRADE_SERVER + ) + + # This may return events, we deliberately don't want them. + self.state_machine.process_input(input_) + return + + def send_headers(self, headers, encoder, end_stream=False): + """ + Returns a list of HEADERS/CONTINUATION frames to emit as either headers + or trailers. + """ + self.config.logger.debug("Send headers %s on %r", headers, self) + + # Because encoding headers makes an irreversible change to the header + # compression context, we make the state transition before we encode + # them. + + # First, check if we're a client. If we are, no problem: if we aren't, + # we need to scan the header block to see if this is an informational + # response. + input_ = StreamInputs.SEND_HEADERS + if ((not self.state_machine.client) and + is_informational_response(headers)): + if end_stream: + raise ProtocolError( + "Cannot set END_STREAM on informational responses." + ) + + input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS + + events = self.state_machine.process_input(input_) + + hf = HeadersFrame(self.stream_id) + hdr_validation_flags = self._build_hdr_validation_flags(events) + frames = self._build_headers_frames( + headers, encoder, hf, hdr_validation_flags + ) + + if end_stream: + # Not a bug: the END_STREAM flag is valid on the initial HEADERS + # frame, not the CONTINUATION frames that follow. + self.state_machine.process_input(StreamInputs.SEND_END_STREAM) + frames[0].flags.add('END_STREAM') + + if self.state_machine.trailers_sent and not end_stream: + raise ProtocolError("Trailers must have END_STREAM set.") + + if self.state_machine.client and self._authority is None: + self._authority = authority_from_headers(headers) + + # store request method for _initialize_content_length + self.request_method = extract_method_header(headers) + + return frames + + def push_stream_in_band(self, related_stream_id, headers, encoder): + """ + Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed + stream header. Called on the stream that has the PUSH_PROMISE frame + sent on it. + """ + self.config.logger.debug("Push stream %r", self) + + # Because encoding headers makes an irreversible change to the header + # compression context, we make the state transition *first*. + + events = self.state_machine.process_input( + StreamInputs.SEND_PUSH_PROMISE + ) + + ppf = PushPromiseFrame(self.stream_id) + ppf.promised_stream_id = related_stream_id + hdr_validation_flags = self._build_hdr_validation_flags(events) + frames = self._build_headers_frames( + headers, encoder, ppf, hdr_validation_flags + ) + + return frames + + def locally_pushed(self): + """ + Mark this stream as one that was pushed by this peer. Must be called + immediately after initialization. Sends no frames, simply updates the + state machine. + """ + # This does not trigger any events. + events = self.state_machine.process_input( + StreamInputs.SEND_PUSH_PROMISE + ) + assert not events + return [] + + def send_data(self, data, end_stream=False, pad_length=None): + """ + Prepare some data frames. Optionally end the stream. + + .. warning:: Does not perform flow control checks. + """ + self.config.logger.debug( + "Send data on %r with end stream set to %s", self, end_stream + ) + + self.state_machine.process_input(StreamInputs.SEND_DATA) + + df = DataFrame(self.stream_id) + df.data = data + if end_stream: + self.state_machine.process_input(StreamInputs.SEND_END_STREAM) + df.flags.add('END_STREAM') + if pad_length is not None: + df.flags.add('PADDED') + df.pad_length = pad_length + + # Subtract flow_controlled_length to account for possible padding + self.outbound_flow_control_window -= df.flow_controlled_length + assert self.outbound_flow_control_window >= 0 + + return [df] + + def end_stream(self): + """ + End a stream without sending data. + """ + self.config.logger.debug("End stream %r", self) + + self.state_machine.process_input(StreamInputs.SEND_END_STREAM) + df = DataFrame(self.stream_id) + df.flags.add('END_STREAM') + return [df] + + def advertise_alternative_service(self, field_value): + """ + Advertise an RFC 7838 alternative service. The semantics of this are + better documented in the ``H2Connection`` class. + """ + self.config.logger.debug( + "Advertise alternative service of %r for %r", field_value, self + ) + self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE) + asf = AltSvcFrame(self.stream_id) + asf.field = field_value + return [asf] + + def increase_flow_control_window(self, increment): + """ + Increase the size of the flow control window for the remote side. + """ + self.config.logger.debug( + "Increase flow control window for %r by %d", + self, increment + ) + self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE) + self._inbound_window_manager.window_opened(increment) + + wuf = WindowUpdateFrame(self.stream_id) + wuf.window_increment = increment + return [wuf] + + def receive_push_promise_in_band(self, + promised_stream_id, + headers, + header_encoding): + """ + Receives a push promise frame sent on this stream, pushing a remote + stream. This is called on the stream that has the PUSH_PROMISE sent + on it. + """ + self.config.logger.debug( + "Receive Push Promise on %r for remote stream %d", + self, promised_stream_id + ) + events = self.state_machine.process_input( + StreamInputs.RECV_PUSH_PROMISE + ) + events[0].pushed_stream_id = promised_stream_id + + hdr_validation_flags = self._build_hdr_validation_flags(events) + events[0].headers = self._process_received_headers( + headers, hdr_validation_flags, header_encoding + ) + return [], events + + def remotely_pushed(self, pushed_headers): + """ + Mark this stream as one that was pushed by the remote peer. Must be + called immediately after initialization. Sends no frames, simply + updates the state machine. + """ + self.config.logger.debug("%r pushed by remote peer", self) + events = self.state_machine.process_input( + StreamInputs.RECV_PUSH_PROMISE + ) + self._authority = authority_from_headers(pushed_headers) + return [], events + + def receive_headers(self, headers, end_stream, header_encoding): + """ + Receive a set of headers (or trailers). + """ + if is_informational_response(headers): + if end_stream: + raise ProtocolError( + "Cannot set END_STREAM on informational responses" + ) + input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS + else: + input_ = StreamInputs.RECV_HEADERS + + events = self.state_machine.process_input(input_) + + if end_stream: + es_events = self.state_machine.process_input( + StreamInputs.RECV_END_STREAM + ) + events[0].stream_ended = es_events[0] + events += es_events + + self._initialize_content_length(headers) + + if isinstance(events[0], TrailersReceived): + if not end_stream: + raise ProtocolError("Trailers must have END_STREAM set") + + hdr_validation_flags = self._build_hdr_validation_flags(events) + events[0].headers = self._process_received_headers( + headers, hdr_validation_flags, header_encoding + ) + return [], events + + def receive_data(self, data, end_stream, flow_control_len): + """ + Receive some data. + """ + self.config.logger.debug( + "Receive data on %r with end stream %s and flow control length " + "set to %d", self, end_stream, flow_control_len + ) + events = self.state_machine.process_input(StreamInputs.RECV_DATA) + self._inbound_window_manager.window_consumed(flow_control_len) + self._track_content_length(len(data), end_stream) + + if end_stream: + es_events = self.state_machine.process_input( + StreamInputs.RECV_END_STREAM + ) + events[0].stream_ended = es_events[0] + events.extend(es_events) + + events[0].data = data + events[0].flow_controlled_length = flow_control_len + return [], events + + def receive_window_update(self, increment): + """ + Handle a WINDOW_UPDATE increment. + """ + self.config.logger.debug( + "Receive Window Update on %r for increment of %d", + self, increment + ) + events = self.state_machine.process_input( + StreamInputs.RECV_WINDOW_UPDATE + ) + frames = [] + + # If we encounter a problem with incrementing the flow control window, + # this should be treated as a *stream* error, not a *connection* error. + # That means we need to catch the error and forcibly close the stream. + if events: + events[0].delta = increment + try: + self.outbound_flow_control_window = guard_increment_window( + self.outbound_flow_control_window, + increment + ) + except FlowControlError: + # Ok, this is bad. We're going to need to perform a local + # reset. + event = StreamReset() + event.stream_id = self.stream_id + event.error_code = ErrorCodes.FLOW_CONTROL_ERROR + event.remote_reset = False + + events = [event] + frames = self.reset_stream(event.error_code) + + return frames, events + + def receive_continuation(self): + """ + A naked CONTINUATION frame has been received. This is always an error, + but the type of error it is depends on the state of the stream and must + transition the state of the stream, so we need to handle it. + """ + self.config.logger.debug("Receive Continuation frame on %r", self) + self.state_machine.process_input( + StreamInputs.RECV_CONTINUATION + ) + assert False, "Should not be reachable" + + def receive_alt_svc(self, frame): + """ + An Alternative Service frame was received on the stream. This frame + inherits the origin associated with this stream. + """ + self.config.logger.debug( + "Receive Alternative Service frame on stream %r", self + ) + + # If the origin is present, RFC 7838 says we have to ignore it. + if frame.origin: + return [], [] + + events = self.state_machine.process_input( + StreamInputs.RECV_ALTERNATIVE_SERVICE + ) + + # There are lots of situations where we want to ignore the ALTSVC + # frame. If we need to pay attention, we'll have an event and should + # fill it out. + if events: + assert isinstance(events[0], AlternativeServiceAvailable) + events[0].origin = self._authority + events[0].field_value = frame.field + + return [], events + + def reset_stream(self, error_code=0): + """ + Close the stream locally. Reset the stream with an error code. + """ + self.config.logger.debug( + "Local reset %r with error code: %d", self, error_code + ) + self.state_machine.process_input(StreamInputs.SEND_RST_STREAM) + + rsf = RstStreamFrame(self.stream_id) + rsf.error_code = error_code + return [rsf] + + def stream_reset(self, frame): + """ + Handle a stream being reset remotely. + """ + self.config.logger.debug( + "Remote reset %r with error code: %d", self, frame.error_code + ) + events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM) + + if events: + # We don't fire an event if this stream is already closed. + events[0].error_code = _error_code_from_int(frame.error_code) + + return [], events + + def acknowledge_received_data(self, acknowledged_size): + """ + The user has informed us that they've processed some amount of data + that was received on this stream. Pass that to the window manager and + potentially return some WindowUpdate frames. + """ + self.config.logger.debug( + "Acknowledge received data with size %d on %r", + acknowledged_size, self + ) + increment = self._inbound_window_manager.process_bytes( + acknowledged_size + ) + if increment: + f = WindowUpdateFrame(self.stream_id) + f.window_increment = increment + return [f] + + return [] + + def _build_hdr_validation_flags(self, events): + """ + Constructs a set of header validation flags for use when normalizing + and validating header blocks. + """ + is_trailer = isinstance( + events[0], (_TrailersSent, TrailersReceived) + ) + is_response_header = isinstance( + events[0], + ( + _ResponseSent, + ResponseReceived, + InformationalResponseReceived + ) + ) + is_push_promise = isinstance( + events[0], (PushedStreamReceived, _PushedRequestSent) + ) + + return HeaderValidationFlags( + is_client=self.state_machine.client, + is_trailer=is_trailer, + is_response_header=is_response_header, + is_push_promise=is_push_promise, + ) + + def _build_headers_frames(self, + headers, + encoder, + first_frame, + hdr_validation_flags): + """ + Helper method to build headers or push promise frames. + """ + # We need to lowercase the header names, and to ensure that secure + # header fields are kept out of compression contexts. + if self.config.normalize_outbound_headers: + headers = normalize_outbound_headers( + headers, hdr_validation_flags + ) + if self.config.validate_outbound_headers: + headers = validate_outbound_headers( + headers, hdr_validation_flags + ) + + encoded_headers = encoder.encode(headers) + + # Slice into blocks of max_outbound_frame_size. Be careful with this: + # it only works right because we never send padded frames or priority + # information on the frames. Revisit this if we do. + header_blocks = [ + encoded_headers[i:i+self.max_outbound_frame_size] + for i in range( + 0, len(encoded_headers), self.max_outbound_frame_size + ) + ] + + frames = [] + first_frame.data = header_blocks[0] + frames.append(first_frame) + + for block in header_blocks[1:]: + cf = ContinuationFrame(self.stream_id) + cf.data = block + frames.append(cf) + + frames[-1].flags.add('END_HEADERS') + return frames + + def _process_received_headers(self, + headers, + header_validation_flags, + header_encoding): + """ + When headers have been received from the remote peer, run a processing + pipeline on them to transform them into the appropriate form for + attaching to an event. + """ + if self.config.normalize_inbound_headers: + headers = normalize_inbound_headers( + headers, header_validation_flags + ) + + if self.config.validate_inbound_headers: + headers = validate_headers(headers, header_validation_flags) + + if header_encoding: + headers = _decode_headers(headers, header_encoding) + + # The above steps are all generators, so we need to concretize the + # headers now. + return list(headers) + + def _initialize_content_length(self, headers): + """ + Checks the headers for a content-length header and initializes the + _expected_content_length field from it. It's not an error for no + Content-Length header to be present. + """ + if self.request_method == b'HEAD': + self._expected_content_length = 0 + return + + for n, v in headers: + if n == b'content-length': + try: + self._expected_content_length = int(v, 10) + except ValueError: + raise ProtocolError( + "Invalid content-length header: %s" % v + ) + + return + + def _track_content_length(self, length, end_stream): + """ + Update the expected content length in response to data being received. + Validates that the appropriate amount of data is sent. Always updates + the received data, but only validates the length against the + content-length header if one was sent. + + :param length: The length of the body chunk received. + :param end_stream: If this is the last body chunk received. + """ + self._actual_content_length += length + actual = self._actual_content_length + expected = self._expected_content_length + + if expected is not None: + if expected < actual: + raise InvalidBodyLengthError(expected, actual) + + if end_stream and expected != actual: + raise InvalidBodyLengthError(expected, actual) + + def _inbound_flow_control_change_from_settings(self, delta): + """ + We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to + update the target window size for flow control. For our flow control + strategy, this means we need to do two things: we need to adjust the + current window size, but we also need to set the target maximum window + size to the new value. + """ + new_max_size = self._inbound_window_manager.max_window_size + delta + self._inbound_window_manager.window_opened(delta) + self._inbound_window_manager.max_window_size = new_max_size + + +def _decode_headers(headers, encoding): + """ + Given an iterable of header two-tuples and an encoding, decodes those + headers using that encoding while preserving the type of the header tuple. + This ensures that the use of ``HeaderTuple`` is preserved. + """ + for header in headers: + # This function expects to work on decoded headers, which are always + # HeaderTuple objects. + assert isinstance(header, HeaderTuple) + + name, value = header + name = name.decode(encoding) + value = value.decode(encoding) + yield header.__class__(name, value) diff --git a/src/h2/utilities.py b/src/h2/utilities.py new file mode 100644 index 0000000..06c916e --- /dev/null +++ b/src/h2/utilities.py @@ -0,0 +1,660 @@ +# -*- coding: utf-8 -*- +""" +h2/utilities +~~~~~~~~~~~~ + +Utility functions that do not belong in a separate module. +""" +import collections +import re +from string import whitespace +import sys + +from hpack import HeaderTuple, NeverIndexedHeaderTuple + +from .exceptions import ProtocolError, FlowControlError + +UPPER_RE = re.compile(b"[A-Z]") + +# A set of headers that are hop-by-hop or connection-specific and thus +# forbidden in HTTP/2. This list comes from RFC 7540 § 8.1.2.2. +CONNECTION_HEADERS = frozenset([ + b'connection', u'connection', + b'proxy-connection', u'proxy-connection', + b'keep-alive', u'keep-alive', + b'transfer-encoding', u'transfer-encoding', + b'upgrade', u'upgrade', +]) + + +_ALLOWED_PSEUDO_HEADER_FIELDS = frozenset([ + b':method', u':method', + b':scheme', u':scheme', + b':authority', u':authority', + b':path', u':path', + b':status', u':status', + b':protocol', u':protocol', +]) + + +_SECURE_HEADERS = frozenset([ + # May have basic credentials which are vulnerable to dictionary attacks. + b'authorization', u'authorization', + b'proxy-authorization', u'proxy-authorization', +]) + + +_REQUEST_ONLY_HEADERS = frozenset([ + b':scheme', u':scheme', + b':path', u':path', + b':authority', u':authority', + b':method', u':method', + b':protocol', u':protocol', +]) + + +_RESPONSE_ONLY_HEADERS = frozenset([b':status', u':status']) + + +# A Set of pseudo headers that are only valid if the method is +# CONNECT, see RFC 8441 § 5 +_CONNECT_REQUEST_ONLY_HEADERS = frozenset([b':protocol', u':protocol']) + + +if sys.version_info[0] == 2: # Python 2.X + _WHITESPACE = frozenset(whitespace) +else: # Python 3.3+ + _WHITESPACE = frozenset(map(ord, whitespace)) + + +def _secure_headers(headers, hdr_validation_flags): + """ + Certain headers are at risk of being attacked during the header compression + phase, and so need to be kept out of header compression contexts. This + function automatically transforms certain specific headers into HPACK + never-indexed fields to ensure they don't get added to header compression + contexts. + + This function currently implements two rules: + + - 'authorization' and 'proxy-authorization' fields are automatically made + never-indexed. + - Any 'cookie' header field shorter than 20 bytes long is made + never-indexed. + + These fields are the most at-risk. These rules are inspired by Firefox + and nghttp2. + """ + for header in headers: + if header[0] in _SECURE_HEADERS: + yield NeverIndexedHeaderTuple(*header) + elif header[0] in (b'cookie', u'cookie') and len(header[1]) < 20: + yield NeverIndexedHeaderTuple(*header) + else: + yield header + + +def extract_method_header(headers): + """ + Extracts the request method from the headers list. + """ + for k, v in headers: + if k in (b':method', u':method'): + if not isinstance(v, bytes): + return v.encode('utf-8') + else: + return v + + +def is_informational_response(headers): + """ + Searches a header block for a :status header to confirm that a given + collection of headers are an informational response. Assumes the header + block is well formed: that is, that the HTTP/2 special headers are first + in the block, and so that it can stop looking when it finds the first + header field whose name does not begin with a colon. + + :param headers: The HTTP/2 header block. + :returns: A boolean indicating if this is an informational response. + """ + for n, v in headers: + if isinstance(n, bytes): + sigil = b':' + status = b':status' + informational_start = b'1' + else: + sigil = u':' + status = u':status' + informational_start = u'1' + + # If we find a non-special header, we're done here: stop looping. + if not n.startswith(sigil): + return False + + # This isn't the status header, bail. + if n != status: + continue + + # If the first digit is a 1, we've got informational headers. + return v.startswith(informational_start) + + +def guard_increment_window(current, increment): + """ + Increments a flow control window, guarding against that window becoming too + large. + + :param current: The current value of the flow control window. + :param increment: The increment to apply to that window. + :returns: The new value of the window. + :raises: ``FlowControlError`` + """ + # The largest value the flow control window may take. + LARGEST_FLOW_CONTROL_WINDOW = 2**31 - 1 + + new_size = current + increment + + if new_size > LARGEST_FLOW_CONTROL_WINDOW: + raise FlowControlError( + "May not increment flow control window past %d" % + LARGEST_FLOW_CONTROL_WINDOW + ) + + return new_size + + +def authority_from_headers(headers): + """ + Given a header set, searches for the authority header and returns the + value. + + Note that this doesn't terminate early, so should only be called if the + headers are for a client request. Otherwise, will loop over the entire + header set, which is potentially unwise. + + :param headers: The HTTP header set. + :returns: The value of the authority header, or ``None``. + :rtype: ``bytes`` or ``None``. + """ + for n, v in headers: + # This gets run against headers that come both from HPACK and from the + # user, so we may have unicode floating around in here. We only want + # bytes. + if n in (b':authority', u':authority'): + return v.encode('utf-8') if not isinstance(v, bytes) else v + + return None + + +# Flags used by the validate_headers pipeline to determine which checks +# should be applied to a given set of headers. +HeaderValidationFlags = collections.namedtuple( + 'HeaderValidationFlags', + ['is_client', 'is_trailer', 'is_response_header', 'is_push_promise'] +) + + +def validate_headers(headers, hdr_validation_flags): + """ + Validates a header sequence against a set of constraints from RFC 7540. + + :param headers: The HTTP header set. + :param hdr_validation_flags: An instance of HeaderValidationFlags. + """ + # This validation logic is built on a sequence of generators that are + # iterated over to provide the final header list. This reduces some of the + # overhead of doing this checking. However, it's worth noting that this + # checking remains somewhat expensive, and attempts should be made wherever + # possible to reduce the time spent doing them. + # + # For example, we avoid tuple upacking in loops because it represents a + # fixed cost that we don't want to spend, instead indexing into the header + # tuples. + headers = _reject_uppercase_header_fields( + headers, hdr_validation_flags + ) + headers = _reject_surrounding_whitespace( + headers, hdr_validation_flags + ) + headers = _reject_te( + headers, hdr_validation_flags + ) + headers = _reject_connection_header( + headers, hdr_validation_flags + ) + headers = _reject_pseudo_header_fields( + headers, hdr_validation_flags + ) + headers = _check_host_authority_header( + headers, hdr_validation_flags + ) + headers = _check_path_header(headers, hdr_validation_flags) + + return headers + + +def _reject_uppercase_header_fields(headers, hdr_validation_flags): + """ + Raises a ProtocolError if any uppercase character is found in a header + block. + """ + for header in headers: + if UPPER_RE.search(header[0]): + raise ProtocolError( + "Received uppercase header name %s." % header[0]) + yield header + + +def _reject_surrounding_whitespace(headers, hdr_validation_flags): + """ + Raises a ProtocolError if any header name or value is surrounded by + whitespace characters. + """ + # For compatibility with RFC 7230 header fields, we need to allow the field + # value to be an empty string. This is ludicrous, but technically allowed. + # The field name may not be empty, though, so we can safely assume that it + # must have at least one character in it and throw exceptions if it + # doesn't. + for header in headers: + if header[0][0] in _WHITESPACE or header[0][-1] in _WHITESPACE: + raise ProtocolError( + "Received header name surrounded by whitespace %r" % header[0]) + if header[1] and ((header[1][0] in _WHITESPACE) or + (header[1][-1] in _WHITESPACE)): + raise ProtocolError( + "Received header value surrounded by whitespace %r" % header[1] + ) + yield header + + +def _reject_te(headers, hdr_validation_flags): + """ + Raises a ProtocolError if the TE header is present in a header block and + its value is anything other than "trailers". + """ + for header in headers: + if header[0] in (b'te', u'te'): + if header[1].lower() not in (b'trailers', u'trailers'): + raise ProtocolError( + "Invalid value for Transfer-Encoding header: %s" % + header[1] + ) + + yield header + + +def _reject_connection_header(headers, hdr_validation_flags): + """ + Raises a ProtocolError if the Connection header is present in a header + block. + """ + for header in headers: + if header[0] in CONNECTION_HEADERS: + raise ProtocolError( + "Connection-specific header field present: %s." % header[0] + ) + + yield header + + +def _custom_startswith(test_string, bytes_prefix, unicode_prefix): + """ + Given a string that might be a bytestring or a Unicode string, + return True if it starts with the appropriate prefix. + """ + if isinstance(test_string, bytes): + return test_string.startswith(bytes_prefix) + else: + return test_string.startswith(unicode_prefix) + + +def _assert_header_in_set(string_header, bytes_header, header_set): + """ + Given a set of header names, checks whether the string or byte version of + the header name is present. Raises a Protocol error with the appropriate + error if it's missing. + """ + if not (string_header in header_set or bytes_header in header_set): + raise ProtocolError( + "Header block missing mandatory %s header" % string_header + ) + + +def _reject_pseudo_header_fields(headers, hdr_validation_flags): + """ + Raises a ProtocolError if duplicate pseudo-header fields are found in a + header block or if a pseudo-header field appears in a block after an + ordinary header field. + + Raises a ProtocolError if pseudo-header fields are found in trailers. + """ + seen_pseudo_header_fields = set() + seen_regular_header = False + method = None + + for header in headers: + if _custom_startswith(header[0], b':', u':'): + if header[0] in seen_pseudo_header_fields: + raise ProtocolError( + "Received duplicate pseudo-header field %s" % header[0] + ) + + seen_pseudo_header_fields.add(header[0]) + + if seen_regular_header: + raise ProtocolError( + "Received pseudo-header field out of sequence: %s" % + header[0] + ) + + if header[0] not in _ALLOWED_PSEUDO_HEADER_FIELDS: + raise ProtocolError( + "Received custom pseudo-header field %s" % header[0] + ) + + if header[0] in (b':method', u':method'): + if not isinstance(header[1], bytes): + method = header[1].encode('utf-8') + else: + method = header[1] + + else: + seen_regular_header = True + + yield header + + # Check the pseudo-headers we got to confirm they're acceptable. + _check_pseudo_header_field_acceptability( + seen_pseudo_header_fields, method, hdr_validation_flags + ) + + +def _check_pseudo_header_field_acceptability(pseudo_headers, + method, + hdr_validation_flags): + """ + Given the set of pseudo-headers present in a header block and the + validation flags, confirms that RFC 7540 allows them. + """ + # Pseudo-header fields MUST NOT appear in trailers - RFC 7540 § 8.1.2.1 + if hdr_validation_flags.is_trailer and pseudo_headers: + raise ProtocolError( + "Received pseudo-header in trailer %s" % pseudo_headers + ) + + # If ':status' pseudo-header is not there in a response header, reject it. + # Similarly, if ':path', ':method', or ':scheme' are not there in a request + # header, reject it. Additionally, if a response contains any request-only + # headers or vice-versa, reject it. + # Relevant RFC section: RFC 7540 § 8.1.2.4 + # https://tools.ietf.org/html/rfc7540#section-8.1.2.4 + if hdr_validation_flags.is_response_header: + _assert_header_in_set(u':status', b':status', pseudo_headers) + invalid_response_headers = pseudo_headers & _REQUEST_ONLY_HEADERS + if invalid_response_headers: + raise ProtocolError( + "Encountered request-only headers %s" % + invalid_response_headers + ) + elif (not hdr_validation_flags.is_response_header and + not hdr_validation_flags.is_trailer): + # This is a request, so we need to have seen :path, :method, and + # :scheme. + _assert_header_in_set(u':path', b':path', pseudo_headers) + _assert_header_in_set(u':method', b':method', pseudo_headers) + _assert_header_in_set(u':scheme', b':scheme', pseudo_headers) + invalid_request_headers = pseudo_headers & _RESPONSE_ONLY_HEADERS + if invalid_request_headers: + raise ProtocolError( + "Encountered response-only headers %s" % + invalid_request_headers + ) + if method != b'CONNECT': + invalid_headers = pseudo_headers & _CONNECT_REQUEST_ONLY_HEADERS + if invalid_headers: + raise ProtocolError( + "Encountered connect-request-only headers %s" % + invalid_headers + ) + + +def _validate_host_authority_header(headers): + """ + Given the :authority and Host headers from a request block that isn't + a trailer, check that: + 1. At least one of these headers is set. + 2. If both headers are set, they match. + + :param headers: The HTTP header set. + :raises: ``ProtocolError`` + """ + # We use None as a sentinel value. Iterate over the list of headers, + # and record the value of these headers (if present). We don't need + # to worry about receiving duplicate :authority headers, as this is + # enforced by the _reject_pseudo_header_fields() pipeline. + # + # TODO: We should also guard against receiving duplicate Host headers, + # and against sending duplicate headers. + authority_header_val = None + host_header_val = None + + for header in headers: + if header[0] in (b':authority', u':authority'): + authority_header_val = header[1] + elif header[0] in (b'host', u'host'): + host_header_val = header[1] + + yield header + + # If we have not-None values for these variables, then we know we saw + # the corresponding header. + authority_present = (authority_header_val is not None) + host_present = (host_header_val is not None) + + # It is an error for a request header block to contain neither + # an :authority header nor a Host header. + if not authority_present and not host_present: + raise ProtocolError( + "Request header block does not have an :authority or Host header." + ) + + # If we receive both headers, they should definitely match. + if authority_present and host_present: + if authority_header_val != host_header_val: + raise ProtocolError( + "Request header block has mismatched :authority and " + "Host headers: %r / %r" + % (authority_header_val, host_header_val) + ) + + +def _check_host_authority_header(headers, hdr_validation_flags): + """ + Raises a ProtocolError if a header block arrives that does not contain an + :authority or a Host header, or if a header block contains both fields, + but their values do not match. + """ + # We only expect to see :authority and Host headers on request header + # blocks that aren't trailers, so skip this validation if this is a + # response header or we're looking at trailer blocks. + skip_validation = ( + hdr_validation_flags.is_response_header or + hdr_validation_flags.is_trailer + ) + if skip_validation: + return headers + + return _validate_host_authority_header(headers) + + +def _check_path_header(headers, hdr_validation_flags): + """ + Raise a ProtocolError if a header block arrives or is sent that contains an + empty :path header. + """ + def inner(): + for header in headers: + if header[0] in (b':path', u':path'): + if not header[1]: + raise ProtocolError("An empty :path header is forbidden") + + yield header + + # We only expect to see :authority and Host headers on request header + # blocks that aren't trailers, so skip this validation if this is a + # response header or we're looking at trailer blocks. + skip_validation = ( + hdr_validation_flags.is_response_header or + hdr_validation_flags.is_trailer + ) + if skip_validation: + return headers + else: + return inner() + + +def _lowercase_header_names(headers, hdr_validation_flags): + """ + Given an iterable of header two-tuples, rebuilds that iterable with the + header names lowercased. This generator produces tuples that preserve the + original type of the header tuple for tuple and any ``HeaderTuple``. + """ + for header in headers: + if isinstance(header, HeaderTuple): + yield header.__class__(header[0].lower(), header[1]) + else: + yield (header[0].lower(), header[1]) + + +def _strip_surrounding_whitespace(headers, hdr_validation_flags): + """ + Given an iterable of header two-tuples, strip both leading and trailing + whitespace from both header names and header values. This generator + produces tuples that preserve the original type of the header tuple for + tuple and any ``HeaderTuple``. + """ + for header in headers: + if isinstance(header, HeaderTuple): + yield header.__class__(header[0].strip(), header[1].strip()) + else: + yield (header[0].strip(), header[1].strip()) + + +def _strip_connection_headers(headers, hdr_validation_flags): + """ + Strip any connection headers as per RFC7540 § 8.1.2.2. + """ + for header in headers: + if header[0] not in CONNECTION_HEADERS: + yield header + + +def _check_sent_host_authority_header(headers, hdr_validation_flags): + """ + Raises an InvalidHeaderBlockError if we try to send a header block + that does not contain an :authority or a Host header, or if + the header block contains both fields, but their values do not match. + """ + # We only expect to see :authority and Host headers on request header + # blocks that aren't trailers, so skip this validation if this is a + # response header or we're looking at trailer blocks. + skip_validation = ( + hdr_validation_flags.is_response_header or + hdr_validation_flags.is_trailer + ) + if skip_validation: + return headers + + return _validate_host_authority_header(headers) + + +def _combine_cookie_fields(headers, hdr_validation_flags): + """ + RFC 7540 § 8.1.2.5 allows HTTP/2 clients to split the Cookie header field, + which must normally appear only once, into multiple fields for better + compression. However, they MUST be joined back up again when received. + This normalization step applies that transform. The side-effect is that + all cookie fields now appear *last* in the header block. + """ + # There is a problem here about header indexing. Specifically, it's + # possible that all these cookies are sent with different header indexing + # values. At this point it shouldn't matter too much, so we apply our own + # logic and make them never-indexed. + cookies = [] + for header in headers: + if header[0] == b'cookie': + cookies.append(header[1]) + else: + yield header + if cookies: + cookie_val = b'; '.join(cookies) + yield NeverIndexedHeaderTuple(b'cookie', cookie_val) + + +def normalize_outbound_headers(headers, hdr_validation_flags): + """ + Normalizes a header sequence that we are about to send. + + :param headers: The HTTP header set. + :param hdr_validation_flags: An instance of HeaderValidationFlags. + """ + headers = _lowercase_header_names(headers, hdr_validation_flags) + headers = _strip_surrounding_whitespace(headers, hdr_validation_flags) + headers = _strip_connection_headers(headers, hdr_validation_flags) + headers = _secure_headers(headers, hdr_validation_flags) + + return headers + + +def normalize_inbound_headers(headers, hdr_validation_flags): + """ + Normalizes a header sequence that we have received. + + :param headers: The HTTP header set. + :param hdr_validation_flags: An instance of HeaderValidationFlags + """ + headers = _combine_cookie_fields(headers, hdr_validation_flags) + return headers + + +def validate_outbound_headers(headers, hdr_validation_flags): + """ + Validates and normalizes a header sequence that we are about to send. + + :param headers: The HTTP header set. + :param hdr_validation_flags: An instance of HeaderValidationFlags. + """ + headers = _reject_te( + headers, hdr_validation_flags + ) + headers = _reject_connection_header( + headers, hdr_validation_flags + ) + headers = _reject_pseudo_header_fields( + headers, hdr_validation_flags + ) + headers = _check_sent_host_authority_header( + headers, hdr_validation_flags + ) + headers = _check_path_header(headers, hdr_validation_flags) + + return headers + + +class SizeLimitDict(collections.OrderedDict): + + def __init__(self, *args, **kwargs): + self._size_limit = kwargs.pop("size_limit", None) + super(SizeLimitDict, self).__init__(*args, **kwargs) + + self._check_size_limit() + + def __setitem__(self, key, value): + super(SizeLimitDict, self).__setitem__(key, value) + + self._check_size_limit() + + def _check_size_limit(self): + if self._size_limit is not None: + while len(self) > self._size_limit: + self.popitem(last=False) diff --git a/src/h2/windows.py b/src/h2/windows.py new file mode 100644 index 0000000..6656975 --- /dev/null +++ b/src/h2/windows.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +""" +h2/windows +~~~~~~~~~~ + +Defines tools for managing HTTP/2 flow control windows. + +The objects defined in this module are used to automatically manage HTTP/2 +flow control windows. Specifically, they keep track of what the size of the +window is, how much data has been consumed from that window, and how much data +the user has already used. It then implements a basic algorithm that attempts +to manage the flow control window without user input, trying to ensure that it +does not emit too many WINDOW_UPDATE frames. +""" +from __future__ import division + +from .exceptions import FlowControlError + + +# The largest acceptable value for a HTTP/2 flow control window. +LARGEST_FLOW_CONTROL_WINDOW = 2**31 - 1 + + +class WindowManager(object): + """ + A basic HTTP/2 window manager. + + :param max_window_size: The maximum size of the flow control window. + :type max_window_size: ``int`` + """ + def __init__(self, max_window_size): + assert max_window_size <= LARGEST_FLOW_CONTROL_WINDOW + self.max_window_size = max_window_size + self.current_window_size = max_window_size + self._bytes_processed = 0 + + def window_consumed(self, size): + """ + We have received a certain number of bytes from the remote peer. This + necessarily shrinks the flow control window! + + :param size: The number of flow controlled bytes we received from the + remote peer. + :type size: ``int`` + :returns: Nothing. + :rtype: ``None`` + """ + self.current_window_size -= size + if self.current_window_size < 0: + raise FlowControlError("Flow control window shrunk below 0") + + def window_opened(self, size): + """ + The flow control window has been incremented, either because of manual + flow control management or because of the user changing the flow + control settings. This can have the effect of increasing what we + consider to be the "maximum" flow control window size. + + This does not increase our view of how many bytes have been processed, + only of how much space is in the window. + + :param size: The increment to the flow control window we received. + :type size: ``int`` + :returns: Nothing + :rtype: ``None`` + """ + self.current_window_size += size + + if self.current_window_size > LARGEST_FLOW_CONTROL_WINDOW: + raise FlowControlError( + "Flow control window mustn't exceed %d" % + LARGEST_FLOW_CONTROL_WINDOW + ) + + if self.current_window_size > self.max_window_size: + self.max_window_size = self.current_window_size + + def process_bytes(self, size): + """ + The application has informed us that it has processed a certain number + of bytes. This may cause us to want to emit a window update frame. If + we do want to emit a window update frame, this method will return the + number of bytes that we should increment the window by. + + :param size: The number of flow controlled bytes that the application + has processed. + :type size: ``int`` + :returns: The number of bytes to increment the flow control window by, + or ``None``. + :rtype: ``int`` or ``None`` + """ + self._bytes_processed += size + return self._maybe_update_window() + + def _maybe_update_window(self): + """ + Run the algorithm. + + Our current algorithm can be described like this. + + 1. If no bytes have been processed, we immediately return 0. There is + no meaningful way for us to hand space in the window back to the + remote peer, so let's not even try. + 2. If there is no space in the flow control window, and we have + processed at least 1024 bytes (or 1/4 of the window, if the window + is smaller), we will emit a window update frame. This is to avoid + the risk of blocking a stream altogether. + 3. If there is space in the flow control window, and we have processed + at least 1/2 of the window worth of bytes, we will emit a window + update frame. This is to minimise the number of window update frames + we have to emit. + + In a healthy system with large flow control windows, this will + irregularly emit WINDOW_UPDATE frames. This prevents us starving the + connection by emitting eleventy bajillion WINDOW_UPDATE frames, + especially in situations where the remote peer is sending a lot of very + small DATA frames. + """ + # TODO: Can the window be smaller than 1024 bytes? If not, we can + # streamline this algorithm. + if not self._bytes_processed: + return None + + max_increment = (self.max_window_size - self.current_window_size) + increment = 0 + + # Note that, even though we may increment less than _bytes_processed, + # we still want to set it to zero whenever we emit an increment. This + # is because we'll always increment up to the maximum we can. + if (self.current_window_size == 0) and ( + self._bytes_processed > min(1024, self.max_window_size // 4)): + increment = min(self._bytes_processed, max_increment) + self._bytes_processed = 0 + elif self._bytes_processed >= (self.max_window_size // 2): + increment = min(self._bytes_processed, max_increment) + self._bytes_processed = 0 + + self.current_window_size += increment + return increment |