# searx/poolrequests.py
from itertools import cycle
from threading import RLock, local
from time import time

import requests

from searx import settings


class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
                 max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK,
                 **conn_params):
        if max_retries == requests.adapters.DEFAULT_RETRIES:
            self.max_retries = requests.adapters.Retry(0, read=False)
        else:
            self.max_retries = requests.adapters.Retry.from_int(max_retries)
        self.config = {}
        self.proxy_manager = {}

        # Skip HTTPAdapter.__init__ on purpose: it would redo the setup above
        # and call init_poolmanager() without the extra connection parameters.
        # In the MRO this resolves to BaseAdapter.__init__.
        super(requests.adapters.HTTPAdapter, self).__init__()

        self._pool_connections = pool_connections
        self._pool_maxsize = pool_maxsize
        self._pool_block = pool_block
        self._conn_params = conn_params

        self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)

    def __setstate__(self, state):
        # Can't handle this by adding 'proxy_manager' to self.__attrs__,
        # because self.poolmanager uses a lambda function, which isn't picklable.
        self.proxy_manager = {}
        self.config = {}

        for attr, value in state.items():
            setattr(self, attr, value)

        self.init_poolmanager(self._pool_connections, self._pool_maxsize,
                              block=self._pool_block, **self._conn_params)
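
# A minimal sketch (not part of searx) of using the adapter above on its own,
# assuming 192.0.2.1 is an address assigned to a local interface:
#
#   session = requests.Session()
#   session.mount('https://', HTTPAdapterWithConnParams(source_address=('192.0.2.1', 0)))
#   session.get('https://example.com')  # outgoing TCP connections bind to 192.0.2.1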


threadLocal = local()
adapters_lock = RLock()  # serializes access to the shared adapter cycles below
connect = settings['outgoing'].get('pool_connections', 100)  # Magic number kept from previous code
maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE)  # Picked from constructor
if settings['outgoing'].get('source_ips'):
    http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                                    source_address=(source_ip, 0))
                          for source_ip in settings['outgoing']['source_ips'])
    https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                                     source_address=(source_ip, 0))
                           for source_ip in settings['outgoing']['source_ips'])
else:
    http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
    https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
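
# With e.g. source_ips: ['192.0.2.1', '192.0.2.2'] in settings.yml (values are
# only illustrative), successive SessionSinglePool instances below alternate
# between the two adapters, spreading outgoing connections across both
# source addresses.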


class SessionSinglePool(requests.Session):

    def __init__(self):
        super(SessionSinglePool, self).__init__()

        # reuse the same adapters: the module-level lock serializes access to
        # the shared cycle() iterators across threads
        with adapters_lock:
            self.adapters.clear()
            self.mount('https://', next(https_adapters))
            self.mount('http://', next(http_adapters))

    def close(self):
        """Call super, but clear adapters since there are managed globaly"""
        self.adapters.clear()
        super(SessionSinglePool, self).close()


def set_timeout_for_thread(timeout, start_time=None):
    threadLocal.timeout = timeout
    threadLocal.start_time = start_time


def reset_time_for_thread():
    threadLocal.total_time = 0


def get_time_for_thread():
    return threadLocal.total_time
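

# A minimal sketch (not part of searx) of the intended per-search lifecycle of
# the three helpers above, run inside a single engine thread:
#
#   reset_time_for_thread()                         # total_time = 0
#   set_timeout_for_thread(3.0, start_time=time())  # 3 s budget, hypothetical
#   response = get('https://example.com')           # counted against the budget
#   print(get_time_for_thread())                    # HTTP time spent so far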


def request(method, url, **kwargs):
    """same as requests/requests/api.py request(...)"""
    time_before_request = time()

    # session start
    session = SessionSinglePool()

    # proxies: the instance-wide setting always overrides a caller-supplied one
    kwargs['proxies'] = settings['outgoing'].get('proxies') or None

    # timeout: prefer an explicit caller value, else fall back to the
    # thread-local timeout set by set_timeout_for_thread()
    if 'timeout' in kwargs:
        timeout = kwargs['timeout']
    else:
        timeout = getattr(threadLocal, 'timeout', None)
        if timeout is not None:
            kwargs['timeout'] = timeout

    # do request; always release the session, even when the request or the
    # timeout check below raises
    try:
        response = session.request(method=method, url=url, **kwargs)

        time_after_request = time()

        # is there a timeout for this engine?
        if timeout is not None:
            timeout_overhead = 0.2  # seconds
            # start_time = when the user request started
            start_time = getattr(threadLocal, 'start_time', time_before_request)
            search_duration = time_after_request - start_time
            if search_duration > timeout + timeout_overhead:
                raise requests.exceptions.Timeout(response=response)
    finally:
        # session end
        session.close()

    if hasattr(threadLocal, 'total_time'):
        threadLocal.total_time += time_after_request - time_before_request

    return response


def get(url, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('get', url, **kwargs)


def options(url, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('options', url, **kwargs)


def head(url, **kwargs):
    kwargs.setdefault('allow_redirects', False)
    return request('head', url, **kwargs)


def post(url, data=None, **kwargs):
    return request('post', url, data=data, **kwargs)


def put(url, data=None, **kwargs):
    return request('put', url, data=data, **kwargs)


def patch(url, data=None, **kwargs):
    return request('patch', url, data=data, **kwargs)


def delete(url, **kwargs):
    return request('delete', url, **kwargs)
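

if __name__ == '__main__':
    # Ad-hoc smoke test, not part of searx: fetch one page through the pooled
    # session and report the HTTP time accounted to this thread. The URL and
    # the 10 s timeout are only illustrative values.
    reset_time_for_thread()
    set_timeout_for_thread(10.0, start_time=time())
    resp = get('https://example.com')
    print(resp.status_code, get_time_for_thread())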