summaryrefslogtreecommitdiff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-04-27 13:09:25 +0100
committerErik Johnston <erik@matrix.org>2018-04-27 13:09:25 +0100
commit2422eaf5800ef3fb668fae055bd0ce5378590a7d (patch)
tree009b391f594c1b23cb2b267d6cd77a40c32b0be7 /synapse
parent6cd1cb036e45c300c0231947cbc488d4816e7968 (diff)
New upstream version 0.28.0
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py10
-rw-r--r--synapse/api/errors.py3
-rw-r--r--synapse/app/frontend_proxy.py2
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/appservice/__init__.py4
-rw-r--r--synapse/appservice/api.py11
-rw-r--r--synapse/config/_base.py16
-rw-r--r--synapse/config/appservice.py8
-rw-r--r--synapse/crypto/keyring.py8
-rw-r--r--synapse/federation/federation_client.py2
-rw-r--r--synapse/federation/federation_server.py57
-rw-r--r--synapse/federation/send_queue.py14
-rw-r--r--synapse/federation/transaction_queue.py73
-rw-r--r--synapse/federation/transport/server.py12
-rw-r--r--synapse/handlers/appservice.py37
-rw-r--r--synapse/handlers/federation.py109
-rw-r--r--synapse/handlers/message.py180
-rw-r--r--synapse/handlers/register.py36
-rw-r--r--synapse/handlers/room_list.py43
-rw-r--r--synapse/handlers/room_member.py21
-rw-r--r--synapse/handlers/sync.py25
-rw-r--r--synapse/http/endpoint.py103
-rw-r--r--synapse/http/server.py4
-rw-r--r--synapse/http/site.py6
-rw-r--r--synapse/metrics/__init__.py43
-rw-r--r--synapse/metrics/metric.py32
-rw-r--r--synapse/notifier.py1
-rw-r--r--synapse/replication/http/send_event.py20
-rw-r--r--synapse/rest/client/v1/logout.py5
-rw-r--r--synapse/rest/client/v1/register.py4
-rw-r--r--synapse/rest/client/v1/room.py7
-rw-r--r--synapse/rest/client/v2_alpha/register.py32
-rw-r--r--synapse/rest/media/v1/media_storage.py4
-rw-r--r--synapse/storage/__init__.py4
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/engines/__init__.py5
-rw-r--r--synapse/storage/event_federation.py63
-rw-r--r--synapse/storage/events.py52
-rw-r--r--synapse/storage/events_worker.py20
-rw-r--r--synapse/storage/room.py6
-rw-r--r--synapse/storage/search.py2
-rw-r--r--synapse/types.py2
-rw-r--r--synapse/util/caches/response_cache.py106
-rw-r--r--synapse/util/file_consumer.py4
-rw-r--r--synapse/util/logcontext.py1
46 files changed, 778 insertions, 427 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 0c1c16b9..4924f44d 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.27.4"
+__version__ = "0.28.0"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index ac0a3655..f17fda63 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -204,8 +204,8 @@ class Auth(object):
ip_addr = self.hs.get_ip_from_request(request)
user_agent = request.requestHeaders.getRawHeaders(
- "User-Agent",
- default=[""]
+ b"User-Agent",
+ default=[b""]
)[0]
if user and access_token and ip_addr:
self.store.insert_client_ip(
@@ -672,7 +672,7 @@ def has_access_token(request):
bool: False if no access_token was given, True otherwise.
"""
query_params = request.args.get("access_token")
- auth_headers = request.requestHeaders.getRawHeaders("Authorization")
+ auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
return bool(query_params) or bool(auth_headers)
@@ -692,8 +692,8 @@ def get_access_token_from_request(request, token_not_found_http_status=401):
AuthError: If there isn't an access_token in the request.
"""
- auth_headers = request.requestHeaders.getRawHeaders("Authorization")
- query_params = request.args.get("access_token")
+ auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
+ query_params = request.args.get(b"access_token")
if auth_headers:
# Try the get the access_token from a "Authorization: Bearer"
# header
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index bee59e80..a9ff5576 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -18,6 +18,7 @@
import logging
import simplejson as json
+from six import iteritems
logger = logging.getLogger(__name__)
@@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
A dict representing the error response JSON.
"""
err = {"error": msg, "errcode": code}
- for key, value in kwargs.iteritems():
+ for key, value in iteritems(kwargs):
err[key] = value
return err
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index de889357..b349e3e3 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -90,7 +90,7 @@ class KeyUploadServlet(RestServlet):
# They're actually trying to upload something, proxy to main synapse.
# Pass through the auth headers, if any, in case the access token
# is there.
- auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+ auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
headers = {
"Authorization": auth_headers,
}
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 508b6661..2fddcd93 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -58,6 +58,8 @@ from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
from twisted.web.resource import NoResource
+from six import iteritems
+
logger = logging.getLogger("synapse.app.synchrotron")
@@ -211,7 +213,7 @@ class SynchrotronPresence(object):
def get_currently_syncing_users(self):
return [
- user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+ user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index d5a7a5ce..5fdb5797 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -21,6 +21,8 @@ from twisted.internet import defer
import logging
import re
+from six import string_types
+
logger = logging.getLogger(__name__)
@@ -146,7 +148,7 @@ class ApplicationService(object):
)
regex = regex_obj.get("regex")
- if isinstance(regex, basestring):
+ if isinstance(regex, string_types):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
else:
raise ValueError(
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 40c433d7..00efff14 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.http.client import SimpleHttpClient
from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID
@@ -73,7 +72,8 @@ class ApplicationServiceApi(SimpleHttpClient):
super(ApplicationServiceApi, self).__init__(hs)
self.clock = hs.get_clock()
- self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
+ self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
+ timeout_ms=HOUR_IN_MS)
@defer.inlineCallbacks
def query_user(self, service, user_id):
@@ -193,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
defer.returnValue(None)
key = (service.id, protocol)
- result = self.protocol_meta_cache.get(key)
- if not result:
- result = self.protocol_meta_cache.set(
- key, preserve_fn(_get)()
- )
- return make_deferred_yieldable(result)
+ return self.protocol_meta_cache.wrap(key, _get)
@defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None):
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index fa105bce..32b439d2 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -19,6 +19,8 @@ import os
import yaml
from textwrap import dedent
+from six import integer_types
+
class ConfigError(Exception):
pass
@@ -49,7 +51,7 @@ Missing mandatory `server_name` config option.
class Config(object):
@staticmethod
def parse_size(value):
- if isinstance(value, int) or isinstance(value, long):
+ if isinstance(value, integer_types):
return value
sizes = {"K": 1024, "M": 1024 * 1024}
size = 1
@@ -61,7 +63,7 @@ class Config(object):
@staticmethod
def parse_duration(value):
- if isinstance(value, int) or isinstance(value, long):
+ if isinstance(value, integer_types):
return value
second = 1000
minute = 60 * second
@@ -288,22 +290,22 @@ class Config(object):
)
obj.invoke_all("generate_files", config)
config_file.write(config_bytes)
- print (
+ print((
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"
" certificates. Please review this file and customise it"
" to your needs."
- ) % (config_path, server_name)
- print (
+ ) % (config_path, server_name))
+ print(
"If this server name is incorrect, you will need to"
" regenerate the SSL certificates"
)
return
else:
- print (
+ print((
"Config file %r already exists. Generating any missing key"
" files."
- ) % (config_path,)
+ ) % (config_path,))
generate_keys = True
parser = argparse.ArgumentParser(
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index aba0aec6..9a2359b6 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -21,6 +21,8 @@ import urllib
import yaml
import logging
+from six import string_types
+
logger = logging.getLogger(__name__)
@@ -89,14 +91,14 @@ def _load_appservice(hostname, as_info, config_filename):
"id", "as_token", "hs_token", "sender_localpart"
]
for field in required_string_fields:
- if not isinstance(as_info.get(field), basestring):
+ if not isinstance(as_info.get(field), string_types):
raise KeyError("Required string field: '%s' (%s)" % (
field, config_filename,
))
# 'url' must either be a string or explicitly null, not missing
# to avoid accidentally turning off push for ASes.
- if (not isinstance(as_info.get("url"), basestring) and
+ if (not isinstance(as_info.get("url"), string_types) and
as_info.get("url", "") is not None):
raise KeyError(
"Required string field or explicit null: 'url' (%s)" % (config_filename,)
@@ -128,7 +130,7 @@ def _load_appservice(hostname, as_info, config_filename):
"Expected namespace entry in %s to be an object,"
" but got %s", ns, regex_obj
)
- if not isinstance(regex_obj.get("regex"), basestring):
+ if not isinstance(regex_obj.get("regex"), string_types):
raise ValueError(
"Missing/bad type 'regex' key in %s", regex_obj
)
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 35f810b0..fce83d44 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -352,7 +352,7 @@ class Keyring(object):
logger.exception(
"Unable to get key from %r: %s %s",
perspective_name,
- type(e).__name__, str(e.message),
+ type(e).__name__, str(e),
)
defer.returnValue({})
@@ -384,7 +384,7 @@ class Keyring(object):
logger.info(
"Unable to get key %r for %r directly: %s %s",
key_ids, server_name,
- type(e).__name__, str(e.message),
+ type(e).__name__, str(e),
)
if not keys:
@@ -734,7 +734,7 @@ def _handle_key_deferred(verify_request):
except IOError as e:
logger.warn(
"Got IOError when downloading keys for %s: %s %s",
- server_name, type(e).__name__, str(e.message),
+ server_name, type(e).__name__, str(e),
)
raise SynapseError(
502,
@@ -744,7 +744,7 @@ def _handle_key_deferred(verify_request):
except Exception as e:
logger.exception(
"Got Exception when downloading keys for %s: %s %s",
- server_name, type(e).__name__, str(e.message),
+ server_name, type(e).__name__, str(e),
)
raise SynapseError(
401,
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 38440da5..8e2c0c4c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -394,7 +394,7 @@ class FederationClient(FederationBase):
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
else:
- seen_events = yield self.store.have_events(event_ids)
+ seen_events = yield self.store.have_seen_events(event_ids)
signed_events = []
failed_to_fetch = set()
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bea7fd0b..247ddc89 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,9 +31,10 @@ import synapse.metrics
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function
+from six import iteritems
+
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -65,7 +67,7 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
- self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
+ self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
@defer.inlineCallbacks
@log_function
@@ -212,16 +214,17 @@ class FederationServer(FederationBase):
if not in_room:
raise AuthError(403, "Host not in room.")
- result = self._state_resp_cache.get((room_id, event_id))
- if not result:
- with (yield self._server_linearizer.queue((origin, room_id))):
- d = self._state_resp_cache.set(
- (room_id, event_id),
- preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
- )
- resp = yield make_deferred_yieldable(d)
- else:
- resp = yield make_deferred_yieldable(result)
+ # we grab the linearizer to protect ourselves from servers which hammer
+ # us. In theory we might already have the response to this query
+ # in the cache so we could return it without waiting for the linearizer
+ # - but that's non-trivial to get right, and anyway somewhat defeats
+ # the point of the linearizer.
+ with (yield self._server_linearizer.queue((origin, room_id))):
+ resp = yield self._state_resp_cache.wrap(
+ (room_id, event_id),
+ self._on_context_state_request_compute,
+ room_id, event_id,
+ )
defer.returnValue((200, resp))
@@ -425,9 +428,9 @@ class FederationServer(FederationBase):
"Claimed one-time-keys: %s",
",".join((
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in json_result.iteritems()
- for device_id, device_keys in user_keys.iteritems()
- for key_id, _ in device_keys.iteritems()
+ for user_id, user_keys in iteritems(json_result)
+ for device_id, device_keys in iteritems(user_keys)
+ for key_id, _ in iteritems(device_keys)
)),
)
@@ -494,13 +497,33 @@ class FederationServer(FederationBase):
def _handle_received_pdu(self, origin, pdu):
""" Process a PDU received in a federation /send/ transaction.
+ If the event is invalid, then this method throws a FederationError.
+ (The error will then be logged and sent back to the sender (which
+ probably won't do anything with it), and other events in the
+ transaction will be processed as normal).
+
+ It is likely that we'll then receive other events which refer to
+ this rejected_event in their prev_events, etc. When that happens,
+ we'll attempt to fetch the rejected event again, which will presumably
+ fail, so those second-generation events will also get rejected.
+
+ Eventually, we get to the point where there are more than 10 events
+ between any new events and the original rejected event. Since we
+ only try to backfill 10 events deep on received pdu, we then accept the
+ new event, possibly introducing a discontinuity in the DAG, with new
+ forward extremities, so normal service is approximately returned,
+ until we try to backfill across the discontinuity.
+
Args:
origin (str): server which sent the pdu
pdu (FrozenEvent): received pdu
Returns (Deferred): completes with None
- Raises: FederationError if the signatures / hash do not match
- """
+
+ Raises: FederationError if the signatures / hash do not match, or
+ if the event was unacceptable for any other reason (eg, too large,
+ too many prev_events, couldn't find the prev_events)
+ """
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.event_id):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 93e5aceb..0f0c687b 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -40,6 +40,8 @@ from collections import namedtuple
import logging
+from six import itervalues, iteritems
+
logger = logging.getLogger(__name__)
@@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
- for uids in self.presence_changed.itervalues()
+ for uids in itervalues(self.presence_changed)
for user_id in uids
)
@@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object):
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
- for ((destination, edu_key), pos) in keyed_edus.iteritems():
+ for ((destination, edu_key), pos) in iteritems(keyed_edus):
rows.append((pos, KeyedEduRow(
key=edu_key,
edu=self.keyed_edu[(destination, edu_key)],
@@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object):
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
- for (destination, pos) in device_messages.iteritems():
+ for (destination, pos) in iteritems(device_messages):
rows.append((pos, DeviceRow(
destination=destination,
)))
@@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
- for destination, edu_map in buff.keyed_edus.iteritems():
+ for destination, edu_map in iteritems(buff.keyed_edus):
for key, edu in edu_map.items():
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=key,
)
- for destination, edu_list in buff.edus.iteritems():
+ for destination, edu_list in iteritems(buff.edus):
for edu in edu_list:
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=None,
)
- for destination, failure_list in buff.failures.iteritems():
+ for destination, failure_list in iteritems(buff.failures):
for failure in failure_list:
transaction_queue.send_failure(destination, failure)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a141ec99..963d938e 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -169,7 +169,7 @@ class TransactionQueue(object):
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
- last_token, self._last_poked_id, limit=20,
+ last_token, self._last_poked_id, limit=100,
)
logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,24 +177,33 @@ class TransactionQueue(object):
if not events and next_token >= self._last_poked_id:
break
- for event in events:
+ @defer.inlineCallbacks
+ def handle_event(event):
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
- continue
-
- # Get the state from before the event.
- # We need to make sure that this is the state from before
- # the event and not from after it.
- # Otherwise if the last member on a server in a room is
- # banned then it won't receive the event because it won't
- # be in the room after the ban.
- destinations = yield self.state.get_current_hosts_in_room(
- event.room_id, latest_event_ids=[
- prev_id for prev_id, _ in event.prev_events
- ],
- )
+ return
+
+ try:
+ # Get the state from before the event.
+ # We need to make sure that this is the state from before
+ # the event and not from after it.
+ # Otherwise if the last member on a server in a room is
+ # banned then it won't receive the event because it won't
+ # be in the room after the ban.
+ destinations = yield self.state.get_current_hosts_in_room(
+ event.room_id, latest_event_ids=[
+ prev_id for prev_id, _ in event.prev_events
+ ],
+ )
+ except Exception:
+ logger.exception(
+ "Failed to calculate hosts in room for event: %s",
+ event.event_id,
+ )
+ return
+
destinations = set(destinations)
if send_on_behalf_of is not None:
@@ -207,12 +216,44 @@ class TransactionQueue(object):
self._send_pdu(event, destinations)
- events_processed_counter.inc_by(len(events))
+ @defer.inlineCallbacks
+ def handle_room_events(events):
+ for event in events:
+ yield handle_event(event)
+
+ events_by_room = {}
+ for event in events:
+ events_by_room.setdefault(event.room_id, []).append(event)
+
+ yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(handle_room_events, evs)
+ for evs in events_by_room.itervalues()
+ ],
+ consumeErrors=True
+ ))
yield self.store.update_federation_out_pos(
"events", next_token
)
+ if events:
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(events[-1].event_id)
+
+ synapse.metrics.event_processing_lag.set(
+ now - ts, "federation_sender",
+ )
+ synapse.metrics.event_processing_last_ts.set(
+ ts, "federation_sender",
+ )
+
+ events_processed_counter.inc_by(len(events))
+
+ synapse.metrics.event_processing_positions.set(
+ next_token, "federation_sender",
+ )
+
finally:
self._is_processing = False
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 4c94d5a3..ff0656df 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -94,12 +94,6 @@ class Authenticator(object):
"signatures": {},
}
- if (
- self.federation_domain_whitelist is not None and
- self.server_name not in self.federation_domain_whitelist
- ):
- raise FederationDeniedError(self.server_name)
-
if content is not None:
json_request["content"] = content
@@ -138,6 +132,12 @@ class Authenticator(object):
json_request["origin"] = origin
json_request["signatures"].setdefault(origin, {})[key] = sig
+ if (
+ self.federation_domain_whitelist is not None and
+ origin not in self.federation_domain_whitelist
+ ):
+ raise FederationDeniedError(origin)
+
if not json_request["signatures"]:
raise NoAuthenticationError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a..0245197c 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import (
+ make_deferred_yieldable, preserve_fn, run_in_background,
+)
import logging
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
if not events:
break
+ events_by_room = {}
for event in events:
+ events_by_room.setdefault(event.room_id, []).append(event)
+
+ @defer.inlineCallbacks
+ def handle_event(event):
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
- continue # no services need notifying
+ return # no services need notifying
# Do we know this user exists? If not, poke the user
# query API for all services which match that user regex.
@@ -108,9 +115,33 @@ class ApplicationServicesHandler(object):
service, event
)
- events_processed_counter.inc_by(len(events))
+ @defer.inlineCallbacks
+ def handle_room_events(events):
+ for event in events:
+ yield handle_event(event)
+
+ yield make_deferred_yieldable(defer.gatherResults([
+ run_in_background(handle_room_events, evs)
+ for evs in events_by_room.itervalues()
+ ], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound)
+
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(events[-1].event_id)
+
+ synapse.metrics.event_processing_positions.set(
+ upper_bound, "appservice_sender",
+ )
+
+ events_processed_counter.inc_by(len(events))
+
+ synapse.metrics.event_processing_lag.set(
+ now - ts, "appservice_sender",
+ )
+ synapse.metrics.event_processing_last_ts.set(
+ ts, "appservice_sender",
+ )
finally:
self.is_processing = False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d..ae7e0d6d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -15,8 +15,14 @@
# limitations under the License.
"""Contains handlers for federation events."""
+
+import httplib
+import itertools
+import logging
+
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
+from twisted.internet import defer
from unpaddedbase64 import decode_base64
from ._base import BaseHandler
@@ -43,10 +49,6 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room
-from twisted.internet import defer
-
-import itertools
-import logging
logger = logging.getLogger(__name__)
@@ -115,6 +117,19 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id)
return
+ # do some initial sanity-checking of the event. In particular, make
+ # sure it doesn't have hundreds of prev_events or auth_events, which
+ # could cause a huge state resolution or cascade of event fetches.
+ try:
+ self._sanity_check_event(pdu)
+ except SynapseError as err:
+ raise FederationError(
+ "ERROR",
+ err.code,
+ err.msg,
+ affected=pdu.event_id,
+ )
+
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
@@ -149,10 +164,6 @@ class FederationHandler(BaseHandler):
auth_chain = []
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
-
fetch_state = False
# Get missing pdus if necessary.
@@ -168,7 +179,7 @@ class FederationHandler(BaseHandler):
)
prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
+ seen = yield self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
@@ -196,8 +207,7 @@ class FederationHandler(BaseHandler):
# Update the set of things we've seen after trying to
# fetch the missing stuff
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.iterkeys())
+ seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
logger.info(
@@ -248,8 +258,7 @@ class FederationHandler(BaseHandler):
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.keys())
+ seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
@@ -361,9 +370,7 @@ class FederationHandler(BaseHandler):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
- seen_ids = set(
- (yield self.store.have_events(event_ids)).keys()
- )
+ seen_ids = yield self.store.have_seen_events(event_ids)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
@@ -527,9 +534,16 @@ class FederationHandler(BaseHandler):
def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
- This will attempt to get more events from the remote. This may return
- be successfull and still return no events if the other side has no new
- events to offer.
+ This will attempt to get more events from the remote. If the other side
+ has no new events to offer, this will return an empty list.
+
+ As the events are received, we check their signatures, and also do some
+ sanity-checking on them. If any of the backfilled events are invalid,
+ this method throws a SynapseError.
+
+ TODO: make this more useful to distinguish failures of the remote
+ server from invalid events (there is probably no point in trying to
+ re-fetch invalid events from every other HS in the room.)
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +555,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
+ # ideally we'd sanity check the events here for excess prev_events etc,
+ # but it's hard to reject events at this point without completely
+ # breaking backfill in the same way that it is currently broken by
+ # events whose signature we cannot verify (#3121).
+ #
+ # So for now we accept the events anyway. #3124 tracks this.
+ #
+ # for ev in events:
+ # self._sanity_check_event(ev)
+
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
@@ -633,7 +657,7 @@ class FederationHandler(BaseHandler):
failed_to_fetch = missing_auth - set(auth_events)
- seen_events = yield self.store.have_events(
+ seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)
@@ -843,6 +867,38 @@ class FederationHandler(BaseHandler):
defer.returnValue(False)
+ def _sanity_check_event(self, ev):
+ """
+ Do some early sanity checks of a received event
+
+ In particular, checks it doesn't have an excessive number of
+ prev_events or auth_events, which could cause a huge state resolution
+ or cascade of event fetches.
+
+ Args:
+ ev (synapse.events.EventBase): event to be checked
+
+ Returns: None
+
+ Raises:
+ SynapseError if the event does not pass muster
+ """
+ if len(ev.prev_events) > 20:
+ logger.warn("Rejecting event %s which has %i prev_events",
+ ev.event_id, len(ev.prev_events))
+ raise SynapseError(
+ httplib.BAD_REQUEST,
+ "Too many prev_events",
+ )
+
+ if len(ev.auth_events) > 10:
+ logger.warn("Rejecting event %s which has %i auth_events",
+ ev.event_id, len(ev.auth_events))
+ raise SynapseError(
+ httplib.BAD_REQUEST,
+ "Too many auth_events",
+ )
+
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -1736,7 +1792,8 @@ class FederationHandler(BaseHandler):
event_key = None
if event_auth_events - current_state:
- have_events = yield self.store.have_events(
+ # TODO: can we use store.have_seen_events here instead?
+ have_events = yield self.store.get_seen_events_with_rejections(
event_auth_events - current_state
)
else:
@@ -1759,12 +1816,12 @@ class FederationHandler(BaseHandler):
origin, event.room_id, event.event_id
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
- if e.event_id in seen_remotes.keys():
+ if e.event_id in seen_remotes:
continue
if e.event_id == event.event_id:
@@ -1791,7 +1848,7 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
- have_events = yield self.store.have_events(
+ have_events = yield self.store.get_seen_events_with_rejections(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
@@ -1876,13 +1933,13 @@ class FederationHandler(BaseHandler):
local_auth_chain,
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
)
# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes.keys():
+ if ev.event_id in seen_remotes:
continue
if ev.event_id == event.event_id:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6de6e13b..21628a85 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -37,7 +37,6 @@ from ._base import BaseHandler
from canonicaljson import encode_canonical_json
import logging
-import random
import simplejson
logger = logging.getLogger(__name__)
@@ -433,7 +432,7 @@ class EventCreationHandler(object):
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
- prev_event_ids=None):
+ prev_events_and_hashes=None):
"""
Given a dict from a client, create a new event.
@@ -447,47 +446,52 @@ class EventCreationHandler(object):
event_dict (dict): An entire event
token_id (str)
txn_id (str)
- prev_event_ids (list): The prev event ids to use when creating the event
+
+ prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+ the forward extremities to use as the prev_events for the
+ new event. For each event, a tuple of (event_id, hashes, depth)
+ where *hashes* is a map from algorithm to hash.
+
+ If None, they will be requested from the database.
Returns:
Tuple of created event (FrozenEvent), Context
"""
builder = self.event_builder_factory.new(event_dict)
- with (yield self.limiter.queue(builder.room_id)):
- self.validator.validate_new(builder)
-
- if builder.type == EventTypes.Member:
- membership = builder.content.get("membership", None)
- target = UserID.from_string(builder.state_key)
-
- if membership in {Membership.JOIN, Membership.INVITE}:
- # If event doesn't include a display name, add one.
- profile = self.profile_handler
- content = builder.content
-
- try:
- if "displayname" not in content:
- content["displayname"] = yield profile.get_displayname(target)
- if "avatar_url" not in content:
- content["avatar_url"] = yield profile.get_avatar_url(target)
- except Exception as e:
- logger.info(
- "Failed to get profile information for %r: %s",
- target, e
- )
+ self.validator.validate_new(builder)
+
+ if builder.type == EventTypes.Member:
+ membership = builder.content.get("membership", None)
+ target = UserID.from_string(builder.state_key)
+
+ if membership in {Membership.JOIN, Membership.INVITE}:
+ # If event doesn't include a display name, add one.
+ profile = self.profile_handler
+ content = builder.content
+
+ try:
+ if "displayname" not in content:
+ content["displayname"] = yield profile.get_displayname(target)
+ if "avatar_url" not in content:
+ content["avatar_url"] = yield profile.get_avatar_url(target)
+ except Exception as e:
+ logger.info(
+ "Failed to get profile information for %r: %s",
+ target, e
+ )
- if token_id is not None:
- builder.internal_metadata.token_id = token_id
+ if token_id is not None:
+ builder.internal_metadata.token_id = token_id
- if txn_id is not None:
- builder.internal_metadata.txn_id = txn_id
+ if txn_id is not None:
+ builder.internal_metadata.txn_id = txn_id
- event, context = yield self.create_new_client_event(
- builder=builder,
- requester=requester,
- prev_event_ids=prev_event_ids,
- )
+ event, context = yield self.create_new_client_event(
+ builder=builder,
+ requester=requester,
+ prev_events_and_hashes=prev_events_and_hashes,
+ )
defer.returnValue((event, context))
@@ -557,64 +561,76 @@ class EventCreationHandler(object):
See self.create_event and self.send_nonmember_event.
"""
- event, context = yield self.create_event(
- requester,
- event_dict,
- token_id=requester.access_token_id,
- txn_id=txn_id
- )
- spam_error = self.spam_checker.check_event_for_spam(event)
- if spam_error:
- if not isinstance(spam_error, basestring):
- spam_error = "Spam is not permitted here"
- raise SynapseError(
- 403, spam_error, Codes.FORBIDDEN
+ # We limit the number of concurrent event sends in a room so that we
+ # don't fork the DAG too much. If we don't limit then we can end up in
+ # a situation where event persistence can't keep up, causing
+ # extremities to pile up, which in turn leads to state resolution
+ # taking longer.
+ with (yield self.limiter.queue(event_dict["room_id"])):
+ event, context = yield self.create_event(
+ requester,
+ event_dict,
+ token_id=requester.access_token_id,
+ txn_id=txn_id
)
- yield self.send_nonmember_event(
- requester,
- event,
- context,
- ratelimit=ratelimit,
- )
+ spam_error = self.spam_checker.check_event_for_spam(event)
+ if spam_error:
+ if not isinstance(spam_error, basestring):
+ spam_error = "Spam is not permitted here"
+ raise SynapseError(
+ 403, spam_error, Codes.FORBIDDEN
+ )
+
+ yield self.send_nonmember_event(
+ requester,
+ event,
+ context,
+ ratelimit=ratelimit,
+ )
defer.returnValue(event)
@measure_func("create_new_client_event")
@defer.inlineCallbacks
- def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
- if prev_event_ids:
- prev_events = yield self.store.add_event_hashes(prev_event_ids)
- prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
- depth = prev_max_depth + 1
- else:
- latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
- builder.room_id,
+ def create_new_client_event(self, builder, requester=None,
+ prev_events_and_hashes=None):
+ """Create a new event for a local client
+
+ Args:
+ builder (EventBuilder):
+
+ requester (synapse.types.Requester|None):
+
+ prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
+ the forward extremities to use as the prev_events for the
+ new event. For each event, a tuple of (event_id, hashes, depth)
+ where *hashes* is a map from algorithm to hash.
+
+ If None, they will be requested from the database.
+
+ Returns:
+ Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+ """
+
+ if prev_events_and_hashes is not None:
+ assert len(prev_events_and_hashes) <= 10, \
+ "Attempting to create an event with %i prev_events" % (
+ len(prev_events_and_hashes),
)
+ else:
+ prev_events_and_hashes = \
+ yield self.store.get_prev_events_for_room(builder.room_id)
- # We want to limit the max number of prev events we point to in our
- # new event
- if len(latest_ret) > 10:
- # Sort by reverse depth, so we point to the most recent.
- latest_ret.sort(key=lambda a: -a[2])
- new_latest_ret = latest_ret[:5]
-
- # We also randomly point to some of the older events, to make
- # sure that we don't completely ignore the older events.
- if latest_ret[5:]:
- sample_size = min(5, len(latest_ret[5:]))
- new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
- latest_ret = new_latest_ret
-
- if latest_ret:
- depth = max([d for _, _, d in latest_ret]) + 1
- else:
- depth = 1
+ if prev_events_and_hashes:
+ depth = max([d for _, _, d in prev_events_and_hashes]) + 1
+ else:
+ depth = 1
- prev_events = [
- (event_id, prev_hashes)
- for event_id, prev_hashes, _ in latest_ret
- ]
+ prev_events = [
+ (event_id, prev_hashes)
+ for event_id, prev_hashes, _ in prev_events_and_hashes
+ ]
builder.prev_events = prev_events
builder.depth = depth
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index dd037052..f83c6b3c 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,7 +23,7 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
-from synapse.types import UserID
+from synapse.types import UserID, create_requester, RoomID, RoomAlias
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -205,10 +205,17 @@ class RegistrationHandler(BaseHandler):
token = None
attempts += 1
+ # auto-join the user to any rooms we're supposed to dump them into
+ fake_requester = create_requester(user_id)
+ for r in self.hs.config.auto_join_rooms:
+ try:
+ yield self._join_user_to_room(fake_requester, r)
+ except Exception as e:
+ logger.error("Failed to join new user to %r: %r", r, e)
+
# We used to generate default identicons here, but nowadays
# we want clients to generate their own as part of their branding
# rather than there being consistent matrix-wide ones, so we don't.
-
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@@ -483,3 +490,28 @@ class RegistrationHandler(BaseHandler):
)
defer.returnValue((user_id, access_token))
+
+ @defer.inlineCallbacks
+ def _join_user_to_room(self, requester, room_identifier):
+ room_id = None
+ room_member_handler = self.hs.get_room_member_handler()
+ if RoomID.is_valid(room_identifier):
+ room_id = room_identifier
+ elif RoomAlias.is_valid(room_identifier):
+ room_alias = RoomAlias.from_string(room_identifier)
+ room_id, remote_room_hosts = (
+ yield room_member_handler.lookup_room_alias(room_alias)
+ )
+ room_id = room_id.to_string()
+ else:
+ raise SynapseError(400, "%s was not legal room ID or room alias" % (
+ room_identifier,
+ ))
+
+ yield room_member_handler.update_membership(
+ requester=requester,
+ target=requester.user,
+ room_id=room_id,
+ remote_room_hosts=remote_room_hosts,
+ action="join",
+ )
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 5d81f59b..add3f9b0 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -20,7 +20,6 @@ from ._base import BaseHandler
from synapse.api.constants import (
EventTypes, JoinRules,
)
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -44,8 +43,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
- self.response_cache = ResponseCache(hs)
- self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+ self.response_cache = ResponseCache(hs, "room_list")
+ self.remote_response_cache = ResponseCache(hs, "remote_room_list",
+ timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
@@ -77,18 +77,11 @@ class RoomListHandler(BaseHandler):
)
key = (limit, since_token, network_tuple)
- result = self.response_cache.get(key)
- if not result:
- logger.info("No cached result, calculating one.")
- result = self.response_cache.set(
- key,
- preserve_fn(self._get_public_room_list)(
- limit, since_token, network_tuple=network_tuple
- )
- )
- else:
- logger.info("Using cached deferred result.")
- return make_deferred_yieldable(result)
+ return self.response_cache.wrap(
+ key,
+ self._get_public_room_list,
+ limit, since_token, network_tuple=network_tuple,
+ )
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
@@ -422,18 +415,14 @@ class RoomListHandler(BaseHandler):
server_name, limit, since_token, include_all_networks,
third_party_instance_id,
)
- result = self.remote_response_cache.get(key)
- if not result:
- result = self.remote_response_cache.set(
- key,
- repl_layer.get_public_rooms(
- server_name, limit=limit, since_token=since_token,
- search_filter=search_filter,
- include_all_networks=include_all_networks,
- third_party_instance_id=third_party_instance_id,
- )
- )
- return result
+ return self.remote_response_cache.wrap(
+ key,
+ repl_layer.get_public_rooms,
+ server_name, limit=limit, since_token=since_token,
+ search_filter=search_filter,
+ include_all_networks=include_all_networks,
+ third_party_instance_id=third_party_instance_id,
+ )
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 9977be88..714583f1 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -149,7 +149,7 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks
def _local_membership_update(
self, requester, target, room_id, membership,
- prev_event_ids,
+ prev_events_and_hashes,
txn_id=None,
ratelimit=True,
content=None,
@@ -175,7 +175,7 @@ class RoomMemberHandler(object):
},
token_id=requester.access_token_id,
txn_id=txn_id,
- prev_event_ids=prev_event_ids,
+ prev_events_and_hashes=prev_events_and_hashes,
)
# Check if this event matches the previous membership event for the user.
@@ -314,7 +314,12 @@ class RoomMemberHandler(object):
403, "Invites have been disabled on this server",
)
- latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+ prev_events_and_hashes = yield self.store.get_prev_events_for_room(
+ room_id,
+ )
+ latest_event_ids = (
+ event_id for (event_id, _, _) in prev_events_and_hashes
+ )
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)
@@ -403,7 +408,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
- prev_event_ids=latest_event_ids,
+ prev_events_and_hashes=prev_events_and_hashes,
content=content,
)
defer.returnValue(res)
@@ -852,6 +857,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
+ # filter ourselves out of remote_room_hosts: do_invite_join ignores it
+ # and if it is the only entry we'd like to return a 404 rather than a
+ # 500.
+
+ remote_room_hosts = [
+ host for host in remote_room_hosts if host != self.hs.hostname
+ ]
+
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0f713ce0..b52e4c2a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@
from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
@@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
+ __bool__ = __nonzero__ # python3
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
+ __bool__ = __nonzero__ # python3
class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
or self.state
or self.account_data
)
+ __bool__ = __nonzero__ # python3
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
def __nonzero__(self):
"""Invited rooms should always be reported to the client"""
return True
+ __bool__ = __nonzero__ # python3
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
def __nonzero__(self):
return bool(self.join or self.invite or self.leave)
+ __bool__ = __nonzero__ # python3
class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
def __nonzero__(self):
return bool(self.changed or self.left)
+ __bool__ = __nonzero__ # python3
class SyncResult(collections.namedtuple("SyncResult", [
@@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.device_lists or
self.groups
)
+ __bool__ = __nonzero__ # python3
class SyncHandler(object):
@@ -169,7 +176,7 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs)
+ self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -180,15 +187,11 @@ class SyncHandler(object):
Returns:
A Deferred SyncResult.
"""
- result = self.response_cache.get(sync_config.request_key)
- if not result:
- result = self.response_cache.set(
- sync_config.request_key,
- preserve_fn(self._wait_for_sync_for_user)(
- sync_config, since_token, timeout, full_state
- )
- )
- return make_deferred_yieldable(result)
+ return self.response_cache.wrap(
+ sync_config.request_key,
+ self._wait_for_sync_for_user,
+ sync_config, since_token, timeout, full_state,
+ )
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 87639b91..00572c28 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import socket
-
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
@@ -33,7 +31,7 @@ SERVER_CACHE = {}
# our record of an individual server which can be tried to reach a destination.
#
-# "host" is actually a dotted-quad or ipv6 address string. Except when there's
+# "host" is the hostname acquired from the SRV record. Except when there's
# no SRV record, in which case it is the original hostname.
_Server = collections.namedtuple(
"_Server", "priority weight host port expires"
@@ -297,20 +295,13 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
payload = answer.payload
- hosts = yield _get_hosts_for_srv_record(
- dns_client, str(payload.target)
- )
-
- for (ip, ttl) in hosts:
- host_ttl = min(answer.ttl, ttl)
-
- servers.append(_Server(
- host=ip,
- port=int(payload.port),
- priority=int(payload.priority),
- weight=int(payload.weight),
- expires=int(clock.time()) + host_ttl,
- ))
+ servers.append(_Server(
+ host=str(payload.target),
+ port=int(payload.port),
+ priority=int(payload.priority),
+ weight=int(payload.weight),
+ expires=int(clock.time()) + answer.ttl,
+ ))
servers.sort()
cache[service_name] = list(servers)
@@ -328,81 +319,3 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
raise e
defer.returnValue(servers)
-
-
-@defer.inlineCallbacks
-def _get_hosts_for_srv_record(dns_client, host):
- """Look up each of the hosts in a SRV record
-
- Args:
- dns_client (twisted.names.dns.IResolver):
- host (basestring): host to look up
-
- Returns:
- Deferred[list[(str, int)]]: a list of (host, ttl) pairs
-
- """
- ip4_servers = []
- ip6_servers = []
-
- def cb(res):
- # lookupAddress and lookupIP6Address return a three-tuple
- # giving the answer, authority, and additional sections of the
- # response.
- #
- # we only care about the answers.
-
- return res[0]
-
- def eb(res, record_type):
- if res.check(DNSNameError):
- return []
- logger.warn("Error looking up %s for %s: %s", record_type, host, res)
- return res
-
- # no logcontexts here, so we can safely fire these off and gatherResults
- d1 = dns_client.lookupAddress(host).addCallbacks(
- cb, eb, errbackArgs=("A", ))
- d2 = dns_client.lookupIPV6Address(host).addCallbacks(
- cb, eb, errbackArgs=("AAAA", ))
- results = yield defer.DeferredList(
- [d1, d2], consumeErrors=True)
-
- # if all of the lookups failed, raise an exception rather than blowing out
- # the cache with an empty result.
- if results and all(s == defer.FAILURE for (s, _) in results):
- defer.returnValue(results[0][1])
-
- for (success, result) in results:
- if success == defer.FAILURE:
- continue
-
- for answer in result:
- if not answer.payload:
- continue
-
- try:
- if answer.type == dns.A:
- ip = answer.payload.dottedQuad()
- ip4_servers.append((ip, answer.ttl))
- elif answer.type == dns.AAAA:
- ip = socket.inet_ntop(
- socket.AF_INET6, answer.payload.address,
- )
- ip6_servers.append((ip, answer.ttl))
- else:
- # the most likely candidate here is a CNAME record.
- # rfc2782 says srvs may not point to aliases.
- logger.warn(
- "Ignoring unexpected DNS record type %s for %s",
- answer.type, host,
- )
- continue
- except Exception as e:
- logger.warn("Ignoring invalid DNS response for %s: %s",
- host, e)
- continue
-
- # keep the ipv4 results before the ipv6 results, mostly to match historical
- # behaviour.
- defer.returnValue(ip4_servers + ip6_servers)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 64e083eb..8d632290 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -329,7 +329,7 @@ class JsonResource(HttpServer, resource.Resource):
register_paths, so will return (possibly via Deferred) either
None, or a tuple of (http code, response body).
"""
- if request.method == "OPTIONS":
+ if request.method == b"OPTIONS":
return _options_handler, {}
# Loop through all the registered callbacks to check if the method
@@ -543,7 +543,7 @@ def finish_request(request):
def _request_user_agent_is_curl(request):
user_agents = request.requestHeaders.getRawHeaders(
- "User-Agent", default=[]
+ b"User-Agent", default=[]
)
for user_agent in user_agents:
if "curl" in user_agent:
diff --git a/synapse/http/site.py b/synapse/http/site.py
index e422c8df..c8b46e1a 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -20,7 +20,7 @@ import logging
import re
import time
-ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
class SynapseRequest(Request):
@@ -43,12 +43,12 @@ class SynapseRequest(Request):
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
- r'\1<redacted>\3',
+ br'\1<redacted>\3',
self.uri
)
def get_user_agent(self):
- return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
+ return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def started_processing(self):
self.site.access_logger.info(
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 50d99d7a..e3b831db 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -17,12 +17,13 @@ import logging
import functools
import time
import gc
+import platform
from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
- MemoryUsageMetric,
+ MemoryUsageMetric, GaugeMetric,
)
from .process_collector import register_process_collector
@@ -30,6 +31,7 @@ from .process_collector import register_process_collector
logger = logging.getLogger(__name__)
+running_on_pypy = platform.python_implementation() == 'PyPy'
all_metrics = []
all_collectors = []
@@ -63,6 +65,13 @@ class Metrics(object):
"""
return self._register(CounterMetric, *args, **kwargs)
+ def register_gauge(self, *args, **kwargs):
+ """
+ Returns:
+ GaugeMetric
+ """
+ return self._register(GaugeMetric, *args, **kwargs)
+
def register_callback(self, *args, **kwargs):
"""
Returns:
@@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+synapse_metrics = get_metrics_for("synapse")
+
+# Used to track where various components have processed in the event stream,
+# e.g. federation sending, appservice sending, etc.
+event_processing_positions = synapse_metrics.register_gauge(
+ "event_processing_positions", labels=["name"],
+)
+
+# Used to track the current max events stream position
+event_persisted_position = synapse_metrics.register_gauge(
+ "event_persisted_position",
+)
+
+# Used to track the received_ts of the last event processed by various
+# components
+event_processing_last_ts = synapse_metrics.register_gauge(
+ "event_processing_last_ts", labels=["name"],
+)
+
+# Used to track the lag processing events. This is the time difference
+# between the last processed event's received_ts and the time it was
+# finished being processed.
+event_processing_lag = synapse_metrics.register_gauge(
+ "event_processing_lag", labels=["name"],
+)
+
def runUntilCurrentTimer(func):
@@ -174,6 +209,9 @@ def runUntilCurrentTimer(func):
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
+ if running_on_pypy:
+ return ret
+
# Check if we need to do a manual GC (since its been disabled), and do
# one if necessary.
threshold = gc.get_threshold()
@@ -206,6 +244,7 @@ try:
# We manually run the GC each reactor tick so that we can get some metrics
# about time spent doing GC,
- gc.disable()
+ if not running_on_pypy:
+ gc.disable()
except AttributeError:
pass
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index ff5aa8c0..89bd47c3 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
- # (if the metric is a scalar, the (single) key is the empty list).
+ # (if the metric is a scalar, the (single) key is the empty tuple).
self.counts = {}
# Scalar metrics are never empty
@@ -145,6 +145,36 @@ class CounterMetric(BaseMetric):
)
+class GaugeMetric(BaseMetric):
+ """A metric that can go up or down
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(GaugeMetric, self).__init__(*args, **kwargs)
+
+ # dict[list[str]]: value for each set of label values. the keys are the
+ # label values, in the same order as the labels in self.labels.
+ #
+ # (if the metric is a scalar, the (single) key is the empty tuple).
+ self.guages = {}
+
+ def set(self, v, *values):
+ if len(values) != self.dimension():
+ raise ValueError(
+ "Expected as many values to inc() as labels (%d)" % (self.dimension())
+ )
+
+ # TODO: should assert that the tag values are all strings
+
+ self.guages[values] = v
+
+ def render(self):
+ return flatten(
+ self._render_for_labels(k, self.guages[k])
+ for k in sorted(self.guages.keys())
+ )
+
+
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the
diff --git a/synapse/notifier.py b/synapse/notifier.py
index ef042681..0e40a4aa 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -144,6 +144,7 @@ class _NotifierUserStream(object):
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
def __nonzero__(self):
return bool(self.events)
+ __bool__ = __nonzero__ # python3
class Notifier(object):
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index bbe2f967..a9baa2c1 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@@ -115,20 +114,15 @@ class ReplicationSendEventRestServlet(RestServlet):
self.clock = hs.get_clock()
# The responses are tiny, so we may as well cache them for a while
- self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
+ self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
def on_PUT(self, request, event_id):
- result = self.response_cache.get(event_id)
- if not result:
- result = self.response_cache.set(
- event_id,
- self._handle_request(request)
- )
- else:
- logger.warn("Returning cached response")
- return make_deferred_yieldable(result)
-
- @preserve_fn
+ return self.response_cache.wrap(
+ event_id,
+ self._handle_request,
+ request
+ )
+
@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"):
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index ca499559..e092158c 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -44,7 +44,10 @@ class LogoutRestServlet(ClientV1RestServlet):
requester = yield self.auth.get_user_by_req(request)
except AuthError:
# this implies the access token has already been deleted.
- pass
+ defer.returnValue((401, {
+ "errcode": "M_UNKNOWN_TOKEN",
+ "error": "Access Token unknown or expired"
+ }))
else:
if requester.device_id is None:
# the acccess token wasn't associated with a device.
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 5c5fa8f7..8a820971 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -348,9 +348,9 @@ class RegisterRestServlet(ClientV1RestServlet):
admin = register_json.get("admin", None)
# Its important to check as we use null bytes as HMAC field separators
- if "\x00" in user:
+ if b"\x00" in user:
raise SynapseError(400, "Invalid user")
- if "\x00" in password:
+ if b"\x00" in password:
raise SynapseError(400, "Invalid password")
# str() because otherwise hmac complains that 'unicode' does not
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index d06cbdc3..2ad0e594 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
content=content,
)
else:
- event, context = yield self.event_creation_hander.create_event(
+ event = yield self.event_creation_hander.create_and_send_nonmember_event(
requester,
event_dict,
- token_id=requester.access_token_id,
txn_id=txn_id,
)
- yield self.event_creation_hander.send_nonmember_event(
- requester, event, context,
- )
-
ret = {}
if event:
ret = {"event_id": event.event_id}
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 0ba62bdd..f317c919 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -20,7 +20,6 @@ import synapse
import synapse.types
from synapse.api.auth import get_access_token_from_request, has_access_token
from synapse.api.constants import LoginType
-from synapse.types import RoomID, RoomAlias
from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
from synapse.http.servlet import (
RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
@@ -405,14 +404,6 @@ class RegisterRestServlet(RestServlet):
generate_token=False,
)
- # auto-join the user to any rooms we're supposed to dump them into
- fake_requester = synapse.types.create_requester(registered_user_id)
- for r in self.hs.config.auto_join_rooms:
- try:
- yield self._join_user_to_room(fake_requester, r)
- except Exception as e:
- logger.error("Failed to join new user to %r: %r", r, e)
-
# remember that we've now registered that user account, and with
# what user ID (since the user may not have specified)
self.auth_handler.set_session_data(
@@ -446,29 +437,6 @@ class RegisterRestServlet(RestServlet):
return 200, {}
@defer.inlineCallbacks
- def _join_user_to_room(self, requester, room_identifier):
- room_id = None
- if RoomID.is_valid(room_identifier):
- room_id = room_identifier
- elif RoomAlias.is_valid(room_identifier):
- room_alias = RoomAlias.from_string(room_identifier)
- room_id, remote_room_hosts = (
- yield self.room_member_handler.lookup_room_alias(room_alias)
- )
- room_id = room_id.to_string()
- else:
- raise SynapseError(400, "%s was not legal room ID or room alias" % (
- room_identifier,
- ))
-
- yield self.room_member_handler.update_membership(
- requester=requester,
- target=requester.user,
- room_id=room_id,
- action="join",
- )
-
- @defer.inlineCallbacks
def _do_appservice_registration(self, username, as_token, body):
user_id = yield self.registration_handler.appservice_register(
username, as_token
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 83471b31..7f263db2 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -16,6 +16,8 @@
from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender
+import six
+
from ._base import Responder
from synapse.util.file_consumer import BackgroundFileConsumer
@@ -119,7 +121,7 @@ class MediaStorage(object):
os.remove(fname)
except Exception:
pass
- raise t, v, tb
+ six.reraise(t, v, tb)
if not finished_called:
raise Exception("Finished callback not called")
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index eacd49d6..8cdfd50f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -266,9 +266,9 @@ class DataStore(RoomMemberStore, RoomStore,
def count_r30_users(self):
"""
Counts the number of 30 day retained users, defined as:-
- * Users who have created their accounts more than 30 days
+ * Users who have created their accounts more than 30 days ago
* Where last seen at most 30 days ago
- * Where account creation and last_seen are > 30 days
+ * Where account creation and last_seen are > 30 days apart
Returns counts globaly for a given user as well as breaking
by platform
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2fbebd49..2262776a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -376,7 +376,7 @@ class SQLBaseStore(object):
Returns:
A list of dicts where the key is the column header.
"""
- col_headers = list(intern(column[0]) for column in cursor.description)
+ col_headers = list(intern(str(column[0])) for column in cursor.description)
results = list(
dict(zip(col_headers, row)) for row in cursor
)
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 338b4956..8c868ece 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -18,6 +18,7 @@ from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
import importlib
+import platform
SUPPORTED_MODULE = {
@@ -31,6 +32,10 @@ def create_engine(database_config):
engine_class = SUPPORTED_MODULE.get(name, None)
if engine_class:
+ # pypy requires psycopg2cffi rather than psycopg2
+ if (name == "psycopg2" and
+ platform.python_implementation() == "PyPy"):
+ name = "psycopg2cffi"
module = importlib.import_module(name)
return engine_class(module, database_config)
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 00ee82d3..8fbf7ffb 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import random
from twisted.internet import defer
@@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
import logging
-from Queue import PriorityQueue, Empty
+from six.moves.queue import PriorityQueue, Empty
+
+from six.moves import range
logger = logging.getLogger(__name__)
@@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
front_list = list(front)
chunks = [
front_list[x:x + 100]
- for x in xrange(0, len(front), 100)
+ for x in range(0, len(front), 100)
]
for chunk in chunks:
txn.execute(
@@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
retcol="event_id",
)
+ @defer.inlineCallbacks
+ def get_prev_events_for_room(self, room_id):
+ """
+ Gets a subset of the current forward extremities in the given room.
+
+ Limits the result to 10 extremities, so that we can avoid creating
+ events which refer to hundreds of prev_events.
+
+ Args:
+ room_id (str): room_id
+
+ Returns:
+ Deferred[list[(str, dict[str, str], int)]]
+ for each event, a tuple of (event_id, hashes, depth)
+ where *hashes* is a map from algorithm to hash.
+ """
+ res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
+ if len(res) > 10:
+ # Sort by reverse depth, so we point to the most recent.
+ res.sort(key=lambda a: -a[2])
+
+ # we use half of the limit for the actual most recent events, and
+ # the other half to randomly point to some of the older events, to
+ # make sure that we don't completely ignore the older events.
+ res = res[0:5] + random.sample(res[5:], 5)
+
+ defer.returnValue(res)
+
def get_latest_event_ids_and_hashes_in_room(self, room_id):
+ """
+ Gets the current forward extremities in the given room
+
+ Args:
+ room_id (str): room_id
+
+ Returns:
+ Deferred[list[(str, dict[str, str], int)]]
+ for each event, a tuple of (event_id, hashes, depth)
+ where *hashes* is a map from algorithm to hash.
+ """
+
return self.runInteraction(
"get_latest_event_ids_and_hashes_in_room",
self._get_latest_event_ids_and_hashes_in_room,
@@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
room_id,
)
- @defer.inlineCallbacks
- def get_max_depth_of_events(self, event_ids):
- sql = (
- "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
- ) % (",".join(["?"] * len(event_ids)),)
-
- rows = yield self._execute(
- "get_max_depth_of_events", None,
- sql, *event_ids
- )
-
- if rows:
- defer.returnValue(rows[0][0])
- else:
- defer.returnValue(1)
-
def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
txn,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ece5e6c4..5fe4a0e5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -16,6 +16,7 @@
from collections import OrderedDict, deque, namedtuple
from functools import wraps
+import itertools
import logging
import simplejson as json
@@ -444,6 +445,9 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
+ synapse.metrics.event_persisted_position.set(
+ chunk[-1][0].internal_metadata.stream_ordering,
+ )
for event, context in chunk:
if context.app_service:
origin_type = "local"
@@ -1317,13 +1321,49 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(set(r["event_id"] for r in rows))
- def have_events(self, event_ids):
+ @defer.inlineCallbacks
+ def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
+ Args:
+ event_ids (iterable[str]):
+
+ Returns:
+ Deferred[set[str]]: The events we have already seen.
+ """
+ results = set()
+
+ def have_seen_events_txn(txn, chunk):
+ sql = (
+ "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+ % (",".join("?" * len(chunk)), )
+ )
+ txn.execute(sql, chunk)
+ for (event_id, ) in txn:
+ results.add(event_id)
+
+ # break the input up into chunks of 100
+ input_iterator = iter(event_ids)
+ for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+ []):
+ yield self.runInteraction(
+ "have_seen_events",
+ have_seen_events_txn,
+ chunk,
+ )
+ defer.returnValue(results)
+
+ def get_seen_events_with_rejections(self, event_ids):
+ """Given a list of event ids, check if we rejected them.
+
+ Args:
+ event_ids (list[str])
+
Returns:
- dict: Has an entry for each event id we already have seen. Maps to
- the rejected reason string if we rejected the event, else maps to
- None.
+ Deferred[dict[str, str|None):
+ Has an entry for each event id we already have seen. Maps to
+ the rejected reason string if we rejected the event, else maps
+ to None.
"""
if not event_ids:
return defer.succeed({})
@@ -1345,9 +1385,7 @@ class EventsStore(EventsWorkerStore):
return res
- return self.runInteraction(
- "have_events", f,
- )
+ return self.runInteraction("get_rejection_reasons", f)
@defer.inlineCallbacks
def count_daily_messages(self):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 2e23dd78..a937b9bc 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
class EventsWorkerStore(SQLBaseStore):
+ def get_received_ts(self, event_id):
+ """Get received_ts (when it was persisted) for the event.
+
+ Raises an exception for unknown events.
+
+ Args:
+ event_id (str)
+
+ Returns:
+ Deferred[int|None]: Timestamp in milliseconds, or None for events
+ that were persisted before received_ts was implemented.
+ """
+ return self._simple_select_one_onecol(
+ table="events",
+ keyvalues={
+ "event_id": event_id,
+ },
+ retcol="received_ts",
+ desc="get_received_ts",
+ )
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 740c0369..ea6a1891 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
# Convert the IDs to MXC URIs
for media_id in local_mxcs:
- local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+ local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
for hostname, media_id in remote_mxcs:
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
while next_token:
sql = """
SELECT stream_ordering, json FROM events
- JOIN event_json USING (event_id)
+ JOIN event_json USING (room_id, event_id)
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
- if hostname == self.hostname:
+ if hostname == self.hs.hostname:
local_media_mxcs.append(media_id)
else:
remote_media_mxcs.append((hostname, media_id))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 426cbe6e..6ba3e598 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
sql = (
"SELECT stream_ordering, event_id, room_id, type, json, "
" origin_server_ts FROM events"
- " JOIN event_json USING (event_id)"
+ " JOIN event_json USING (room_id, event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
diff --git a/synapse/types.py b/synapse/types.py
index 7cb24cec..cc7c182a 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -169,7 +169,7 @@ class DomainSpecificString(
except Exception:
return False
- __str__ = to_string
+ __repr__ = to_string
class UserID(DomainSpecificString):
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 00af5398..7f79333e 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,8 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
+from twisted.internet import defer
from synapse.util.async import ObservableDeferred
+from synapse.util.caches import metrics as cache_metrics
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
class ResponseCache(object):
@@ -24,20 +31,68 @@ class ResponseCache(object):
used rather than trying to compute a new response.
"""
- def __init__(self, hs, timeout_ms=0):
+ def __init__(self, hs, name, timeout_ms=0):
self.pending_result_cache = {} # Requests that haven't finished yet.
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.
+ self._name = name
+ self._metrics = cache_metrics.register_cache(
+ "response_cache",
+ size_callback=lambda: self.size(),
+ cache_name=name,
+ )
+
+ def size(self):
+ return len(self.pending_result_cache)
+
def get(self, key):
+ """Look up the given key.
+
+ Can return either a new Deferred (which also doesn't follow the synapse
+ logcontext rules), or, if the request has completed, the actual
+ result. You will probably want to make_deferred_yieldable the result.
+
+ If there is no entry for the key, returns None. It is worth noting that
+ this means there is no way to distinguish a completed result of None
+ from an absent cache entry.
+
+ Args:
+ key (hashable):
+
+ Returns:
+ twisted.internet.defer.Deferred|None|E: None if there is no entry
+ for this key; otherwise either a deferred result or the result
+ itself.
+ """
result = self.pending_result_cache.get(key)
if result is not None:
+ self._metrics.inc_hits()
return result.observe()
else:
+ self._metrics.inc_misses()
return None
def set(self, key, deferred):
+ """Set the entry for the given key to the given deferred.
+
+ *deferred* should run its callbacks in the sentinel logcontext (ie,
+ you should wrap normal synapse deferreds with
+ logcontext.run_in_background).
+
+ Can return either a new Deferred (which also doesn't follow the synapse
+ logcontext rules), or, if *deferred* was already complete, the actual
+ result. You will probably want to make_deferred_yieldable the result.
+
+ Args:
+ key (hashable):
+ deferred (twisted.internet.defer.Deferred[T):
+
+ Returns:
+ twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
+ result.
+ """
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result
@@ -53,3 +108,52 @@ class ResponseCache(object):
result.addBoth(remove)
return result.observe()
+
+ def wrap(self, key, callback, *args, **kwargs):
+ """Wrap together a *get* and *set* call, taking care of logcontexts
+
+ First looks up the key in the cache, and if it is present makes it
+ follow the synapse logcontext rules and returns it.
+
+ Otherwise, makes a call to *callback(*args, **kwargs)*, which should
+ follow the synapse logcontext rules, and adds the result to the cache.
+
+ Example usage:
+
+ @defer.inlineCallbacks
+ def handle_request(request):
+ # etc
+ defer.returnValue(result)
+
+ result = yield response_cache.wrap(
+ key,
+ handle_request,
+ request,
+ )
+
+ Args:
+ key (hashable): key to get/set in the cache
+
+ callback (callable): function to call if the key is not found in
+ the cache
+
+ *args: positional parameters to pass to the callback, if it is used
+
+ **kwargs: named paramters to pass to the callback, if it is used
+
+ Returns:
+ twisted.internet.defer.Deferred: yieldable result
+ """
+ result = self.get(key)
+ if not result:
+ logger.info("[%s]: no cached result for [%s], calculating new one",
+ self._name, key)
+ d = run_in_background(callback, *args, **kwargs)
+ result = self.set(key, d)
+ elif not isinstance(result, defer.Deferred) or result.called:
+ logger.info("[%s]: using completed cached result for [%s]",
+ self._name, key)
+ else:
+ logger.info("[%s]: using incomplete cached result for [%s]",
+ self._name, key)
+ return make_deferred_yieldable(result)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 90a2608d..3c8a1653 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -17,7 +17,7 @@ from twisted.internet import threads, reactor
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
-import Queue
+from six.moves import queue
class BackgroundFileConsumer(object):
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
# Queue of slices of bytes to be written. When producer calls
# unregister a final None is sent.
- self._bytes_queue = Queue.Queue()
+ self._bytes_queue = queue.Queue()
# Deferred that is resolved when finished writing
self._finished_deferred = None
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d660ec78..d59adc23 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -92,6 +92,7 @@ class LoggingContext(object):
def __nonzero__(self):
return False
+ __bool__ = __nonzero__ # python3
sentinel = Sentinel()