summaryrefslogtreecommitdiff
path: root/synapse/crypto/context_factory.py
blob: 14b21796d9abd55bf56099d5ef10c0580f4bb7e5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from service_identity import VerificationError
from service_identity.pyopenssl import verify_hostname, verify_ip_address
from zope.interface import implementer

from OpenSSL import SSL, crypto
from twisted.internet._sslverify import _defaultCurveName
from twisted.internet.abstract import isIPAddress, isIPv6Address
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from twisted.internet.ssl import (
    CertificateOptions,
    ContextFactory,
    TLSVersion,
    platformTrust,
)
from twisted.python.failure import Failure
from twisted.web.iweb import IPolicyForHTTPS

logger = logging.getLogger(__name__)


# Maps the string form of a TLS version onto the corresponding Twisted
# TLSVersion constant. FederationPolicyForHTTPS indexes this with
# config.federation_client_minimum_tls_version to pick the minimum TLS version
# for outbound federation connections.
_TLS_VERSION_MAP = {
    "1": TLSVersion.TLSv1_0,
    "1.1": TLSVersion.TLSv1_1,
    "1.2": TLSVersion.TLSv1_2,
    "1.3": TLSVersion.TLSv1_3,
}


class ServerContextFactory(ContextFactory):
    """Supplies the PyOpenSSL context used for incoming TLS connections.

    One context is built and configured when the factory is constructed, and
    that same object is returned from every call to getContext().

    TODO: replace this with an implementation of IOpenSSLServerConnectionCreator,
    per https://github.com/matrix-org/synapse/issues/1691
    """

    def __init__(self, config):
        # TODO: once pyOpenSSL exposes TLS_METHOD and SSL_CTX_set_min_proto_version,
        # switch to those (see https://github.com/pyca/cryptography/issues/5379).
        #
        # The name SSLv23_METHOD is misleading: it does *not* force SSLv2 or v3.
        # It is a synonym for TLS_METHOD, so the peers negotiate a TLS version,
        # limited by whatever version options are applied via context.set_options.
        #
        ssl_context = SSL.Context(SSL.SSLv23_METHOD)
        self._context = ssl_context
        self.configure_context(ssl_context, config)

    @staticmethod
    def configure_context(context, config):
        """Apply the server-side TLS settings to an OpenSSL context.

        Args:
            context: the SSL.Context to configure (modified in place).
            config: object providing `tls_certificate_file` (passed to
                use_certificate_chain_file) and `tls_private_key` (passed to
                use_privatekey).
        """
        # Enabling the ECDH curve is best-effort: a failure is logged and the
        # rest of the configuration still goes ahead.
        try:
            context.set_tmp_ecdh(crypto.get_elliptic_curve(_defaultCurveName))
        except Exception:
            logger.exception("Failed to enable elliptic curve for TLS")

        # Refuse everything older than TLS 1.2.
        disabled_protocols = (
            SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
        )
        context.set_options(disabled_protocols)

        context.use_certificate_chain_file(config.tls_certificate_file)
        context.use_privatekey(config.tls_private_key)

        # https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
        cipher_suites = (
            "ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES256:ECDH+AES128:!aNULL:!SHA1:!AESCCM"
        )
        context.set_cipher_list(cipher_suites)

    def getContext(self):
        """Return the context that was configured at construction time."""
        return self._context


@implementer(IPolicyForHTTPS)
class FederationPolicyForHTTPS:
    """Produces the SSLClientConnectionCreators used for outbound federation
    connections.

    Two OpenSSL contexts are built up front: one with a trust root (for
    connections whose certificates we verify) and one without. get_options
    picks between them per host and wraps the chosen context in an
    SSLClientConnectionCreator.
    """

    def __init__(self, config):
        self._config = config

        # A custom CA list from the config takes precedence; without one we
        # fall back to the CA root certs provided by OpenSSL.
        configured_roots = config.federation_ca_trust_root
        trust_root = platformTrust() if configured_roots is None else configured_roots

        # "insecurelyLowerMinimumTo" is the argument that will go lower than
        # Twisted's default, which is why it is marked as "insecure" (since
        # Twisted's defaults are reasonably secure). But, since Twisted is
        # moving to TLS 1.2 by default, we want to respect the config option if
        # it is set to 1.0 (which the alternate option, raiseMinimumTo, will not
        # let us do).
        minimum_tls = _TLS_VERSION_MAP[config.federation_client_minimum_tls_version]

        # Context used when the remote certificate is to be verified.
        self._verify_ssl_context = CertificateOptions(
            trustRoot=trust_root, insecurelyLowerMinimumTo=minimum_tls
        ).getContext()
        self._verify_ssl_context.set_info_callback(_context_info_cb)

        # Context used when verification is switched off: no trust root given.
        self._no_verify_ssl_context = CertificateOptions(
            insecurelyLowerMinimumTo=minimum_tls
        ).getContext()
        self._no_verify_ssl_context.set_info_callback(_context_info_cb)

        self._should_verify = config.federation_verify_certificates

        self._federation_certificate_verification_whitelist = (
            config.federation_certificate_verification_whitelist
        )

    def get_options(self, host: bytes):
        """Build the connection creator for a given remote host.

        Args:
            host: the remote hostname, as ASCII bytes.

        Returns:
            An SSLClientConnectionCreator bound to either the verifying or the
            non-verifying context. Verification is skipped when it is disabled
            globally, or when the host matches an entry in the certificate
            verification whitelist.
        """
        # IPolicyForHTTPS.get_options takes bytes, but the whitelist holds str
        # patterns. The whitelist entries are already IDNA-encoded, as the
        # hosts passed in here will be.
        ascii_host = host.decode("ascii")

        # The whitelist is only consulted when verification is enabled at all.
        whitelisted = self._should_verify and any(
            pattern.match(ascii_host)
            for pattern in self._federation_certificate_verification_whitelist
        )
        if whitelisted:
            should_verify = False
        else:
            should_verify = self._should_verify

        if should_verify:
            ssl_context = self._verify_ssl_context
        else:
            ssl_context = self._no_verify_ssl_context

        return SSLClientConnectionCreator(host, ssl_context, should_verify)

    def creatorForNetloc(self, hostname, port):
        """Implements the IPolicyForHTTPS interface so that this can be passed
        directly to agents.

        The port is not used; the decision is made on the hostname alone.
        """
        return self.get_options(hostname)


@implementer(IPolicyForHTTPS)
class RegularPolicyForHTTPS:
    """Produces the SSLClientConnectionCreators used for outbound connections
    that are not federation traffic.

    A single OpenSSL context, trusting the default OpenSSL CA roots, is shared
    by every connection.
    """

    def __init__(self):
        options = CertificateOptions(trustRoot=platformTrust())
        shared_context = options.getContext()
        self._ssl_context = shared_context
        shared_context.set_info_callback(_context_info_cb)

    def creatorForNetloc(self, hostname, port):
        """Return a connection creator for `hostname` with certificate
        verification enabled. The port is not used.
        """
        return SSLClientConnectionCreator(hostname, self._ssl_context, True)


def _context_info_cb(ssl_connection, where, ret):
    """Info callback installed on the OpenSSL contexts created in this module.

    Forwards the event to the ConnectionVerifier attached to the connection's
    protocol. Any error raised while doing so is logged and reported to the
    protocol via failVerification.

    Note: Once this is set as the info callback on a Context object, the Context should
    only be used with the SSLClientConnectionCreator.
    """
    # SSLClientConnectionCreator stores the TLSMemoryBIOProtocol as the
    # connection's app_data, so that is what we expect to get back here.
    protocol = ssl_connection.get_app_data()
    try:
        # SSLClientConnectionCreator is also expected to have attached a
        # ConnectionVerifier to the protocol as '_synapse_tls_verifier'.
        verifier = protocol._synapse_tls_verifier
        verifier.verify_context_info_cb(ssl_connection, where)
    except:  # noqa: E722, taken from the twisted implementation
        logger.exception("Error during info_callback")
        # Failure() with no arguments captures the exception being handled.
        failure = Failure()
        protocol.failVerification(failure)


@implementer(IOpenSSLClientConnectionCreator)
class SSLClientConnectionCreator:
    """Builds the OpenSSL connection object for an outgoing TLS connection.

    Replaces twisted.internet.ssl.ClientTLSOptions
    """

    def __init__(self, hostname: bytes, ctx, verify_certs: bool):
        self._verifier = ConnectionVerifier(hostname, verify_certs)
        self._ctx = ctx

    def clientConnectionForTLS(self, tls_protocol):
        """Create the SSL.Connection for `tls_protocol` and wire up the
        verifier that the context's info callback will use.
        """
        connection = SSL.Connection(self._ctx, None)

        # Mirror twisted.internet.ssl.ClientTLSOptions: the application data on
        # the connection is the TLSMemoryBIOProtocol itself...
        connection.set_app_data(tls_protocol)

        # ... and the verifier is stashed on the protocol under
        # '_synapse_tls_verifier', which is where the SSL context's info
        # callback looks for it when it needs to do the cert verification.
        setattr(tls_protocol, "_synapse_tls_verifier", self._verifier)
        return connection


class ConnectionVerifier:
    """Sets the SNI and performs certificate verification for one connection.

    An instance is attached to the TLSMemoryBIOProtocol and driven by the ssl
    context's info callback.
    """

    # This code is based on twisted.internet.ssl.ClientTLSOptions.

    def __init__(self, hostname: bytes, verify_certs: bool):
        self._verify_certs = verify_certs

        ascii_name = hostname.decode("ascii")

        # IP literals (v4 or v6) are verified differently from DNS names and
        # are never sent as SNI.
        self._is_ip_address = bool(
            isIPAddress(ascii_name) or isIPv6Address(ascii_name)
        )

        self._hostnameBytes = hostname
        self._hostnameASCII = ascii_name

    def verify_context_info_cb(self, ssl_connection, where):
        """Handle an info-callback event for `ssl_connection`.

        At handshake start, sets the SNI hostname (unless the target is an IP
        address). At handshake completion, if verification is enabled, checks
        the peer certificate against the hostname or IP address and reports a
        VerificationError to the protocol via failVerification.
        """
        if where & SSL.SSL_CB_HANDSHAKE_START and not self._is_ip_address:
            ssl_connection.set_tlsext_host_name(self._hostnameBytes)

        if not (where & SSL.SSL_CB_HANDSHAKE_DONE and self._verify_certs):
            return

        check_identity = verify_ip_address if self._is_ip_address else verify_hostname
        try:
            check_identity(ssl_connection, self._hostnameASCII)
        except VerificationError:
            # Failure() with no arguments captures the exception being handled.
            failure = Failure()
            protocol = ssl_connection.get_app_data()
            protocol.failVerification(failure)