summaryrefslogtreecommitdiff
path: root/synapse/http/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/client.py')
-rw-r--r--synapse/http/client.py34
1 files changed, 28 insertions, 6 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 8743e983..6bc51202 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -31,6 +31,7 @@ from twisted.internet.interfaces import (
IReactorPluggableNameResolver,
IResolutionReceiver,
)
+from twisted.internet.task import Cooperator
from twisted.python.failure import Failure
from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, HTTPConnectionPool, readBody
@@ -69,6 +70,21 @@ def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
return False
+_EPSILON = 0.00000001
+
+
+def _make_scheduler(reactor):
+ """Makes a schedular suitable for a Cooperator using the given reactor.
+
+ (This is effectively just a copy from `twisted.internet.task`)
+ """
+
+ def _scheduler(x):
+ return reactor.callLater(_EPSILON, x)
+
+ return _scheduler
+
+
class IPBlacklistingResolver(object):
"""
A proxy for reactor.nameResolver which only produces non-blacklisted IP
@@ -212,6 +228,10 @@ class SimpleHttpClient(object):
if hs.config.user_agent_suffix:
self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
+ # We use this for our body producers to ensure that they use the correct
+ # reactor.
+ self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor()))
+
self.user_agent = self.user_agent.encode("ascii")
if self._ip_blacklist:
@@ -292,7 +312,9 @@ class SimpleHttpClient(object):
try:
body_producer = None
if data is not None:
- body_producer = QuieterFileBodyProducer(BytesIO(data))
+ body_producer = QuieterFileBodyProducer(
+ BytesIO(data), cooperator=self._cooperator,
+ )
request_deferred = treq.request(
method,
@@ -371,7 +393,7 @@ class SimpleHttpClient(object):
body = yield make_deferred_yieldable(readBody(response))
if 200 <= response.code < 300:
- return json.loads(body)
+ return json.loads(body.decode("utf-8"))
else:
raise HttpResponseException(response.code, response.phrase, body)
@@ -412,7 +434,7 @@ class SimpleHttpClient(object):
body = yield make_deferred_yieldable(readBody(response))
if 200 <= response.code < 300:
- return json.loads(body)
+ return json.loads(body.decode("utf-8"))
else:
raise HttpResponseException(response.code, response.phrase, body)
@@ -441,7 +463,7 @@ class SimpleHttpClient(object):
actual_headers.update(headers)
body = yield self.get_raw(uri, args, headers=headers)
- return json.loads(body)
+ return json.loads(body.decode("utf-8"))
@defer.inlineCallbacks
def put_json(self, uri, json_body, args={}, headers=None):
@@ -485,7 +507,7 @@ class SimpleHttpClient(object):
body = yield make_deferred_yieldable(readBody(response))
if 200 <= response.code < 300:
- return json.loads(body)
+ return json.loads(body.decode("utf-8"))
else:
raise HttpResponseException(response.code, response.phrase, body)
@@ -503,7 +525,7 @@ class SimpleHttpClient(object):
header name to a list of values for that header
Returns:
Deferred: Succeeds when we get *any* 2xx HTTP response, with the
- HTTP body at text.
+ HTTP body as bytes.
Raises:
HttpResponseException on a non-2xx HTTP response.
"""