summaryrefslogtreecommitdiff
path: root/ldap3/strategy/asyncStream.py
blob: 7977d7e8b20c6f7f8019cf38766808d39a303954 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
"""

# Created on 2016.07.10
#
# Author: Giovanni Cannata
#
# Copyright 2016 - 2018 Giovanni Cannata
#
# This file is part of ldap3.
#
# ldap3 is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ldap3 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with ldap3 in the COPYING and COPYING.LESSER files.
# If not, see <http://www.gnu.org/licenses/>.

try:
    from queue import Queue
except ImportError:  # Python 2
    # noinspection PyUnresolvedReferences
    from Queue import Queue

from io import StringIO
from os import linesep

from ..protocol.rfc2849 import decode_persistent_search_control
from ..strategy.asynchronous import AsyncStrategy
from ..core.exceptions import LDAPLDIFError
from ..utils.conv import prepare_for_stream
from ..protocol.rfc2849 import persistent_search_response_to_ldif, add_ldif_header


# noinspection PyProtectedMember
class AsyncStreamStrategy(AsyncStrategy):
    """
    Asynchronous strategy that delivers the changes of one persistent search as they arrive.

    Only messages whose id equals ``persistent_search_message_id`` are handled. Each change is delivered
    in one of two ways:

    - streaming (``streaming`` is True): the change is converted to LDIF lines and written to ``stream``
    - not streaming: the change is passed to ``callback`` when one is set, otherwise put in the ``events`` queue
    """
    def __init__(self, ldap_connection):
        """
        :param ldap_connection: connection object, passed unchanged to AsyncStrategy.__init__
        """
        AsyncStrategy.__init__(self, ldap_connection)
        self.can_stream = True
        self.line_separator = linesep  # joins the LDIF lines written to the stream
        self.all_base64 = False  # not read in this class
        self.stream = None  # assigned by set_stream()
        self.order = dict()  # not read in this class
        # never set to True by this class; while True the LDIF header is not written
        self._header_added = False
        self.persistent_search_message_id = None
        self.streaming = False  # turned on by set_stream()
        self.callback = None
        self.events = Queue()
        del self._requests  # remove _requests dict from Async Strategy

    def _start_listen(self):
        """
        Start listening and, when streaming, make sure a usable stream exists.

        A fresh StringIO is installed when no stream is set or when the current StringIO has been closed.
        Streams of any other type are left untouched.
        """
        AsyncStrategy._start_listen(self)
        if self.streaming:
            if not self.stream or (isinstance(self.stream, StringIO) and self.stream.closed):
                self.set_stream(StringIO())

    def _stop_listen(self):
        """
        Stop listening and, when streaming, close the current stream.
        """
        AsyncStrategy._stop_listen(self)
        if self.streaming:
            self.stream.close()

    def accumulate_stream(self, message_id, change):
        """
        Deliver one change of the persistent search to the stream, the callback or the events queue.

        Messages whose id differs from ``persistent_search_message_id`` are ignored.

        :param message_id: id of the message the change belongs to
        :param change: the change received; in the non-streaming path it is updated in place with the
            decoded notification, so it must support ``update()`` and ``change['controls']``
        """
        if message_id != self.persistent_search_message_id:
            return

        with self.async_lock:
            self._responses[message_id] = []  # reset the stored responses for this search

        if self.streaming:
            # The LDIF header goes only at the very start of an empty stream. It is written before the
            # change is converted, so it is emitted even if no LDIF lines follow for this change.
            if not self._header_added and self.stream.tell() == 0:
                header = add_ldif_header(['-'])[0]
                self.stream.write(prepare_for_stream(header + self.line_separator + self.line_separator))

            ldif_lines = persistent_search_response_to_ldif(change)
            if self.stream and ldif_lines and not self.connection.closed:
                fragment = self.line_separator.join(ldif_lines)
                # each fragment is followed by an empty line
                self.stream.write(prepare_for_stream(fragment + self.line_separator + self.line_separator))
        else:  # strategy is not streaming, events are added to a queue
            notification = decode_persistent_search_control(change)
            if notification:
                change.update(notification)
                # NOTE(review): assumes this control is present whenever a notification was decoded - confirm
                del change['controls']['2.16.840.1.113730.3.4.7']
            if not self.callback:
                self.events.put(change)
            else:
                self.callback(change)

    def get_stream(self):
        """
        :return: the current stream when streaming, otherwise None
        """
        return self.stream if self.streaming else None

    def set_stream(self, value):
        """
        Set the stream that receives the LDIF output and switch the strategy to streaming mode.

        :param value: object to write to; its ``writable()`` method must return a true value
        :raises LDAPLDIFError: if ``writable()`` returns a false value, is missing (AttributeError)
            or raises ValueError
        """
        try:
            writable = value.writable()
        except (ValueError, AttributeError):
            writable = False

        if not writable:
            raise LDAPLDIFError('stream must be writable')

        self.stream = value
        self.streaming = True