summaryrefslogtreecommitdiff
path: root/test/testTls.py
diff options
context:
space:
mode:
authorAndrej Shadura <andrew.shadura@collabora.co.uk>2020-04-22 22:42:05 +0200
committerAndrej Shadura <andrew.shadura@collabora.co.uk>2020-04-22 22:42:05 +0200
commitf6fd16282811d1a686b5ae611351b0f8eed3c008 (patch)
treedc37399cc3aefb4e733c3307d8a4fdd5c0438e74 /test/testTls.py
parent4083ce411fb1d311316baa250fb271bfb0b63058 (diff)
New upstream version 2.7
Diffstat (limited to 'test/testTls.py')
-rw-r--r--test/testTls.py379
1 files changed, 190 insertions, 189 deletions
diff --git a/test/testTls.py b/test/testTls.py
index e9ce8ae..891a3e6 100644
--- a/test/testTls.py
+++ b/test/testTls.py
@@ -1,189 +1,190 @@
-"""
-"""
-
-# Created on 2013.08.11
-#
-# Author: Giovanni Cannata
-#
-# Copyright 2013 - 2018 Giovanni Cannata
-#
-# This file is part of ldap3.
-#
-# ldap3 is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as published
-# by the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# ldap3 is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with ldap3 in the COPYING and COPYING.LESSER files.
-# If not, see <http://www.gnu.org/licenses/>.
-
-import unittest
-import ssl
-
-from ldap3 import Server, Connection, ServerPool, Tls, SASL, EXTERNAL, MOCK_ASYNC, MOCK_SYNC
-from test.config import test_server, test_port, test_port_ssl, test_user, test_password, test_authentication, \
- test_strategy, test_lazy_connection, test_get_info, test_server_mode, test_valid_names, \
- test_pooling_strategy, test_pooling_active, test_pooling_exhaust, test_ca_cert_file, \
- test_user_cert_file, test_user_key_file
-
-
-class Test(unittest.TestCase):
- def test_start_tls(self):
- if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
- if isinstance(test_server, (list, tuple)):
- server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
- for host in test_server:
- server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
- else:
- server = Server(host=test_server, port=test_port, tls=Tls(validate=ssl.CERT_NONE), get_info=test_get_info, mode=test_server_mode)
- connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
- connection.open()
- connection.start_tls()
- self.assertFalse(connection.closed)
- connection.unbind()
- if connection.strategy.pooled:
- connection.strategy.terminate()
-
- def test_open_ssl_with_defaults(self):
- if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
- if isinstance(test_server, (list, tuple)):
- server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
- for host in test_server:
- server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
- else:
- server = Server(host=test_server, port=test_port_ssl, use_ssl=True)
- connection = Connection(server, user=test_user, password=test_password)
- connection.open()
- self.assertFalse(connection.closed)
- connection.unbind()
- if connection.strategy.pooled:
- connection.strategy.terminate()
-
- def test_open_with_tls_before_bind(self):
- if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
- if isinstance(test_server, (list, tuple)):
- server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
- for host in test_server:
- server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
- else:
- server = Server(host=test_server, port=test_port, tls=Tls())
- connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
- connection.open(read_server_info=False)
- connection.start_tls(read_server_info=False)
- connection.bind()
- self.assertTrue(connection.bound)
- connection.unbind()
- if connection.strategy.pooled:
- connection.strategy.terminate()
- self.assertFalse(connection.bound)
-
- def test_open_with_tls_after_bind(self):
- if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
- if isinstance(test_server, (list, tuple)):
- server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
- for host in test_server:
- server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
- else:
- server = Server(host=test_server, port=test_port, tls=Tls())
- connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
- connection.open()
- connection.bind()
- connection.start_tls()
- self.assertTrue(connection.bound)
- connection.unbind()
- if connection.strategy.pooled:
- connection.strategy.terminate()
- self.assertFalse(connection.bound)
-
- # def test_bind_ssl_with_certificate(self):
- # if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
- # tls = Tls(local_private_key_file=test_user_key_file,
- # local_certificate_file=test_user_cert_file,
- # validate=ssl.CERT_REQUIRED,
- # version=ssl.PROTOCOL_TLSv1,
- # ca_certs_file=test_ca_cert_file,
- # valid_names=test_valid_names)
- # if isinstance(test_server, (list, tuple)):
- # server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
- # for host in test_server:
- # server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
- # else:
- # server = Server(host=test_server, port=test_port_ssl, use_ssl=True, tls=tls)
- # connection = Connection(server, auto_bind=False, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication)
- # connection.open()
- # connection.bind()
- # self.assertTrue(connection.bound)
- # connection.unbind()
- # if connection.strategy.pooled:
- # connection.strategy.terminate()
- # self.assertFalse(connection.bound)
-
- # def test_sasl_with_external_certificate(self):
- # if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
- # tls = Tls(local_private_key_file=test_user_key_file, local_certificate_file=test_user_cert_file, validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1, ca_certs_file=test_ca_cert_file, valid_names=test_valid_names)
- # if isinstance(test_server, (list, tuple)):
- # server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
- # for host in test_server:
- # server.add(Server(host=host, port=test_port_ssl, use_ssl=True, tls=tls, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
- # else:
- # server = Server(host=test_server, port=test_port_ssl, use_ssl=True, tls=tls)
- # connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, authentication=SASL, sasl_mechanism=EXTERNAL)
- # connection.open()
- # connection.bind()
- # self.assertTrue(connection.bound)
- # connection.unbind()
- # if connection.strategy.pooled:
- # connection.strategy.terminate()
- # self.assertFalse(connection.bound)
-
- def test_bind_ssl_cert_none(self):
- if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
- tls = Tls(validate=ssl.CERT_NONE)
- if isinstance(test_server, (list, tuple)):
- server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
- for host in test_server:
- server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
- else:
- server = Server(host=test_server, port=test_port_ssl, use_ssl=True, tls=tls)
- connection = Connection(server, auto_bind=False, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication)
- connection.open()
- connection.bind()
- self.assertTrue(connection.bound)
- connection.unbind()
- if connection.strategy.pooled:
- connection.strategy.terminate()
- self.assertFalse(connection.bound)
-
- def test_start_tls_with_cipher(self):
- # ciphers = None
- # ciphers = '!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2'
- ciphers = 'HIGH:!aNULL:!RC4:!DSS'
-
- if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
- if isinstance(test_server, (list, tuple)):
- server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
- for host in test_server:
- server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
- else:
- server = Server(host=test_server, port=test_port, tls=Tls(validate=ssl.CERT_NONE, ciphers=ciphers), get_info=test_get_info, mode=test_server_mode)
- connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
- connection.open()
- connection.start_tls()
- self.assertFalse(connection.closed)
- # self.assertEqual(connection.socket.cipher(), ciphers)
- connection.unbind()
- if connection.strategy.pooled:
- connection.strategy.terminate()
-
- # def test_hostname_doesnt_match(self):
- # tls_config = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1)
- # server = Server('edir1.hyperv', use_ssl=True, tls=tls_config)
- # conn = Connection(server)
- # conn.open()
- # self.assertTrue(conn.bound)
+"""
+"""
+
+# Created on 2013.08.11
+#
+# Author: Giovanni Cannata
+#
+# Copyright 2013 - 2020 Giovanni Cannata
+#
+# This file is part of ldap3.
+#
+# ldap3 is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# ldap3 is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with ldap3 in the COPYING and COPYING.LESSER files.
+# If not, see <http://www.gnu.org/licenses/>.
+
+import unittest
+import ssl
+
+from ldap3 import Server, Connection, ServerPool, Tls, SASL, EXTERNAL, MOCK_ASYNC, MOCK_SYNC
+from test.config import test_server, test_port, test_port_ssl, test_user, test_password, test_authentication, \
+ test_strategy, test_lazy_connection, test_get_info, test_server_mode, test_valid_names, \
+ test_pooling_strategy, test_pooling_active, test_pooling_exhaust, test_ca_cert_file, \
+ test_user_cert_file, test_user_key_file
+
+
+class Test(unittest.TestCase):
+ def test_start_tls(self):
+ if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
+ if isinstance(test_server, (list, tuple)):
+ server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
+ for host in test_server:
+ server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
+ else:
+ server = Server(host=test_server, port=test_port, tls=Tls(validate=ssl.CERT_NONE), get_info=test_get_info, mode=test_server_mode)
+ connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
+ connection.open()
+ connection.start_tls()
+ self.assertFalse(connection.closed)
+ connection.unbind()
+ if connection.strategy.pooled:
+ connection.strategy.terminate()
+
+ def test_open_ssl_with_defaults(self):
+ if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
+ if isinstance(test_server, (list, tuple)):
+ server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
+ for host in test_server:
+ server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
+ else:
+ server = Server(host=test_server, port=test_port_ssl, use_ssl=True)
+ connection = Connection(server, user=test_user, password=test_password)
+ connection.open()
+ self.assertFalse(connection.closed)
+ connection.unbind()
+ if connection.strategy.pooled:
+ connection.strategy.terminate()
+
+ def test_open_with_tls_before_bind(self):
+ if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
+ if isinstance(test_server, (list, tuple)):
+ server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
+ for host in test_server:
+ server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
+ else:
+ server = Server(host=test_server, port=test_port, tls=Tls())
+ connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
+ connection.open(read_server_info=False)
+ connection.start_tls(read_server_info=False)
+ connection.bind()
+ self.assertTrue(connection.bound)
+ connection.unbind()
+ if connection.strategy.pooled:
+ connection.strategy.terminate()
+ self.assertFalse(connection.bound)
+
+ def test_open_with_tls_after_bind(self):
+ if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
+ if isinstance(test_server, (list, tuple)):
+ server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
+ for host in test_server:
+ server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
+ else:
+ server = Server(host=test_server, port=test_port, tls=Tls())
+ connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
+ connection.open()
+ connection.bind()
+ connection.start_tls()
+ self.assertTrue(connection.bound)
+ connection.unbind()
+ if connection.strategy.pooled:
+ connection.strategy.terminate()
+ self.assertFalse(connection.bound)
+
+ # def test_bind_ssl_with_certificate(self):
+ # if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
+ # tls = Tls(local_private_key_file=test_user_key_file,
+ # local_certificate_file=test_user_cert_file,
+ # validate=ssl.CERT_REQUIRED,
+ # version=ssl.PROTOCOL_SSLv23,
+ # ssl_options=[ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv3],
+ # ca_certs_file=test_ca_cert_file,
+ # valid_names=test_valid_names)
+ # if isinstance(test_server, (list, tuple)):
+ # server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
+ # for host in test_server:
+ # server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
+ # else:
+ # server = Server(host=test_server, port=test_port_ssl, use_ssl=True, tls=tls)
+ # connection = Connection(server, auto_bind=False, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication)
+ # connection.open()
+ # connection.bind()
+ # self.assertTrue(connection.bound)
+ # connection.unbind()
+ # if connection.strategy.pooled:
+ # connection.strategy.terminate()
+ # self.assertFalse(connection.bound)
+
+ # def test_sasl_with_external_certificate(self):
+ # if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
+ # tls = Tls(local_private_key_file=test_user_key_file, local_certificate_file=test_user_cert_file, validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1, ca_certs_file=test_ca_cert_file, valid_names=test_valid_names)
+ # if isinstance(test_server, (list, tuple)):
+ # server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
+ # for host in test_server:
+ # server.add(Server(host=host, port=test_port_ssl, use_ssl=True, tls=tls, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
+ # else:
+ # server = Server(host=test_server, port=test_port_ssl, use_ssl=True, tls=tls)
+ # connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, authentication=SASL, sasl_mechanism=EXTERNAL)
+ # connection.open()
+ # connection.bind()
+ # self.assertTrue(connection.bound)
+ # connection.unbind()
+ # if connection.strategy.pooled:
+ # connection.strategy.terminate()
+ # self.assertFalse(connection.bound)
+
+ def test_bind_ssl_cert_none(self):
+ if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
+ tls = Tls(validate=ssl.CERT_NONE)
+ if isinstance(test_server, (list, tuple)):
+ server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
+ for host in test_server:
+ server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
+ else:
+ server = Server(host=test_server, port=test_port_ssl, use_ssl=True, tls=tls)
+ connection = Connection(server, auto_bind=False, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication)
+ connection.open()
+ connection.bind()
+ self.assertTrue(connection.bound)
+ connection.unbind()
+ if connection.strategy.pooled:
+ connection.strategy.terminate()
+ self.assertFalse(connection.bound)
+
+ def test_start_tls_with_cipher(self):
+ # ciphers = None
+ # ciphers = '!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2'
+ ciphers = 'HIGH:!aNULL:!RC4:!DSS'
+
+ if test_strategy not in [MOCK_SYNC, MOCK_ASYNC]:
+ if isinstance(test_server, (list, tuple)):
+ server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
+ for host in test_server:
+ server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
+ else:
+ server = Server(host=test_server, port=test_port, tls=Tls(validate=ssl.CERT_NONE, ciphers=ciphers), get_info=test_get_info, mode=test_server_mode)
+ connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
+ connection.open()
+ connection.start_tls()
+ self.assertFalse(connection.closed)
+ # self.assertEqual(connection.socket.cipher(), ciphers)
+ connection.unbind()
+ if connection.strategy.pooled:
+ connection.strategy.terminate()
+
+ # def test_hostname_doesnt_match(self):
+ # tls_config = Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1)
+ # server = Server('edir1.hyperv', use_ssl=True, tls=tls_config)
+ # conn = Connection(server)
+ # conn.open()
+ # self.assertTrue(conn.bound)