# cgit page header (extraction residue): summary | refs | log | tree | commit | diff
# path: root/test/testTls.py
# blob: c503d506100fce6af3eb62518fad0decf2ac3980 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Created on 2013.08.11
#
# @author: Giovanni Cannata
#
# Copyright 2015 Giovanni Cannata
#
# This file is part of ldap3.
#
# ldap3 is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ldap3 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with ldap3 in the COPYING and COPYING.LESSER files.
# If not, see <http://www.gnu.org/licenses/>.

import unittest
import ssl

from ldap3 import Server, Connection, ServerPool, Tls, SASL
from test import test_server, test_port, test_port_ssl, test_user, test_password, test_authentication, \
    test_strategy, test_lazy_connection, test_get_info, test_server_mode, \
    test_pooling_strategy, test_pooling_active, test_pooling_exhaust, test_ca_cert_file, test_user_cert_file, test_user_key_file


class Test(unittest.TestCase):
    """StartTLS, LDAP-over-SSL and certificate based bind tests.

    The target is taken from the ``test`` package configuration:
    ``test_server`` is either a single host or a list/tuple of hosts.
    In the latter case every test runs against a ServerPool.
    """

    def _make_server(self, port, **server_options):
        """Return the Server (or ServerPool) a test should connect to.

        :param port: port used by every Server that gets created.
        :param server_options: extra keyword arguments (``use_ssl``, ``tls``,
            ``get_info``, ``mode``) forwarded to each Server.
        :return: a single Server when ``test_server`` is one host, otherwise a
            ServerPool with one Server per configured host.

        Pool members receive the same port and options as the single server
        would. Previously they were always created on ``test_port`` without
        ``use_ssl``/``tls``, so the SSL tests did not exercise SSL in pool mode.
        """
        if not isinstance(test_server, (list, tuple)):
            return Server(host=test_server, port=port, **server_options)

        pool = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
        # Pool members keep the referral/info/mode settings they always had;
        # the per-test options are layered on top and win on conflict.
        member_options = {'allowed_referral_hosts': ('*', True), 'get_info': test_get_info, 'mode': test_server_mode}
        member_options.update(server_options)
        for host in test_server:
            pool.add(Server(host=host, port=port, **member_options))
        return pool

    def _close_connection(self, connection):
        """Unbind ``connection`` and terminate its strategy when it is pooled."""
        connection.unbind()
        if connection.strategy.pooled:
            connection.strategy.terminate()

    def test_start_tls(self):
        """Open on the plain port, issue StartTLS and check the connection is still open."""
        # Certificate validation is disabled for this test (ssl.CERT_NONE).
        server = self._make_server(test_port, tls=Tls(validate=ssl.CERT_NONE), get_info=test_get_info, mode=test_server_mode)
        connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
        connection.open()
        connection.start_tls()
        self.assertFalse(connection.closed)
        self._close_connection(connection)

    def test_open_ssl_with_defaults(self):
        """Open an SSL connection on the SSL port without an explicit Tls object."""
        server = self._make_server(test_port_ssl, use_ssl=True)
        # Only user and password are given; every other Connection argument is left at its default.
        connection = Connection(server, user=test_user, password=test_password)
        connection.open()
        self.assertFalse(connection.closed)
        self._close_connection(connection)

    def test_open_with_tls_before_bind(self):
        """Issue StartTLS first, then bind; the connection must be bound until unbind."""
        server = self._make_server(test_port, tls=Tls())
        connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
        connection.open()
        connection.start_tls()
        connection.bind()
        self.assertTrue(connection.bound)
        self._close_connection(connection)
        self.assertFalse(connection.bound)

    def test_open_with_tls_after_bind(self):
        """Bind first, then issue StartTLS; the connection must stay bound until unbind."""
        server = self._make_server(test_port, tls=Tls())
        connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
        connection.open()
        connection.bind()
        connection.start_tls()
        self.assertTrue(connection.bound)
        self._close_connection(connection)
        self.assertFalse(connection.bound)

    def test_bind_ssl_with_certificate(self):
        """Bind over SSL using a client certificate and a validated server certificate."""
        # NOTE(review): valid_names presumably lists the host names of the lab
        # servers used by the author - adjust to the server under test.
        tls = Tls(local_private_key_file=test_user_key_file, local_certificate_file=test_user_cert_file, validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1, ca_certs_file=test_ca_cert_file, valid_names=['EDIR-TEST', 'WIN1.FOREST.LAB', 'sles11sp3-template.hyperv'])
        server = self._make_server(test_port_ssl, use_ssl=True, tls=tls)
        connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication)
        connection.open()
        connection.bind()
        self.assertTrue(connection.bound)
        self._close_connection(connection)
        self.assertFalse(connection.bound)

    def test_sasl_with_external_certificate(self):
        """Bind over SSL with SASL EXTERNAL; no user or password is passed to the Connection."""
        # NOTE(review): valid_names presumably lists the host names of the lab
        # servers used by the author - adjust to the server under test.
        tls = Tls(local_private_key_file=test_user_key_file, local_certificate_file=test_user_cert_file, validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1, ca_certs_file=test_ca_cert_file, valid_names=['EDIR-TEST', '2.hyperv', 'labldap02.cloudapp.net', 'WIN1.FOREST.LAB'])
        server = self._make_server(test_port_ssl, use_ssl=True, tls=tls)
        connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, authentication=SASL, sasl_mechanism='EXTERNAL')
        connection.open()
        connection.bind()
        self.assertTrue(connection.bound)
        self._close_connection(connection)
        self.assertFalse(connection.bound)