author     Andrej Shadura <andrewsh@debian.org>  2020-11-19 09:37:58 +0100
committer  Andrej Shadura <andrewsh@debian.org>  2020-11-19 09:37:58 +0100
commit     7d2fb3b3aa80b8b396c133e43c8b7a736c8b0cc8 (patch)
tree       fc5f302434f093a9d5f1fa7cb826ee3b6b567651 /synapse
parent     bb2febeb6aaa2b7a4150c47b2dc2b318435ccaff (diff)
New upstream version 1.23.0
Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py | 2
-rw-r--r--  synapse/api/auth.py | 117
-rw-r--r--  synapse/app/_base.py | 5
-rw-r--r--  synapse/appservice/__init__.py | 4
-rw-r--r--  synapse/config/cas.py | 46
-rw-r--r--  synapse/config/jwt_config.py | 2
-rw-r--r--  synapse/config/logger.py | 98
-rw-r--r--  synapse/config/oidc_config.py | 7
-rw-r--r--  synapse/config/registration.py | 2
-rw-r--r--  synapse/config/room_directory.py | 2
-rw-r--r--  synapse/config/saml2_config.py | 103
-rw-r--r--  synapse/config/tracer.py | 2
-rw-r--r--  synapse/crypto/context_factory.py | 2
-rw-r--r--  synapse/events/__init__.py | 6
-rw-r--r--  synapse/events/utils.py | 2
-rw-r--r--  synapse/events/validator.py | 27
-rw-r--r--  synapse/federation/transport/server.py | 2
-rw-r--r--  synapse/groups/attestations.py | 2
-rw-r--r--  synapse/groups/groups_server.py | 4
-rw-r--r--  synapse/handlers/account_validity.py | 29
-rw-r--r--  synapse/handlers/admin.py | 4
-rw-r--r--  synapse/handlers/appservice.py | 121
-rw-r--r--  synapse/handlers/auth.py | 50
-rw-r--r--  synapse/handlers/cas_handler.py | 4
-rw-r--r--  synapse/handlers/e2e_keys.py | 27
-rw-r--r--  synapse/handlers/federation.py | 14
-rw-r--r--  synapse/handlers/groups_local.py | 9
-rw-r--r--  synapse/handlers/message.py | 40
-rw-r--r--  synapse/handlers/oidc_handler.py | 10
-rw-r--r--  synapse/handlers/presence.py | 16
-rw-r--r--  synapse/handlers/profile.py | 21
-rw-r--r--  synapse/handlers/register.py | 7
-rw-r--r--  synapse/handlers/room.py | 31
-rw-r--r--  synapse/handlers/room_member.py | 8
-rw-r--r--  synapse/handlers/saml_handler.py | 4
-rw-r--r--  synapse/handlers/search.py | 2
-rw-r--r--  synapse/handlers/state_deltas.py | 2
-rw-r--r--  synapse/handlers/sync.py | 4
-rw-r--r--  synapse/handlers/typing.py | 23
-rw-r--r--  synapse/handlers/user_directory.py | 2
-rw-r--r--  synapse/http/client.py | 2
-rw-r--r--  synapse/http/federation/well_known_resolver.py | 2
-rw-r--r--  synapse/http/matrixfederationclient.py | 16
-rw-r--r--  synapse/http/request_metrics.py | 2
-rw-r--r--  synapse/http/server.py | 6
-rw-r--r--  synapse/http/servlet.py | 3
-rw-r--r--  synapse/http/site.py | 50
-rw-r--r--  synapse/logging/__init__.py | 20
-rw-r--r--  synapse/logging/_remote.py | 122
-rw-r--r--  synapse/logging/_structured.py | 329
-rw-r--r--  synapse/logging/_terse_json.py | 192
-rw-r--r--  synapse/logging/filter.py | 33
-rw-r--r--  synapse/logging/opentracing.py | 10
-rw-r--r--  synapse/metrics/__init__.py | 10
-rw-r--r--  synapse/metrics/background_process_metrics.py | 14
-rw-r--r--  synapse/notifier.py | 104
-rw-r--r--  synapse/push/baserules.py | 26
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py | 41
-rw-r--r--  synapse/push/mailer.py | 41
-rw-r--r--  synapse/push/pusherpool.py | 18
-rw-r--r--  synapse/python_dependencies.py | 4
-rw-r--r--  synapse/replication/http/membership.py | 6
-rw-r--r--  synapse/replication/http/send_event.py | 3
-rw-r--r--  synapse/replication/tcp/client.py | 20
-rw-r--r--  synapse/replication/tcp/redis.py | 4
-rw-r--r--  synapse/replication/tcp/resource.py | 10
-rw-r--r--  synapse/replication/tcp/streams/events.py | 21
-rw-r--r--  synapse/rest/admin/__init__.py | 12
-rw-r--r--  synapse/rest/admin/devices.py | 2
-rw-r--r--  synapse/rest/admin/event_reports.py | 46
-rw-r--r--  synapse/rest/admin/media.py | 81
-rw-r--r--  synapse/rest/admin/statistics.py | 122
-rw-r--r--  synapse/rest/admin/users.py | 126
-rw-r--r--  synapse/rest/client/v1/events.py | 3
-rw-r--r--  synapse/rest/client/v1/login.py | 3
-rw-r--r--  synapse/rest/client/v1/logout.py | 6
-rw-r--r--  synapse/rest/client/v1/presence.py | 3
-rw-r--r--  synapse/rest/client/v1/profile.py | 6
-rw-r--r--  synapse/rest/client/v1/push_rule.py | 3
-rw-r--r--  synapse/rest/client/v1/pusher.py | 9
-rw-r--r--  synapse/rest/client/v1/room.py | 17
-rw-r--r--  synapse/rest/client/v1/voip.py | 3
-rw-r--r--  synapse/rest/client/v2_alpha/account.py | 16
-rw-r--r--  synapse/rest/client/v2_alpha/auth.py | 3
-rw-r--r--  synapse/rest/client/v2_alpha/register.py | 12
-rw-r--r--  synapse/rest/media/v1/filepath.py | 17
-rw-r--r--  synapse/rest/media/v1/media_repository.py | 237
-rw-r--r--  synapse/rest/media/v1/media_storage.py | 30
-rw-r--r--  synapse/server_notices/consent_server_notices.py | 2
-rw-r--r--  synapse/server_notices/server_notices_manager.py | 2
-rw-r--r--  synapse/state/__init__.py | 2
-rw-r--r--  synapse/state/v1.py | 2
-rw-r--r--  synapse/state/v2.py | 2
-rw-r--r--  synapse/static/client/login/js/login.js | 2
-rw-r--r--  synapse/storage/database.py | 9
-rw-r--r--  synapse/storage/databases/main/__init__.py | 1
-rw-r--r--  synapse/storage/databases/main/appservice.py | 96
-rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py | 18
-rw-r--r--  synapse/storage/databases/main/event_federation.py | 82
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py | 7
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 62
-rw-r--r--  synapse/storage/databases/main/media_repository.py | 131
-rw-r--r--  synapse/storage/databases/main/profile.py | 4
-rw-r--r--  synapse/storage/databases/main/registration.py | 202
-rw-r--r--  synapse/storage/databases/main/room.py | 104
-rw-r--r--  synapse/storage/databases/main/schema/delta/58/22puppet_token.sql | 17
-rw-r--r--  synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql | 2
-rw-r--r--  synapse/storage/databases/main/schema/delta/58/23e2e_cross_signing_keys_idx.sql | 17
-rw-r--r--  synapse/storage/databases/main/stats.py | 127
-rw-r--r--  synapse/types.py | 33
-rw-r--r--  synapse/util/caches/descriptors.py | 235
-rw-r--r--  synapse/util/retryutils.py | 2
112 files changed, 2577 insertions, 1313 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 3e1df2b0..65c1f5aa 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -48,7 +48,7 @@ try:
except ImportError:
pass
-__version__ = "1.22.1"
+__version__ = "1.23.0"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index bff87fab..bfcaf68b 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.logging import opentracing as opentracing
+from synapse.storage.databases.main.registration import TokenLookupResult
from synapse.types import StateMap, UserID
from synapse.util.caches.lrucache import LruCache
from synapse.util.metrics import Measure
@@ -184,18 +185,12 @@ class Auth:
"""
try:
ip_addr = self.hs.get_ip_from_request(request)
- user_agent = request.requestHeaders.getRawHeaders(
- b"User-Agent", default=[b""]
- )[0].decode("ascii", "surrogateescape")
+ user_agent = request.get_user_agent("")
access_token = self.get_access_token_from_request(request)
user_id, app_service = await self._get_appservice_user_id(request)
if user_id:
- request.authenticated_entity = user_id
- opentracing.set_tag("authenticated_entity", user_id)
- opentracing.set_tag("appservice_id", app_service.id)
-
if ip_addr and self._track_appservice_user_ips:
await self.store.insert_client_ip(
user_id=user_id,
@@ -205,31 +200,38 @@ class Auth:
device_id="dummy-device", # stubbed
)
- return synapse.types.create_requester(user_id, app_service=app_service)
+ requester = synapse.types.create_requester(
+ user_id, app_service=app_service
+ )
+
+ request.requester = user_id
+ opentracing.set_tag("authenticated_entity", user_id)
+ opentracing.set_tag("user_id", user_id)
+ opentracing.set_tag("appservice_id", app_service.id)
+
+ return requester
user_info = await self.get_user_by_access_token(
access_token, rights, allow_expired=allow_expired
)
- user = user_info["user"]
- token_id = user_info["token_id"]
- is_guest = user_info["is_guest"]
- shadow_banned = user_info["shadow_banned"]
+ token_id = user_info.token_id
+ is_guest = user_info.is_guest
+ shadow_banned = user_info.shadow_banned
# Deny the request if the user account has expired.
if self._account_validity.enabled and not allow_expired:
- user_id = user.to_string()
- if await self.store.is_account_expired(user_id, self.clock.time_msec()):
+ if await self.store.is_account_expired(
+ user_info.user_id, self.clock.time_msec()
+ ):
raise AuthError(
403, "User account has expired", errcode=Codes.EXPIRED_ACCOUNT
)
- # device_id may not be present if get_user_by_access_token has been
- # stubbed out.
- device_id = user_info.get("device_id")
+ device_id = user_info.device_id
- if user and access_token and ip_addr:
+ if access_token and ip_addr:
await self.store.insert_client_ip(
- user_id=user.to_string(),
+ user_id=user_info.token_owner,
access_token=access_token,
ip=ip_addr,
user_agent=user_agent,
@@ -243,19 +245,23 @@ class Auth:
errcode=Codes.GUEST_ACCESS_FORBIDDEN,
)
- request.authenticated_entity = user.to_string()
- opentracing.set_tag("authenticated_entity", user.to_string())
- if device_id:
- opentracing.set_tag("device_id", device_id)
-
- return synapse.types.create_requester(
- user,
+ requester = synapse.types.create_requester(
+ user_info.user_id,
token_id,
is_guest,
shadow_banned,
device_id,
app_service=app_service,
+ authenticated_entity=user_info.token_owner,
)
+
+ request.requester = requester
+ opentracing.set_tag("authenticated_entity", user_info.token_owner)
+ opentracing.set_tag("user_id", user_info.user_id)
+ if device_id:
+ opentracing.set_tag("device_id", device_id)
+
+ return requester
except KeyError:
raise MissingClientTokenError()
@@ -286,7 +292,7 @@ class Auth:
async def get_user_by_access_token(
self, token: str, rights: str = "access", allow_expired: bool = False,
- ) -> dict:
+ ) -> TokenLookupResult:
""" Validate access token and get user_id from it
Args:
@@ -295,13 +301,7 @@ class Auth:
allow this
allow_expired: If False, raises an InvalidClientTokenError
if the token is expired
- Returns:
- dict that includes:
- `user` (UserID)
- `is_guest` (bool)
- `shadow_banned` (bool)
- `token_id` (int|None): access token id. May be None if guest
- `device_id` (str|None): device corresponding to access token
+
Raises:
InvalidClientTokenError if a user by that token exists, but the token is
expired
@@ -311,9 +311,9 @@ class Auth:
if rights == "access":
# first look in the database
- r = await self._look_up_user_by_access_token(token)
+ r = await self.store.get_user_by_access_token(token)
if r:
- valid_until_ms = r["valid_until_ms"]
+ valid_until_ms = r.valid_until_ms
if (
not allow_expired
and valid_until_ms is not None
@@ -330,7 +330,6 @@ class Auth:
# otherwise it needs to be a valid macaroon
try:
user_id, guest = self._parse_and_validate_macaroon(token, rights)
- user = UserID.from_string(user_id)
if rights == "access":
if not guest:
@@ -356,23 +355,17 @@ class Auth:
raise InvalidClientTokenError(
"Guest access token used for regular user"
)
- ret = {
- "user": user,
- "is_guest": True,
- "shadow_banned": False,
- "token_id": None,
+
+ ret = TokenLookupResult(
+ user_id=user_id,
+ is_guest=True,
# all guests get the same device id
- "device_id": GUEST_DEVICE_ID,
- }
+ device_id=GUEST_DEVICE_ID,
+ )
elif rights == "delete_pusher":
# We don't store these tokens in the database
- ret = {
- "user": user,
- "is_guest": False,
- "shadow_banned": False,
- "token_id": None,
- "device_id": None,
- }
+
+ ret = TokenLookupResult(user_id=user_id, is_guest=False)
else:
raise RuntimeError("Unknown rights setting %s", rights)
return ret
@@ -481,31 +474,15 @@ class Auth:
now = self.hs.get_clock().time_msec()
return now < expiry
- async def _look_up_user_by_access_token(self, token):
- ret = await self.store.get_user_by_access_token(token)
- if not ret:
- return None
-
- # we use ret.get() below because *lots* of unit tests stub out
- # get_user_by_access_token in a way where it only returns a couple of
- # the fields.
- user_info = {
- "user": UserID.from_string(ret.get("name")),
- "token_id": ret.get("token_id", None),
- "is_guest": False,
- "shadow_banned": ret.get("shadow_banned"),
- "device_id": ret.get("device_id"),
- "valid_until_ms": ret.get("valid_until_ms"),
- }
- return user_info
-
def get_appservice_by_req(self, request):
token = self.get_access_token_from_request(request)
service = self.store.get_app_service_by_token(token)
if not service:
logger.warning("Unrecognised appservice access token.")
raise InvalidClientTokenError()
- request.authenticated_entity = service.sender
+ request.requester = synapse.types.create_requester(
+ service.sender, app_service=service
+ )
return service
async def is_server_admin(self, user: UserID) -> bool:
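The auth.py changes above replace the old dict-shaped user_info with attribute access on a TokenLookupResult (imported from synapse.storage.databases.main.registration). Below is a minimal, self-contained sketch of what such a result object might look like, using stdlib dataclasses for illustration; the field names are taken from the diff, but the real class's exact fields and defaults may differ.

    from dataclasses import dataclass
    from typing import Optional


    @dataclass(frozen=True)
    class TokenLookupResult:
        """Sketch of an access-token lookup result; fields assumed from the
        diff above (user_id, is_guest, shadow_banned, token_id, device_id,
        valid_until_ms, token_owner)."""

        user_id: str
        is_guest: bool = False
        shadow_banned: bool = False
        token_id: Optional[int] = None
        device_id: Optional[str] = None
        valid_until_ms: Optional[int] = None

        @property
        def token_owner(self) -> str:
            # In this sketch the token owner is simply the user the token acts
            # as; the real implementation can distinguish the two for puppeting.
            return self.user_id


    # Old style: user = user_info["user"]; token_id = user_info["token_id"]
    # New style: plain attribute access, which type checkers can verify.
    info = TokenLookupResult(user_id="@alice:example.com", device_id="ABCDEF")
    print(info.user_id, info.device_id, info.is_guest)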
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index f6f7b2bf..9c8dc785 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -49,7 +49,6 @@ def register_sighup(func, *args, **kwargs):
Args:
func (function): Function to be called when sent a SIGHUP signal.
- Will be called with a single default argument, the homeserver.
*args, **kwargs: args and kwargs to be passed to the target function.
"""
_sighup_callbacks.append((func, args, kwargs))
@@ -251,13 +250,13 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
sdnotify(b"RELOADING=1")
for i, args, kwargs in _sighup_callbacks:
- i(hs, *args, **kwargs)
+ i(*args, **kwargs)
sdnotify(b"READY=1")
signal.signal(signal.SIGHUP, handle_sighup)
- register_sighup(refresh_certificate)
+ register_sighup(refresh_certificate, hs)
# Load the certificate from disk.
refresh_certificate(hs)
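The _base.py hunk above stops passing the homeserver implicitly to every SIGHUP callback; callers now bind whatever arguments they need when registering. A rough, self-contained sketch of that pattern follows (the names mirror the diff, but this is illustrative rather than Synapse's actual module).

    from typing import Any, Callable, List, Tuple

    _sighup_callbacks: List[Tuple[Callable, tuple, dict]] = []


    def register_sighup(func: Callable, *args: Any, **kwargs: Any) -> None:
        # Store the callback together with the exact arguments it should get,
        # rather than assuming every callback wants the homeserver.
        _sighup_callbacks.append((func, args, kwargs))


    def handle_sighup() -> None:
        for func, args, kwargs in _sighup_callbacks:
            func(*args, **kwargs)


    def refresh_certificate(hs: str) -> None:
        print("reloading certificates for", hs)


    hs = "example.com"                        # stand-in for the HomeServer object
    register_sighup(refresh_certificate, hs)  # binds hs explicitly
    register_sighup(lambda: print("reloading log config"))  # needs no arguments

    handle_sighup()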
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index f70841ae..3944780a 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -52,11 +52,11 @@ class ApplicationService:
self,
token,
hostname,
+ id,
+ sender,
url=None,
namespaces=None,
hs_token=None,
- sender=None,
- id=None,
protocols=None,
rate_limited=True,
ip_range_whitelist=None,
diff --git a/synapse/config/cas.py b/synapse/config/cas.py
index 4526c1a6..2f97e6d2 100644
--- a/synapse/config/cas.py
+++ b/synapse/config/cas.py
@@ -26,14 +26,14 @@ class CasConfig(Config):
def read_config(self, config, **kwargs):
cas_config = config.get("cas_config", None)
- if cas_config:
- self.cas_enabled = cas_config.get("enabled", True)
+ self.cas_enabled = cas_config and cas_config.get("enabled", True)
+
+ if self.cas_enabled:
self.cas_server_url = cas_config["server_url"]
self.cas_service_url = cas_config["service_url"]
self.cas_displayname_attribute = cas_config.get("displayname_attribute")
- self.cas_required_attributes = cas_config.get("required_attributes", {})
+ self.cas_required_attributes = cas_config.get("required_attributes") or {}
else:
- self.cas_enabled = False
self.cas_server_url = None
self.cas_service_url = None
self.cas_displayname_attribute = None
@@ -41,13 +41,35 @@ class CasConfig(Config):
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """
- # Enable CAS for registration and login.
+ # Enable Central Authentication Service (CAS) for registration and login.
#
- #cas_config:
- # enabled: true
- # server_url: "https://cas-server.com"
- # service_url: "https://homeserver.domain.com:8448"
- # #displayname_attribute: name
- # #required_attributes:
- # # name: value
+ cas_config:
+ # Uncomment the following to enable authorization against a CAS server.
+ # Defaults to false.
+ #
+ #enabled: true
+
+ # The URL of the CAS authorization endpoint.
+ #
+ #server_url: "https://cas-server.com"
+
+ # The public URL of the homeserver.
+ #
+ #service_url: "https://homeserver.domain.com:8448"
+
+ # The attribute of the CAS response to use as the display name.
+ #
+ # If unset, no displayname will be set.
+ #
+ #displayname_attribute: name
+
+ # It is possible to configure Synapse to only allow logins if CAS attributes
+ # match particular values. All of the keys in the mapping below must exist
+ # and the values must match the given value. Alternately if the given value
+ # is None then any value is allowed (the attribute just must exist).
+ # All of the listed attributes must match for the login to be permitted.
+ #
+ #required_attributes:
+ # userGroup: "staff"
+ # department: None
"""
diff --git a/synapse/config/jwt_config.py b/synapse/config/jwt_config.py
index 3252ad9e..f30330ab 100644
--- a/synapse/config/jwt_config.py
+++ b/synapse/config/jwt_config.py
@@ -63,7 +63,7 @@ class JWTConfig(Config):
# and issued at ("iat") claims are validated if present.
#
# Note that this is a non-standard login type and client support is
- # expected to be non-existant.
+ # expected to be non-existent.
#
# See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md.
#
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 13d6f6a3..d4e887a3 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -23,7 +23,6 @@ from string import Template
import yaml
from twisted.logger import (
- ILogObserver,
LogBeginner,
STDLibLogObserver,
eventAsText,
@@ -32,11 +31,9 @@ from twisted.logger import (
import synapse
from synapse.app import _base as appbase
-from synapse.logging._structured import (
- reload_structured_logging,
- setup_structured_logging,
-)
+from synapse.logging._structured import setup_structured_logging
from synapse.logging.context import LoggingContextFilter
+from synapse.logging.filter import MetadataFilter
from synapse.util.versionstring import get_version_string
from ._base import Config, ConfigError
@@ -48,7 +45,11 @@ DEFAULT_LOG_CONFIG = Template(
# This is a YAML file containing a standard Python logging configuration
# dictionary. See [1] for details on the valid settings.
#
+# Synapse also supports structured logging for machine readable logs which can
+# be ingested by ELK stacks. See [2] for details.
+#
# [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
+# [2]: https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md
version: 1
@@ -105,7 +106,7 @@ root:
# then write them to a file.
#
# Replace "buffer" with "console" to log to stderr instead. (Note that you'll
- # also need to update the configuation for the `twisted` logger above, in
+ # also need to update the configuration for the `twisted` logger above, in
# this case.)
#
handlers: [buffer]
@@ -176,11 +177,11 @@ class LoggingConfig(Config):
log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
-def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
+def _setup_stdlib_logging(config, log_config_path, logBeginner: LogBeginner) -> None:
"""
- Set up Python stdlib logging.
+ Set up Python standard library logging.
"""
- if log_config is None:
+ if log_config_path is None:
log_format = (
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
" - %(message)s"
@@ -196,7 +197,8 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
handler.setFormatter(formatter)
logger.addHandler(handler)
else:
- logging.config.dictConfig(log_config)
+ # Load the logging configuration.
+ _load_logging_config(log_config_path)
# We add a log record factory that runs all messages through the
# LoggingContextFilter so that we get the context *at the time we log*
@@ -204,12 +206,14 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
# filter options, but care must when using e.g. MemoryHandler to buffer
# writes.
- log_filter = LoggingContextFilter(request="")
+ log_context_filter = LoggingContextFilter(request="")
+ log_metadata_filter = MetadataFilter({"server_name": config.server_name})
old_factory = logging.getLogRecordFactory()
def factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
- log_filter.filter(record)
+ log_context_filter.filter(record)
+ log_metadata_filter.filter(record)
return record
logging.setLogRecordFactory(factory)
@@ -255,21 +259,40 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
if not config.no_redirect_stdio:
print("Redirected stdout/stderr to logs")
- return observer
-
-def _reload_stdlib_logging(*args, log_config=None):
- logger = logging.getLogger("")
+def _load_logging_config(log_config_path: str) -> None:
+ """
+ Configure logging from a log config path.
+ """
+ with open(log_config_path, "rb") as f:
+ log_config = yaml.safe_load(f.read())
if not log_config:
- logger.warning("Reloaded a blank config?")
+ logging.warning("Loaded a blank logging config?")
+
+ # If the old structured logging configuration is being used, convert it to
+ # the new style configuration.
+ if "structured" in log_config and log_config.get("structured"):
+ log_config = setup_structured_logging(log_config)
logging.config.dictConfig(log_config)
+def _reload_logging_config(log_config_path):
+ """
+ Reload the log configuration from the file and apply it.
+ """
+ # If no log config path was given, it cannot be reloaded.
+ if log_config_path is None:
+ return
+
+ _load_logging_config(log_config_path)
+ logging.info("Reloaded log config from %s due to SIGHUP", log_config_path)
+
+
def setup_logging(
hs, config, use_worker_options=False, logBeginner: LogBeginner = globalLogBeginner
-) -> ILogObserver:
+) -> None:
"""
Set up the logging subsystem.
@@ -282,41 +305,18 @@ def setup_logging(
logBeginner: The Twisted logBeginner to use.
- Returns:
- The "root" Twisted Logger observer, suitable for sending logs to from a
- Logger instance.
"""
- log_config = config.worker_log_config if use_worker_options else config.log_config
-
- def read_config(*args, callback=None):
- if log_config is None:
- return None
-
- with open(log_config, "rb") as f:
- log_config_body = yaml.safe_load(f.read())
-
- if callback:
- callback(log_config=log_config_body)
- logging.info("Reloaded log config from %s due to SIGHUP", log_config)
-
- return log_config_body
+ log_config_path = (
+ config.worker_log_config if use_worker_options else config.log_config
+ )
- log_config_body = read_config()
+ # Perform one-time logging configuration.
+ _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
+ # Add a SIGHUP handler to reload the logging configuration, if one is available.
+ appbase.register_sighup(_reload_logging_config, log_config_path)
- if log_config_body and log_config_body.get("structured") is True:
- logger = setup_structured_logging(
- hs, config, log_config_body, logBeginner=logBeginner
- )
- appbase.register_sighup(read_config, callback=reload_structured_logging)
- else:
- logger = _setup_stdlib_logging(config, log_config_body, logBeginner=logBeginner)
- appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
-
- # make sure that the first thing we log is a thing we can grep backwards
- # for
+ # Log immediately so we can grep backwards.
logging.warning("***** STARTING SERVER *****")
logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
logging.info("Server hostname: %s", config.server_name)
logging.info("Instance name: %s", hs.get_instance_name())
-
- return logger
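The logger.py rework above routes every log record through a LoggingContextFilter and a new MetadataFilter via the log record factory, so the metadata is captured when the record is created rather than when it is emitted (which matters with buffering handlers such as MemoryHandler). A small stdlib-only sketch of that idea, assuming a MetadataFilter roughly like the one imported from synapse.logging.filter:

    import logging


    class MetadataFilter(logging.Filter):
        """Stamp static metadata (e.g. server_name) onto every log record."""

        def __init__(self, metadata: dict):
            super().__init__()
            self._metadata = metadata

        def filter(self, record: logging.LogRecord) -> bool:
            for key, value in self._metadata.items():
                setattr(record, key, value)
            return True


    metadata_filter = MetadataFilter({"server_name": "example.com"})
    old_factory = logging.getLogRecordFactory()


    def factory(*args, **kwargs):
        # Run every record through the filter at creation time.
        record = old_factory(*args, **kwargs)
        metadata_filter.filter(record)
        return record


    logging.setLogRecordFactory(factory)
    logging.basicConfig(
        format="%(asctime)s - %(server_name)s - %(message)s", level=logging.INFO
    )
    logging.info("metadata attached via the record factory")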
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index 7597fbc8..69d18834 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc_config.py
@@ -87,11 +87,10 @@ class OIDCConfig(Config):
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
- # OpenID Connect integration. The following settings can be used to make Synapse
- # use an OpenID Connect Provider for authentication, instead of its internal
- # password database.
+ # Enable OpenID Connect (OIDC) / OAuth 2.0 for registration and login.
#
- # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md.
+ # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md
+ # for some example configurations.
#
oidc_config:
# Uncomment the following to enable authorization against an OpenID Connect
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index d7e3690a..b0a77a2e 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -143,7 +143,7 @@ class RegistrationConfig(Config):
RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
}
- # Pull the creater/inviter from the configuration, this gets used to
+ # Pull the creator/inviter from the configuration, this gets used to
# send invites for invite-only rooms.
mxid_localpart = config.get("auto_join_mxid_localpart")
self.auto_join_user_id = None
diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py
index 6de1f9d1..92e1b675 100644
--- a/synapse/config/room_directory.py
+++ b/synapse/config/room_directory.py
@@ -99,7 +99,7 @@ class RoomDirectoryConfig(Config):
#
# Options for the rules include:
#
- # user_id: Matches agaisnt the creator of the alias
+ # user_id: Matches against the creator of the alias
# room_id: Matches against the room ID being published
# alias: Matches against any current local or canonical aliases
# associated with the room
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index 99aa8b3b..2ff7dfb3 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -216,10 +216,8 @@ class SAML2Config(Config):
return """\
## Single sign-on integration ##
- # Enable SAML2 for registration and login. Uses pysaml2.
- #
- # At least one of `sp_config` or `config_path` must be set in this section to
- # enable SAML login.
+ # The following settings can be used to make Synapse use a single sign-on
+ # provider for authentication, instead of its internal password database.
#
# You will probably also want to set the following options to `false` to
# disable the regular login/registration flows:
@@ -228,6 +226,11 @@ class SAML2Config(Config):
#
# You will also want to investigate the settings under the "sso" configuration
# section below.
+
+ # Enable SAML2 for registration and login. Uses pysaml2.
+ #
+ # At least one of `sp_config` or `config_path` must be set in this section to
+ # enable SAML login.
#
# Once SAML support is enabled, a metadata file will be exposed at
# https://<server>:<port>/_matrix/saml2/metadata.xml, which you may be able to
@@ -243,40 +246,64 @@ class SAML2Config(Config):
# so it is not normally necessary to specify them unless you need to
# override them.
#
- #sp_config:
- # # point this to the IdP's metadata. You can use either a local file or
- # # (preferably) a URL.
- # metadata:
- # #local: ["saml2/idp.xml"]
- # remote:
- # - url: https://our_idp/metadata.xml
- #
- # # By default, the user has to go to our login page first. If you'd like
- # # to allow IdP-initiated login, set 'allow_unsolicited: true' in a
- # # 'service.sp' section:
- # #
- # #service:
- # # sp:
- # # allow_unsolicited: true
- #
- # # The examples below are just used to generate our metadata xml, and you
- # # may well not need them, depending on your setup. Alternatively you
- # # may need a whole lot more detail - see the pysaml2 docs!
- #
- # description: ["My awesome SP", "en"]
- # name: ["Test SP", "en"]
- #
- # organization:
- # name: Example com
- # display_name:
- # - ["Example co", "en"]
- # url: "http://example.com"
- #
- # contact_person:
- # - given_name: Bob
- # sur_name: "the Sysadmin"
- # email_address": ["admin@example.com"]
- # contact_type": technical
+ sp_config:
+ # Point this to the IdP's metadata. You must provide either a local
+ # file via the `local` attribute or (preferably) a URL via the
+ # `remote` attribute.
+ #
+ #metadata:
+ # local: ["saml2/idp.xml"]
+ # remote:
+ # - url: https://our_idp/metadata.xml
+
+ # By default, the user has to go to our login page first. If you'd like
+ # to allow IdP-initiated login, set 'allow_unsolicited: true' in a
+ # 'service.sp' section:
+ #
+ #service:
+ # sp:
+ # allow_unsolicited: true
+
+ # The examples below are just used to generate our metadata xml, and you
+ # may well not need them, depending on your setup. Alternatively you
+ # may need a whole lot more detail - see the pysaml2 docs!
+
+ #description: ["My awesome SP", "en"]
+ #name: ["Test SP", "en"]
+
+ #ui_info:
+ # display_name:
+ # - lang: en
+ # text: "Display Name is the descriptive name of your service."
+ # description:
+ # - lang: en
+ # text: "Description should be a short paragraph explaining the purpose of the service."
+ # information_url:
+ # - lang: en
+ # text: "https://example.com/terms-of-service"
+ # privacy_statement_url:
+ # - lang: en
+ # text: "https://example.com/privacy-policy"
+ # keywords:
+ # - lang: en
+ # text: ["Matrix", "Element"]
+ # logo:
+ # - lang: en
+ # text: "https://example.com/logo.svg"
+ # width: "200"
+ # height: "80"
+
+ #organization:
+ # name: Example com
+ # display_name:
+ # - ["Example co", "en"]
+ # url: "http://example.com"
+
+ #contact_person:
+ # - given_name: Bob
+ # sur_name: "the Sysadmin"
+ # email_address": ["admin@example.com"]
+ # contact_type": technical
# Instead of putting the config inline as above, you can specify a
# separate pysaml2 configuration file:
diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py
index 8be13461..0c1a854f 100644
--- a/synapse/config/tracer.py
+++ b/synapse/config/tracer.py
@@ -67,7 +67,7 @@ class TracerConfig(Config):
# This is a list of regexes which are matched against the server_name of the
# homeserver.
#
- # By defult, it is empty, so no servers are matched.
+ # By default, it is empty, so no servers are matched.
#
#homeserver_whitelist:
# - ".*"
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 79668a40..57fd426e 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -149,7 +149,7 @@ class FederationPolicyForHTTPS:
return SSLClientConnectionCreator(host, ssl_context, should_verify)
def creatorForNetloc(self, hostname, port):
- """Implements the IPolicyForHTTPS interace so that this can be passed
+ """Implements the IPolicyForHTTPS interface so that this can be passed
directly to agents.
"""
return self.get_options(hostname)
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 65df6210..8028663f 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -59,7 +59,7 @@ class DictProperty:
#
# To exclude the KeyError from the traceback, we explicitly
# 'raise from e1.__context__' (which is better than 'raise from None',
- # becuase that would omit any *earlier* exceptions).
+ # because that would omit any *earlier* exceptions).
#
raise AttributeError(
"'%s' has no '%s' property" % (type(instance), self.key)
@@ -368,7 +368,7 @@ class FrozenEvent(EventBase):
return self.__repr__()
def __repr__(self):
- return "<FrozenEvent event_id='%s', type='%s', state_key='%s'>" % (
+ return "<FrozenEvent event_id=%r, type=%r, state_key=%r>" % (
self.get("event_id", None),
self.get("type", None),
self.get("state_key", None),
@@ -451,7 +451,7 @@ class FrozenEventV2(EventBase):
return self.__repr__()
def __repr__(self):
- return "<%s event_id='%s', type='%s', state_key='%s'>" % (
+ return "<%s event_id=%r, type=%r, state_key=%r>" % (
self.__class__.__name__,
self.event_id,
self.get("type", None),
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 355cbe05..14f7f115 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -180,7 +180,7 @@ def only_fields(dictionary, fields):
in 'fields'.
If there are no event fields specified then all fields are included.
- The entries may include '.' charaters to indicate sub-fields.
+ The entries may include '.' characters to indicate sub-fields.
So ['content.body'] will include the 'body' field of the 'content' object.
A literal '.' character in a field name may be escaped using a '\'.
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 5f9af852..f8f3b1a3 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -13,20 +13,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import EventFormatVersions
+from synapse.config.homeserver import HomeServerConfig
+from synapse.events import EventBase
+from synapse.events.builder import EventBuilder
from synapse.events.utils import validate_canonicaljson
+from synapse.federation.federation_server import server_matches_acl_event
from synapse.types import EventID, RoomID, UserID
class EventValidator:
- def validate_new(self, event, config):
+ def validate_new(self, event: EventBase, config: HomeServerConfig):
"""Validates the event has roughly the right format
Args:
- event (FrozenEvent): The event to validate.
- config (Config): The homeserver's configuration.
+ event: The event to validate.
+ config: The homeserver's configuration.
"""
self.validate_builder(event)
@@ -76,12 +82,18 @@ class EventValidator:
if event.type == EventTypes.Retention:
self._validate_retention(event)
- def _validate_retention(self, event):
+ if event.type == EventTypes.ServerACL:
+ if not server_matches_acl_event(config.server_name, event):
+ raise SynapseError(
+ 400, "Can't create an ACL event that denies the local server"
+ )
+
+ def _validate_retention(self, event: EventBase):
"""Checks that an event that defines the retention policy for a room respects the
format enforced by the spec.
Args:
- event (FrozenEvent): The event to validate.
+ event: The event to validate.
"""
if not event.is_state():
raise SynapseError(code=400, msg="must be a state event")
@@ -116,13 +128,10 @@ class EventValidator:
errcode=Codes.BAD_JSON,
)
- def validate_builder(self, event):
+ def validate_builder(self, event: Union[EventBase, EventBuilder]):
"""Validates that the builder/event has roughly the right format. Only
checks values that we expect a proto event to have, rather than all the
fields an event would have
-
- Args:
- event (EventBuilder|FrozenEvent)
"""
strings = ["room_id", "sender", "type"]
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 3a6b9563..a0933fae 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -154,7 +154,7 @@ class Authenticator:
)
logger.debug("Request from %s", origin)
- request.authenticated_entity = origin
+ request.requester = origin
# If we get a valid signed request from the other side, its probably
# alive
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index a86b3deb..41cf07cc 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -22,7 +22,7 @@ attestations have a validity period so need to be periodically renewed.
If a user leaves (or gets kicked out of) a group, either side can still use
their attestation to "prove" their membership, until the attestation expires.
Therefore attestations shouldn't be relied on to prove membership in important
-cases, but can for less important situtations, e.g. showing a users membership
+cases, but can for less important situations, e.g. showing a users membership
of groups on their profile, showing flairs, etc.
An attestation is a signed blob of json that looks like:
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index e5f85b47..0d042cbf 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -113,7 +113,7 @@ class GroupsServerWorkerHandler:
entry = await self.room_list_handler.generate_room_entry(
room_id, len(joined_users), with_alias=False, allow_private=True
)
- entry = dict(entry) # so we don't change whats cached
+ entry = dict(entry) # so we don't change what's cached
entry.pop("room_id", None)
room_entry["profile"] = entry
@@ -550,7 +550,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
group_id, room_id, is_public=is_public
)
else:
- raise SynapseError(400, "Uknown config option")
+ raise SynapseError(400, "Unknown config option")
return {}
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index fd4f762f..664d09da 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -18,19 +18,22 @@ import email.utils
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
-from typing import List
+from typing import TYPE_CHECKING, List
-from synapse.api.errors import StoreError
+from synapse.api.errors import StoreError, SynapseError
from synapse.logging.context import make_deferred_yieldable
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import UserID
from synapse.util import stringutils
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)
class AccountValidityHandler:
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.hs = hs
self.config = hs.config
self.store = self.hs.get_datastore()
@@ -67,7 +70,7 @@ class AccountValidityHandler:
self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000)
@wrap_as_background_process("send_renewals")
- async def _send_renewal_emails(self):
+ async def _send_renewal_emails(self) -> None:
"""Gets the list of users whose account is expiring in the amount of time
configured in the ``renew_at`` parameter from the ``account_validity``
configuration, and sends renewal emails to all of these users as long as they
@@ -81,11 +84,25 @@ class AccountValidityHandler:
user_id=user["user_id"], expiration_ts=user["expiration_ts_ms"]
)
- async def send_renewal_email_to_user(self, user_id: str):
+ async def send_renewal_email_to_user(self, user_id: str) -> None:
+ """
+ Send a renewal email for a specific user.
+
+ Args:
+ user_id: The user ID to send a renewal email for.
+
+ Raises:
+ SynapseError if the user is not set to renew.
+ """
expiration_ts = await self.store.get_expiration_ts_for_user(user_id)
+
+ # If this user isn't set to be expired, raise an error.
+ if expiration_ts is None:
+ raise SynapseError(400, "User has no expiration time: %s" % (user_id,))
+
await self._send_renewal_email(user_id, expiration_ts)
- async def _send_renewal_email(self, user_id: str, expiration_ts: int):
+ async def _send_renewal_email(self, user_id: str, expiration_ts: int) -> None:
"""Sends out a renewal email to every email address attached to the given user
with a unique link allowing them to renew their account.
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 1ce2091b..a7039445 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -88,7 +88,7 @@ class AdminHandler(BaseHandler):
# We only try and fetch events for rooms the user has been in. If
# they've been e.g. invited to a room without joining then we handle
- # those seperately.
+ # those separately.
rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id)
for index, room in enumerate(rooms):
@@ -226,7 +226,7 @@ class ExfiltrationWriter:
"""
def finished(self):
- """Called when all data has succesfully been exported and written.
+ """Called when all data has successfully been exported and written.
This functions return value is passed to the caller of
`export_user_data`.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 64dea23f..9fc84442 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -12,9 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
-from typing import Dict, List, Optional
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
from prometheus_client import Counter
@@ -30,17 +29,24 @@ from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
+from synapse.storage.databases.main.directory import RoomAliasMapping
+from synapse.types import Collection, JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.metrics import Measure
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
class ApplicationServicesHandler:
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.is_mine_id = hs.is_mine_id
self.appservice_api = hs.get_application_service_api()
@@ -53,7 +59,7 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
- async def notify_interested_services(self, max_token: RoomStreamToken):
+ def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
@@ -72,6 +78,12 @@ class ApplicationServicesHandler:
if self.is_processing:
return
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._notify_interested_services(max_token)
+
+ @wrap_as_background_process("notify_interested_services")
+ async def _notify_interested_services(self, max_token: RoomStreamToken):
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
@@ -166,8 +178,11 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
- async def notify_interested_services_ephemeral(
- self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+ def notify_interested_services_ephemeral(
+ self,
+ stream_key: str,
+ new_token: Optional[int],
+ users: Collection[Union[str, UserID]] = [],
):
"""This is called by the notifier in the background
when a ephemeral event handled by the homeserver.
@@ -183,13 +198,34 @@ class ApplicationServicesHandler:
new_token: The latest stream token
users: The user(s) involved with the event.
"""
+ if not self.notify_appservices:
+ return
+
+ if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+ return
+
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
- if not services or not self.notify_appservices:
+ if not services:
return
+
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._notify_interested_services_ephemeral(
+ services, stream_key, new_token, users
+ )
+
+ @wrap_as_background_process("notify_interested_services_ephemeral")
+ async def _notify_interested_services_ephemeral(
+ self,
+ services: List[ApplicationService],
+ stream_key: str,
+ new_token: Optional[int],
+ users: Collection[Union[str, UserID]],
+ ):
logger.info("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
@@ -214,7 +250,9 @@ class ApplicationServicesHandler:
service, "presence", new_token
)
- async def _handle_typing(self, service: ApplicationService, new_token: int):
+ async def _handle_typing(
+ self, service: ApplicationService, new_token: int
+ ) -> List[JsonDict]:
typing_source = self.event_sources.sources["typing"]
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
@@ -226,7 +264,7 @@ class ApplicationServicesHandler:
)
return typing
- async def _handle_receipts(self, service: ApplicationService):
+ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
@@ -237,7 +275,7 @@ class ApplicationServicesHandler:
return receipts
async def _handle_presence(
- self, service: ApplicationService, users: Collection[UserID]
+ self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
events = [] # type: List[JsonDict]
presence_source = self.event_sources.sources["presence"]
@@ -245,6 +283,9 @@ class ApplicationServicesHandler:
service, "presence"
)
for user in users:
+ if isinstance(user, str):
+ user = UserID.from_string(user)
+
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
@@ -265,11 +306,11 @@ class ApplicationServicesHandler:
return events
- async def query_user_exists(self, user_id):
+ async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.
Args:
- user_id(str): The user to query if they exist on any AS.
+ user_id: The user to query if they exist on any AS.
Returns:
True if this user exists on at least one application service.
"""
@@ -280,11 +321,13 @@ class ApplicationServicesHandler:
return True
return False
- async def query_room_alias_exists(self, room_alias):
+ async def query_room_alias_exists(
+ self, room_alias: RoomAlias
+ ) -> Optional[RoomAliasMapping]:
"""Check if an application service knows this room alias exists.
Args:
- room_alias(RoomAlias): The room alias to query.
+ room_alias: The room alias to query.
Returns:
namedtuple: with keys "room_id" and "servers" or None if no
association can be found.
@@ -300,10 +343,13 @@ class ApplicationServicesHandler:
)
if is_known_alias:
# the alias exists now so don't query more ASes.
- result = await self.store.get_association_from_room_alias(room_alias)
- return result
+ return await self.store.get_association_from_room_alias(room_alias)
- async def query_3pe(self, kind, protocol, fields):
+ return None
+
+ async def query_3pe(
+ self, kind: str, protocol: str, fields: Dict[bytes, List[bytes]]
+ ) -> List[JsonDict]:
services = self._get_services_for_3pn(protocol)
results = await make_deferred_yieldable(
@@ -325,7 +371,9 @@ class ApplicationServicesHandler:
return ret
- async def get_3pe_protocols(self, only_protocol=None):
+ async def get_3pe_protocols(
+ self, only_protocol: Optional[str] = None
+ ) -> Dict[str, JsonDict]:
services = self.store.get_app_services()
protocols = {} # type: Dict[str, List[JsonDict]]
@@ -343,7 +391,7 @@ class ApplicationServicesHandler:
if info is not None:
protocols[p].append(info)
- def _merge_instances(infos):
+ def _merge_instances(infos: List[JsonDict]) -> JsonDict:
if not infos:
return {}
@@ -358,19 +406,17 @@ class ApplicationServicesHandler:
return combined
- for p in protocols.keys():
- protocols[p] = _merge_instances(protocols[p])
-
- return protocols
+ return {p: _merge_instances(protocols[p]) for p in protocols.keys()}
- async def _get_services_for_event(self, event):
+ async def _get_services_for_event(
+ self, event: EventBase
+ ) -> List[ApplicationService]:
"""Retrieve a list of application services interested in this event.
Args:
- event(Event): The event to check. Can be None if alias_list is not.
+ event: The event to check. Can be None if alias_list is not.
Returns:
- list<ApplicationService>: A list of services interested in this
- event based on the service regex.
+ A list of services interested in this event based on the service regex.
"""
services = self.store.get_app_services()
@@ -384,17 +430,15 @@ class ApplicationServicesHandler:
return interested_list
- def _get_services_for_user(self, user_id):
+ def _get_services_for_user(self, user_id: str) -> List[ApplicationService]:
services = self.store.get_app_services()
- interested_list = [s for s in services if (s.is_interested_in_user(user_id))]
- return interested_list
+ return [s for s in services if (s.is_interested_in_user(user_id))]
- def _get_services_for_3pn(self, protocol):
+ def _get_services_for_3pn(self, protocol: str) -> List[ApplicationService]:
services = self.store.get_app_services()
- interested_list = [s for s in services if s.is_interested_in_protocol(protocol)]
- return interested_list
+ return [s for s in services if s.is_interested_in_protocol(protocol)]
- async def _is_unknown_user(self, user_id):
+ async def _is_unknown_user(self, user_id: str) -> bool:
if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
@@ -409,9 +453,8 @@ class ApplicationServicesHandler:
service_list = [s for s in services if s.sender == user_id]
return len(service_list) == 0
- async def _check_user_exists(self, user_id):
+ async def _check_user_exists(self, user_id: str) -> bool:
unknown_user = await self._is_unknown_user(user_id)
if unknown_user:
- exists = await self.query_user_exists(user_id)
- return exists
+ return await self.query_user_exists(user_id)
return True
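The appservice.py changes above turn notify_interested_services into a cheap synchronous entry point that only kicks off a decorated background coroutine when there is actually work to do. A simplified asyncio sketch of a wrap_as_background_process-style decorator, with Synapse's metrics and logcontext handling deliberately omitted:

    import asyncio
    from functools import wraps


    def wrap_as_background_process(desc: str):
        """Sketch: calling the wrapped coroutine schedules it as a
        fire-and-forget task instead of awaiting it inline."""

        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                return asyncio.create_task(func(*args, **kwargs), name=desc)

            return wrapper

        return decorator


    class Handler:
        def __init__(self):
            self.is_processing = False

        def notify_interested_services(self, max_token: int) -> None:
            # Cheap synchronous checks happen first; the background task is
            # only started when needed.
            if self.is_processing:
                return
            self._notify_interested_services(max_token)

        @wrap_as_background_process("notify_interested_services")
        async def _notify_interested_services(self, max_token: int) -> None:
            self.is_processing = True
            try:
                await asyncio.sleep(0)  # stand-in for pushing to appservices
                print("pushed events up to", max_token)
            finally:
                self.is_processing = False


    async def main():
        handler = Handler()
        handler.notify_interested_services(42)
        await asyncio.sleep(0.1)  # let the background task run


    asyncio.run(main())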
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 8619fbb9..213baea2 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,10 +18,20 @@ import logging
import time
import unicodedata
import urllib.parse
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Tuple,
+ Union,
+)
import attr
-import bcrypt # type: ignore[import]
+import bcrypt
import pymacaroons
from synapse.api.constants import LoginType
@@ -49,6 +59,9 @@ from synapse.util.threepids import canonicalise_email
from ._base import BaseHandler
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)
@@ -149,11 +162,7 @@ class SsoLoginExtraAttributes:
class AuthHandler(BaseHandler):
SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
- def __init__(self, hs):
- """
- Args:
- hs (synapse.server.HomeServer):
- """
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.checkers = {} # type: Dict[str, UserInteractiveAuthChecker]
@@ -172,10 +181,15 @@ class AuthHandler(BaseHandler):
# better way to break the loop
account_handler = ModuleApi(hs, self)
- self.password_providers = [
- module(config=config, account_handler=account_handler)
- for module, config in hs.config.password_providers
- ]
+ self.password_providers = []
+ for module, config in hs.config.password_providers:
+ try:
+ self.password_providers.append(
+ module(config=config, account_handler=account_handler)
+ )
+ except Exception as e:
+ logger.error("Error while initializing %r: %s", module, e)
+ raise
logger.info("Extra password_providers: %r", self.password_providers)
@@ -470,9 +484,7 @@ class AuthHandler(BaseHandler):
# authentication flow.
await self.store.set_ui_auth_clientdict(sid, clientdict)
- user_agent = request.requestHeaders.getRawHeaders(b"User-Agent", default=[b""])[
- 0
- ].decode("ascii", "surrogateescape")
+ user_agent = request.get_user_agent("")
await self.store.add_user_agent_ip_to_ui_auth_session(
session.session_id, user_agent, clientip
@@ -692,7 +704,7 @@ class AuthHandler(BaseHandler):
Creates a new access token for the user with the given user ID.
The user is assumed to have been authenticated by some other
- machanism (e.g. CAS), and the user_id converted to the canonical case.
+ mechanism (e.g. CAS), and the user_id converted to the canonical case.
The device will be recorded in the table if it is not there already.
@@ -984,17 +996,17 @@ class AuthHandler(BaseHandler):
# This might return an awaitable, if it does block the log out
# until it completes.
result = provider.on_logged_out(
- user_id=str(user_info["user"]),
- device_id=user_info["device_id"],
+ user_id=user_info.user_id,
+ device_id=user_info.device_id,
access_token=access_token,
)
if inspect.isawaitable(result):
await result
# delete pushers associated with this access token
- if user_info["token_id"] is not None:
+ if user_info.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
- str(user_info["user"]), (user_info["token_id"],)
+ user_info.user_id, (user_info.token_id,)
)
async def delete_access_tokens_for_user(
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index a4cc4b9a..048a3b3c 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -212,9 +212,7 @@ class CasHandler:
else:
if not registered_user_id:
# Pull out the user-agent and IP from the request.
- user_agent = request.requestHeaders.getRawHeaders(
- b"User-Agent", default=[b""]
- )[0].decode("ascii", "surrogateescape")
+ user_agent = request.get_user_agent("")
ip_address = self.hs.get_ip_from_request(request)
registered_user_id = await self._registration_handler.register_user(
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 611742ae..92975215 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -129,6 +129,11 @@ class E2eKeysHandler:
if user_id in local_query:
results[user_id] = keys
+ # Get cached cross-signing keys
+ cross_signing_keys = await self.get_cross_signing_keys_from_cache(
+ device_keys_query, from_user_id
+ )
+
# Now attempt to get any remote devices from our local cache.
remote_queries_not_in_cache = {}
if remote_queries:
@@ -155,16 +160,28 @@ class E2eKeysHandler:
unsigned["device_display_name"] = device_display_name
user_devices[device_id] = result
+ # check for missing cross-signing keys.
+ for user_id in remote_queries.keys():
+ cached_cross_master = user_id in cross_signing_keys["master_keys"]
+ cached_cross_selfsigning = (
+ user_id in cross_signing_keys["self_signing_keys"]
+ )
+
+ # check if we are missing only one of cross-signing master or
+ # self-signing key, but the other one is cached.
+ # as we need both, this will issue a federation request.
+ # if we don't have any of the keys, either the user doesn't have
+ # cross-signing set up, or the cached device list
+ # is not (yet) updated.
+ if cached_cross_master ^ cached_cross_selfsigning:
+ user_ids_not_in_cache.add(user_id)
+
+ # add those users to the list to fetch over federation.
for user_id in user_ids_not_in_cache:
domain = get_domain_from_id(user_id)
r = remote_queries_not_in_cache.setdefault(domain, {})
r[user_id] = remote_queries[user_id]
- # Get cached cross-signing keys
- cross_signing_keys = await self.get_cross_signing_keys_from_cache(
- device_keys_query, from_user_id
- )
-
# Now fetch any devices that we don't have in our cache
@trace
async def do_remote_query(destination):
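The new comment block in e2e_keys.py above explains when a remote user gets added to the federation query: only when exactly one of the master and self-signing cross-signing keys is cached, since both are needed and missing both usually means cross-signing is not set up (or the cached device list is stale). A tiny illustration of that exclusive-or rule (the function name is made up for the example):

    def needs_federation_fetch(has_master_key: bool, has_self_signing_key: bool) -> bool:
        """Return True if exactly one of the two cross-signing keys is cached,
        mirroring the rule described in the comment above."""
        return has_master_key ^ has_self_signing_key


    print(needs_federation_fetch(True, False))   # True  -> query federation
    print(needs_federation_fetch(True, True))    # False -> fully cached
    print(needs_federation_fetch(False, False))  # False -> probably not set up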
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index fde8f005..c3869577 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -112,7 +112,7 @@ class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
a) handling received Pdus before handing them on as Events to the rest
- of the homeserver (including auth and state conflict resoultion)
+ of the homeserver (including auth and state conflict resolutions)
b) converting events that were produced by local clients that may need
to be sent to remote homeservers.
c) doing the necessary dances to invite remote users and join remote
@@ -477,7 +477,7 @@ class FederationHandler(BaseHandler):
# ----
#
# Update richvdh 2018/09/18: There are a number of problems with timing this
- # request out agressively on the client side:
+ # request out aggressively on the client side:
#
# - it plays badly with the server-side rate-limiter, which starts tarpitting you
# if you send too many requests at once, so you end up with the server carefully
@@ -495,13 +495,13 @@ class FederationHandler(BaseHandler):
# we'll end up back here for the *next* PDU in the list, which exacerbates the
# problem.
#
- # - the agressive 10s timeout was introduced to deal with incoming federation
+ # - the aggressive 10s timeout was introduced to deal with incoming federation
# requests taking 8 hours to process. It's not entirely clear why that was going
# on; certainly there were other issues causing traffic storms which are now
# resolved, and I think in any case we may be more sensible about our locking
# now. We're *certainly* more sensible about our logging.
#
- # All that said: Let's try increasing the timout to 60s and see what happens.
+ # All that said: Let's try increasing the timeout to 60s and see what happens.
try:
missing_events = await self.federation_client.get_missing_events(
@@ -1120,7 +1120,7 @@ class FederationHandler(BaseHandler):
logger.info(str(e))
continue
except RequestSendFailed as e:
- logger.info("Falied to get backfill from %s because %s", dom, e)
+ logger.info("Failed to get backfill from %s because %s", dom, e)
continue
except FederationDeniedError as e:
logger.info(e)
@@ -1545,7 +1545,7 @@ class FederationHandler(BaseHandler):
#
# The reasons we have the destination server rather than the origin
# server send it are slightly mysterious: the origin server should have
- # all the neccessary state once it gets the response to the send_join,
+ # all the necessary state once it gets the response to the send_join,
# so it could send the event itself if it wanted to. It may be that
# doing it this way reduces failure modes, or avoids certain attacks
# where a new server selectively tells a subset of the federation that
@@ -1649,7 +1649,7 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = True
event.internal_metadata.out_of_band_membership = True
- # Try the host that we succesfully called /make_leave/ on first for
+ # Try the host that we successfully called /make_leave/ on first for
# the /send_leave/ request.
host_list = list(target_hosts)
try:
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 9684e60f..abd8d2af 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -17,7 +17,7 @@
import logging
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
-from synapse.types import get_domain_from_id
+from synapse.types import GroupID, get_domain_from_id
logger = logging.getLogger(__name__)
@@ -28,6 +28,9 @@ def _create_rerouter(func_name):
"""
async def f(self, group_id, *args, **kwargs):
+ if not GroupID.is_valid(group_id):
+ raise SynapseError(400, "%s is not a legal group ID" % (group_id,))
+
if self.is_mine_id(group_id):
return await getattr(self.groups_server_handler, func_name)(
group_id, *args, **kwargs
@@ -346,7 +349,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
server_name=get_domain_from_id(group_id),
)
- # TODO: Check that the group is public and we're being added publically
+ # TODO: Check that the group is public and we're being added publicly
is_publicised = content.get("publicise", False)
token = await self.store.register_user_group_membership(
@@ -391,7 +394,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
server_name=get_domain_from_id(group_id),
)
- # TODO: Check that the group is public and we're being added publically
+ # TODO: Check that the group is public and we're being added publicly
is_publicised = content.get("publicise", False)
token = await self.store.register_user_group_membership(
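The rerouter now rejects malformed group IDs up front with a 400 instead of attempting a local or federation call. As a rough illustration only, the regex below is an assumption standing in for synapse.types.GroupID.is_valid, not its real implementation; a Matrix group ID uses the "+" sigil followed by a localpart and a server name.

    import re

    # Illustrative only: a loose check for the "+localpart:server" shape.
    _GROUP_ID_RE = re.compile(r"^\+[^:]+:\S+$")

    def looks_like_group_id(group_id: str) -> bool:
        return bool(_GROUP_ID_RE.match(group_id))

    assert looks_like_group_id("+ops:example.com")
    assert not looks_like_group_id("ops:example.com")  # missing the "+" sigil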
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fb0a04e9..c6791fb9 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -656,7 +656,7 @@ class EventCreationHandler:
context: The event context.
Returns:
- The previous verion of the event is returned, if it is found in the
+ The previous version of the event is returned, if it is found in the
event context. Otherwise, None is returned.
"""
prev_state_ids = await context.get_prev_state_ids()
@@ -1099,34 +1099,13 @@ class EventCreationHandler:
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.INVITE:
-
- def is_inviter_member_event(e):
- return e.type == EventTypes.Member and e.sender == event.sender
-
- current_state_ids = await context.get_current_state_ids()
-
- # We know this event is not an outlier, so this must be
- # non-None.
- assert current_state_ids is not None
-
- state_to_include_ids = [
- e_id
- for k, e_id in current_state_ids.items()
- if k[0] in self.room_invite_state_types
- or k == (EventTypes.Member, event.sender)
- ]
-
- state_to_include = await self.store.get_events(state_to_include_ids)
-
- event.unsigned["invite_room_state"] = [
- {
- "type": e.type,
- "state_key": e.state_key,
- "content": e.content,
- "sender": e.sender,
- }
- for e in state_to_include.values()
- ]
+ event.unsigned[
+ "invite_room_state"
+ ] = await self.store.get_stripped_room_state_from_event_context(
+ context,
+ self.room_invite_state_types,
+ membership_user_id=event.sender,
+ )
invitee = UserID.from_string(event.state_key)
if not self.hs.is_mine(invitee):
@@ -1159,6 +1138,9 @@ class EventCreationHandler:
if original_event.room_id != event.room_id:
raise SynapseError(400, "Cannot redact event from a different room")
+ if original_event.type == EventTypes.ServerACL:
+ raise AuthError(403, "Redacting server ACL events is not permitted")
+
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = self.auth.compute_auth_events(
event, prev_state_ids, for_verification=True
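The inline stripping of invite state has moved into the store, but the shape of each entry is unchanged: only the four fields an invited client needs before it has joined, exactly as in the removed code above. A standalone sketch of that reduction (the helper name is illustrative):

    def strip_state_event(event: dict) -> dict:
        # Keep only what the stripped invite_room_state carries.
        return {
            "type": event["type"],
            "state_key": event["state_key"],
            "content": event["content"],
            "sender": event["sender"],
        }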
diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 05ac86e6..331d4e7e 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc_handler.py
@@ -217,7 +217,7 @@ class OidcHandler:
This is based on the requested scopes: if the scopes include
``openid``, the provider should give us an ID token containing the
- user informations. If not, we should fetch them using the
+ user information. If not, we should fetch them using the
``access_token`` with the ``userinfo_endpoint``.
"""
@@ -426,7 +426,7 @@ class OidcHandler:
return resp
async def _fetch_userinfo(self, token: Token) -> UserInfo:
- """Fetch user informations from the ``userinfo_endpoint``.
+ """Fetch user information from the ``userinfo_endpoint``.
Args:
token: the token given by the ``token_endpoint``.
@@ -695,9 +695,7 @@ class OidcHandler:
return
# Pull out the user-agent and IP from the request.
- user_agent = request.requestHeaders.getRawHeaders(b"User-Agent", default=[b""])[
- 0
- ].decode("ascii", "surrogateescape")
+ user_agent = request.get_user_agent("")
ip_address = self.hs.get_ip_from_request(request)
# Call the mapper to register/login the user
@@ -756,7 +754,7 @@ class OidcHandler:
Defaults to an hour.
Returns:
- A signed macaroon token with the session informations.
+ A signed macaroon token with the session information.
"""
macaroon = pymacaroons.Macaroon(
location=self._server_name, identifier="key", key=self._macaroon_secret_key,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1000ac95..8e014c9b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -48,7 +48,7 @@ from synapse.util.wheel_timer import WheelTimer
MYPY = False
if MYPY:
- import synapse.server
+ from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -101,7 +101,7 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class BasePresenceHandler(abc.ABC):
"""Parts of the PresenceHandler that are shared between workers and master"""
- def __init__(self, hs: "synapse.server.HomeServer"):
+ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
@@ -199,7 +199,7 @@ class BasePresenceHandler(abc.ABC):
class PresenceHandler(BasePresenceHandler):
- def __init__(self, hs: "synapse.server.HomeServer"):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
@@ -802,7 +802,7 @@ class PresenceHandler(BasePresenceHandler):
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
- function to get further updatees.
+ function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""
@@ -977,7 +977,7 @@ def should_notify(old_state, new_state):
new_state.last_active_ts - old_state.last_active_ts
> LAST_ACTIVE_GRANULARITY
):
- # Only notify about last active bumps if we're not currently acive
+ # Only notify about last active bumps if we're not currently active
if not new_state.currently_active:
notify_reason_counter.labels("last_active_change_online").inc()
return True
@@ -1011,7 +1011,7 @@ def format_user_presence_state(state, now, include_user_id=True):
class PresenceEventSource:
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
# We can't call get_presence_handler here because there's a cycle:
#
# Presence -> Notifier -> PresenceEventSource -> Presence
@@ -1071,12 +1071,14 @@ class PresenceEventSource:
users_interested_in = await self._get_interested_in(user, explicit_room_id)
- user_ids_changed = set()
+ user_ids_changed = set() # type: Collection[str]
changed = None
if from_key:
changed = stream_change_cache.get_all_entities_changed(from_key)
if changed is not None and len(changed) < 500:
+ assert isinstance(user_ids_changed, set)
+
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
get_updates_counter.labels("stream").inc()
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 92700b58..74a1ddd7 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -98,11 +98,18 @@ class ProfileHandler(BaseHandler):
except RequestSendFailed as e:
raise SynapseError(502, "Failed to fetch profile") from e
except HttpResponseException as e:
+ if e.code < 500 and e.code != 404:
+ # Other codes are not allowed in c2s API
+ logger.info(
+ "Server replied with wrong response: %s %s", e.code, e.msg
+ )
+
+ raise SynapseError(502, "Failed to fetch profile")
raise e.to_synapse_error()
async def get_profile_from_cache(self, user_id: str) -> JsonDict:
"""Get the profile information from our local cache. If the user is
- ours then the profile information will always be corect. Otherwise,
+ ours then the profile information will always be correct. Otherwise,
it may be out of date/missing.
"""
target_user = UserID.from_string(user_id)
@@ -124,7 +131,7 @@ class ProfileHandler(BaseHandler):
profile = await self.store.get_from_remote_profile_cache(user_id)
return profile or {}
- async def get_displayname(self, target_user: UserID) -> str:
+ async def get_displayname(self, target_user: UserID) -> Optional[str]:
if self.hs.is_mine(target_user):
try:
displayname = await self.store.get_profile_displayname(
@@ -182,7 +189,9 @@ class ProfileHandler(BaseHandler):
)
if not isinstance(new_displayname, str):
- raise SynapseError(400, "Invalid displayname")
+ raise SynapseError(
+ 400, "'displayname' must be a string", errcode=Codes.INVALID_PARAM
+ )
if len(new_displayname) > MAX_DISPLAYNAME_LEN:
raise SynapseError(
@@ -211,7 +220,7 @@ class ProfileHandler(BaseHandler):
await self._update_join_states(requester, target_user)
- async def get_avatar_url(self, target_user: UserID) -> str:
+ async def get_avatar_url(self, target_user: UserID) -> Optional[str]:
if self.hs.is_mine(target_user):
try:
avatar_url = await self.store.get_profile_avatar_url(
@@ -266,7 +275,9 @@ class ProfileHandler(BaseHandler):
)
if not isinstance(new_avatar_url, str):
- raise SynapseError(400, "Invalid displayname")
+ raise SynapseError(
+ 400, "'avatar_url' must be a string", errcode=Codes.INVALID_PARAM
+ )
if len(new_avatar_url) > MAX_AVATAR_URL_LEN:
raise SynapseError(
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index a6f1d216..ed1ff625 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -115,7 +115,10 @@ class RegistrationHandler(BaseHandler):
400, "User ID already taken.", errcode=Codes.USER_IN_USE
)
user_data = await self.auth.get_user_by_access_token(guest_access_token)
- if not user_data["is_guest"] or user_data["user"].localpart != localpart:
+ if (
+ not user_data.is_guest
+ or UserID.from_string(user_data.user_id).localpart != localpart
+ ):
raise AuthError(
403,
"Cannot register taken user ID without valid guest "
@@ -741,7 +744,7 @@ class RegistrationHandler(BaseHandler):
# up when the access token is saved, but that's quite an
# invasive change I'd rather do separately.
user_tuple = await self.store.get_user_by_access_token(token)
- token_id = user_tuple["token_id"]
+ token_id = user_tuple.token_id
await self.pusher_pool.add_pusher(
user_id=user_id,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ec300d88..e7303147 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -771,22 +771,29 @@ class RoomCreationHandler(BaseHandler):
ratelimit=False,
)
- for invitee in invite_list:
+ # we avoid dropping the lock between invites, as otherwise joins can
+ # start coming in and making the createRoom slow.
+ #
+ # we also don't need to check the requester's shadow-ban here, as we
+ # have already done so above (and potentially emptied invite_list).
+ with (await self.room_member_handler.member_linearizer.queue((room_id,))):
content = {}
is_direct = config.get("is_direct", None)
if is_direct:
content["is_direct"] = is_direct
- # Note that update_membership with an action of "invite" can raise a
- # ShadowBanError, but this was handled above by emptying invite_list.
- _, last_stream_id = await self.room_member_handler.update_membership(
- requester,
- UserID.from_string(invitee),
- room_id,
- "invite",
- ratelimit=False,
- content=content,
- )
+ for invitee in invite_list:
+ (
+ _,
+ last_stream_id,
+ ) = await self.room_member_handler.update_membership_locked(
+ requester,
+ UserID.from_string(invitee),
+ room_id,
+ "invite",
+ ratelimit=False,
+ content=content,
+ )
for invite_3pid in invite_3pid_list:
id_server = invite_3pid["id_server"]
@@ -1268,7 +1275,7 @@ class RoomShutdownHandler:
)
# We now wait for the create room to come back in via replication so
- # that we can assume that all the joins/invites have propogated before
+ # that we can assume that all the joins/invites have propagated before
# we try and auto join below.
await self._replication.wait_for_stream_position(
self.hs.config.worker.events_shard_config.get_instance(new_room_id),
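The invite loop now runs entirely under the room's member linearizer rather than re-acquiring it per invitee. A minimal sketch of the same pattern, using asyncio.Lock in place of Synapse's Linearizer and with illustrative names:

    import asyncio

    async def invite_all(room_lock: asyncio.Lock, invitees, send_invite):
        # Hold the lock for the whole batch so that concurrent joins cannot
        # interleave with the invites and slow down room creation.
        async with room_lock:
            for invitee in invitees:
                await send_invite(invitee)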
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ec784030..7cd858b7 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -307,7 +307,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
key = (room_id,)
with (await self.member_linearizer.queue(key)):
- result = await self._update_membership(
+ result = await self.update_membership_locked(
requester,
target,
room_id,
@@ -322,7 +322,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
return result
- async def _update_membership(
+ async def update_membership_locked(
self,
requester: Requester,
target: UserID,
@@ -335,6 +335,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
content: Optional[dict] = None,
require_consent: bool = True,
) -> Tuple[str, int]:
+ """Helper for update_membership.
+
+ Assumes that the membership linearizer is already held for the room.
+ """
content_specified = bool(content)
if content is None:
content = {}
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index 285c481a..fd6c5e9e 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml_handler.py
@@ -216,9 +216,7 @@ class SamlHandler:
return
# Pull out the user-agent and IP from the request.
- user_agent = request.requestHeaders.getRawHeaders(b"User-Agent", default=[b""])[
- 0
- ].decode("ascii", "surrogateescape")
+ user_agent = request.get_user_agent("")
ip_address = self.hs.get_ip_from_request(request)
# Call the mapper to register/login the user
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index e9402e6e..66f1bbcf 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -139,7 +139,7 @@ class SearchHandler(BaseHandler):
# Filter to apply to results
filter_dict = room_cat.get("filter", {})
- # What to order results by (impacts whether pagination can be doen)
+ # What to order results by (impacts whether pagination can be done)
order_by = room_cat.get("order_by", "rank")
# Return the current state of the rooms?
diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py
index 7a4ae072..fb4f70e8 100644
--- a/synapse/handlers/state_deltas.py
+++ b/synapse/handlers/state_deltas.py
@@ -32,7 +32,7 @@ class StateDeltasHandler:
Returns:
None if the field in the events either both match `public_value`
or if neither do, i.e. there has been no change.
- True if it didnt match `public_value` but now does
+ True if it didn't match `public_value` but now does
False if it did match `public_value` but now doesn't
"""
prev_event = None
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b527724b..32e53c2d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -754,7 +754,7 @@ class SyncHandler:
"""
# TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state
- # updates even if they occured logically before the previous event.
+ # updates even if they occurred logically before the previous event.
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
@@ -1882,7 +1882,7 @@ class SyncHandler:
# members (as the client otherwise doesn't have enough info to form
# the name itself).
if sync_config.filter_collection.lazy_load_members() and (
- # we recalulate the summary:
+ # we recalculate the summary:
# if there are membership changes in the timeline, or
# if membership has changed during a gappy sync, or
# if this is an initial sync.
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d3692842..e919a8f9 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -167,20 +167,25 @@ class FollowerTypingHandler:
now_typing = set(row.user_ids)
self._room_typing[row.room_id] = row.user_ids
- run_as_background_process(
- "_handle_change_in_typing",
- self._handle_change_in_typing,
- row.room_id,
- prev_typing,
- now_typing,
- )
+ if self.federation:
+ run_as_background_process(
+ "_send_changes_in_typing_to_remotes",
+ self._send_changes_in_typing_to_remotes,
+ row.room_id,
+ prev_typing,
+ now_typing,
+ )
- async def _handle_change_in_typing(
+ async def _send_changes_in_typing_to_remotes(
self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
):
"""Process a change in typing of a room from replication, sending EDUs
for any local users.
"""
+
+ if not self.federation:
+ return
+
for user_id in now_typing - prev_typing:
if self.is_mine_id(user_id):
await self._push_remote(RoomMember(room_id, user_id), True)
@@ -371,7 +376,7 @@ class TypingWriterHandler(FollowerTypingHandler):
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
- function to get further updatees.
+ function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 79393c88..afbebfc2 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -31,7 +31,7 @@ class UserDirectoryHandler(StateDeltasHandler):
N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
The user directory is filled with users who this server can see are joined to a
- world_readable or publically joinable room. We keep a database table up to date
+ world_readable or publicly joinable room. We keep a database table up to date
by streaming changes of the current state and recalculating whether users should
be in the directory or not when necessary.
"""
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 8324632c..f4093688 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -359,7 +359,7 @@ class SimpleHttpClient:
agent=self.agent,
data=body_producer,
headers=headers,
- **self._extra_treq_args
+ **self._extra_treq_args,
) # type: defer.Deferred
# we use our own timeout mechanism rather than treq's as a workaround
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index a306faa2..1cc666fb 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -172,7 +172,7 @@ class WellKnownResolver:
had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)
# We do this in two steps to differentiate between possibly transient
- # errors (e.g. can't connect to host, 503 response) and more permenant
+ # errors (e.g. can't connect to host, 503 response) and more permanent
# errors (such as getting a 404 response).
response, body = await self._make_well_known_request(
server_name, retry=had_valid_well_known
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c23a4d7c..7e17cdb7 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -587,7 +587,7 @@ class MatrixFederationHttpClient:
"""
Builds the Authorization headers for a federation request
Args:
- destination (bytes|None): The desination homeserver of the request.
+ destination (bytes|None): The destination homeserver of the request.
May be None if the destination is an identity server, in which case
destination_is must be non-None.
method (bytes): The HTTP method of the request
@@ -640,7 +640,7 @@ class MatrixFederationHttpClient:
backoff_on_404=False,
try_trailing_slash_on_400=False,
):
- """ Sends the specifed json data using PUT
+ """ Sends the specified json data using PUT
Args:
destination (str): The remote server to send the HTTP request
@@ -729,7 +729,7 @@ class MatrixFederationHttpClient:
ignore_backoff=False,
args={},
):
- """ Sends the specifed json data using POST
+ """ Sends the specified json data using POST
Args:
destination (str): The remote server to send the HTTP request
@@ -1063,13 +1063,19 @@ def check_content_type_is_json(headers):
"""
c_type = headers.getRawHeaders(b"Content-Type")
if c_type is None:
- raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
+ raise RequestSendFailed(
+ RuntimeError("No Content-Type header received from remote server"),
+ can_retry=False,
+ )
c_type = c_type[0].decode("ascii") # only the first header
val, options = cgi.parse_header(c_type)
if val != "application/json":
raise RequestSendFailed(
- RuntimeError("Content-Type not application/json: was '%s'" % c_type),
+ RuntimeError(
+ "Remote server sent Content-Type header of '%s', not 'application/json'"
+ % c_type,
+ ),
can_retry=False,
)
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index cd94e789..7c5defec 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -109,7 +109,7 @@ in_flight_requests_db_sched_duration = Counter(
# The set of all in flight requests, set[RequestMetrics]
_in_flight_requests = set()
-# Protects the _in_flight_requests set from concurrent accesss
+# Protects the _in_flight_requests set from concurrent access
_in_flight_requests_lock = threading.Lock()
diff --git a/synapse/http/server.py b/synapse/http/server.py
index d8e354f0..c0919f8c 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -180,7 +180,7 @@ class HttpServer:
""" Register a callback that gets fired if we receive a http request
with the given method for a path that matches the given regex.
- If the regex contains groups these gets passed to the calback via
+ If the regex contains groups, these get passed to the callback via
an unpacked tuple.
Args:
@@ -239,7 +239,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
async def _async_render(self, request: Request):
"""Delegates to `_async_render_<METHOD>` methods, or returns a 400 if
- no appropriate method exists. Can be overriden in sub classes for
+ no appropriate method exists. Can be overridden in sub classes for
different routing.
"""
# Treat HEAD requests as GET requests.
@@ -384,7 +384,7 @@ class JsonResource(DirectServeJsonResource):
async def _async_render(self, request):
callback, servlet_classname, group_dict = self._get_handler_for_request(request)
- # Make sure we have an appopriate name for this handler in prometheus
+ # Make sure we have an appropriate name for this handler in prometheus
# (rather than the default of JsonResource).
request.request_metrics.name = servlet_classname
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index fd90ba78..b361b7cb 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -272,7 +272,6 @@ class RestServlet:
on_PUT
on_POST
on_DELETE
- on_OPTIONS
Automatically handles turning CodeMessageExceptions thrown by these methods
into the appropriate HTTP response.
@@ -283,7 +282,7 @@ class RestServlet:
if hasattr(self, "PATTERNS"):
patterns = self.PATTERNS
- for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"):
+ for method in ("GET", "PUT", "POST", "DELETE"):
if hasattr(self, "on_%s" % (method,)):
servlet_classname = self.__class__.__name__
method_handler = getattr(self, "on_%s" % (method,))
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 6e79b478..5f0581dc 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,7 +14,7 @@
import contextlib
import logging
import time
-from typing import Optional
+from typing import Optional, Union
from twisted.python.failure import Failure
from twisted.web.server import Request, Site
@@ -23,6 +23,7 @@ from synapse.config.server import ListenerConfig
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics, requests_counter
from synapse.logging.context import LoggingContext, PreserveLoggingContext
+from synapse.types import Requester
logger = logging.getLogger(__name__)
@@ -54,9 +55,12 @@ class SynapseRequest(Request):
Request.__init__(self, channel, *args, **kw)
self.site = channel.site
self._channel = channel # this is used by the tests
- self.authenticated_entity = None
self.start_time = 0.0
+ # The requester, if authenticated. For federation requests this is the
+ # server name, for client requests this is the Requester object.
+ self.requester = None # type: Optional[Union[Requester, str]]
+
# we can't yet create the logcontext, as we don't know the method.
self.logcontext = None # type: Optional[LoggingContext]
@@ -109,8 +113,14 @@ class SynapseRequest(Request):
method = self.method.decode("ascii")
return method
- def get_user_agent(self):
- return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
+ def get_user_agent(self, default: str) -> str:
+ """Return the last User-Agent header, or the given default.
+ """
+ user_agent = self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
+ if user_agent is None:
+ return default
+
+ return user_agent.decode("ascii", "replace")
def render(self, resrc):
# this is called once a Resource has been found to serve the request; in our
@@ -161,7 +171,9 @@ class SynapseRequest(Request):
yield
except Exception:
# this should already have been caught, and sent back to the client as a 500.
- logger.exception("Asynchronous messge handler raised an uncaught exception")
+ logger.exception(
+ "Asynchronous message handler raised an uncaught exception"
+ )
finally:
# the request handler has finished its work and either sent the whole response
# back, or handed over responsibility to a Producer.
@@ -263,22 +275,30 @@ class SynapseRequest(Request):
# to the client (nb may be negative)
response_send_time = self.finish_time - self._processing_finished_time
- # need to decode as it could be raw utf-8 bytes
- # from a IDN servname in an auth header
- authenticated_entity = self.authenticated_entity
- if authenticated_entity is not None and isinstance(authenticated_entity, bytes):
- authenticated_entity = authenticated_entity.decode("utf-8", "replace")
+ # Convert the requester into a string that we can log
+ authenticated_entity = None
+ if isinstance(self.requester, str):
+ authenticated_entity = self.requester
+ elif isinstance(self.requester, Requester):
+ authenticated_entity = self.requester.authenticated_entity
+
+ # If this is a request where the target user doesn't match the user who
+ # authenticated (e.g. an admin is puppeting a user) then we log both.
+ if self.requester.user.to_string() != authenticated_entity:
+ authenticated_entity = "{},{}".format(
+ authenticated_entity, self.requester.user.to_string(),
+ )
+ elif self.requester is not None:
+ # This shouldn't happen, but we log it so we don't lose information
+ # and can see that we're doing something wrong.
+ authenticated_entity = repr(self.requester) # type: ignore[unreachable]
# ...or could be raw utf-8 bytes in the User-Agent header.
# N.B. if you don't do this, the logger explodes cryptically
# with maximum recursion trying to log errors about
# the charset problem.
# c.f. https://github.com/matrix-org/synapse/issues/3471
- user_agent = self.get_user_agent()
- if user_agent is not None:
- user_agent = user_agent.decode("utf-8", "replace")
- else:
- user_agent = "-"
+ user_agent = self.get_user_agent("-")
code = str(self.code)
if not self.finished:
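The repeated raw-header plumbing in the SSO handlers is replaced by the SynapseRequest.get_user_agent helper shown above. The same behaviour can be sketched directly against Twisted's Headers class; this is an illustration of the semantics, not Synapse's code:

    from twisted.web.http_headers import Headers

    def get_user_agent(headers: Headers, default: str) -> str:
        # Take the last User-Agent value, fall back to the default, and decode
        # defensively since the header may contain raw non-ASCII bytes.
        values = headers.getRawHeaders(b"User-Agent", [None])
        user_agent = values[-1]
        if user_agent is None:
            return default
        return user_agent.decode("ascii", "replace")

    assert get_user_agent(Headers(), "-") == "-"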
diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py
index e69de29b..b28b7b2e 100644
--- a/synapse/logging/__init__.py
+++ b/synapse/logging/__init__.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These are imported to allow for nicer logging configuration files.
+from synapse.logging._remote import RemoteHandler
+from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
+
+__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]
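With RemoteHandler and the two formatters re-exported from synapse.logging, they can be referenced from a standard library dictConfig. A sketch follows, mirroring the handler and formatter entries that the converted structured-logging config later in this diff generates; the host, port and level values are placeholders.

    import logging.config

    logging.config.dictConfig({
        "version": 1,
        "formatters": {
            "json": {"class": "synapse.logging.JsonFormatter"},
            "tersejson": {"class": "synapse.logging.TerseJsonFormatter"},
        },
        "handlers": {
            "remote": {
                "class": "synapse.logging.RemoteHandler",
                "formatter": "tersejson",
                "host": "127.0.0.1",   # placeholder log aggregator
                "port": 9999,
                "maximum_buffer": 1000,
            },
        },
        "root": {"level": "INFO", "handlers": ["remote"]},
    })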
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 0caf3259..fb937b3f 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import sys
import traceback
from collections import deque
@@ -21,10 +22,11 @@ from math import floor
from typing import Callable, Optional
import attr
+from typing_extensions import Deque
from zope.interface import implementer
from twisted.application.internet import ClientService
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import CancelledError, Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
@@ -32,7 +34,9 @@ from twisted.internet.endpoints import (
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
-from twisted.logger import ILogObserver, Logger, LogLevel
+from twisted.python.failure import Failure
+
+logger = logging.getLogger(__name__)
@attr.s
@@ -45,11 +49,11 @@ class LogProducer:
Args:
buffer: Log buffer to read logs from.
transport: Transport to write to.
- format_event: A callable to format the log entry to a string.
+ format: A callable to format the log record to a string.
"""
transport = attr.ib(type=ITransport)
- format_event = attr.ib(type=Callable[[dict], str])
+ _format = attr.ib(type=Callable[[logging.LogRecord], str])
_buffer = attr.ib(type=deque)
_paused = attr.ib(default=False, type=bool, init=False)
@@ -61,16 +65,19 @@ class LogProducer:
self._buffer = deque()
def resumeProducing(self):
+ # If we're already producing, nothing to do.
self._paused = False
+ # Loop until paused.
while self._paused is False and (self._buffer and self.transport.connected):
try:
- # Request the next event and format it.
- event = self._buffer.popleft()
- msg = self.format_event(event)
+ # Request the next record and format it.
+ record = self._buffer.popleft()
+ msg = self._format(record)
# Send it as a new line over the transport.
self.transport.write(msg.encode("utf8"))
+ self.transport.write(b"\n")
except Exception:
# Something has gone wrong writing to the transport -- log it
# and break out of the while.
@@ -78,76 +85,85 @@ class LogProducer:
break
-@attr.s
-@implementer(ILogObserver)
-class TCPLogObserver:
+class RemoteHandler(logging.Handler):
"""
- An IObserver that writes JSON logs to a TCP target.
+ A logging handler that writes logs to a TCP target.
Args:
- hs (HomeServer): The homeserver that is being logged for.
host: The host of the logging target.
port: The logging target's port.
- format_event: A callable to format the log entry to a string.
maximum_buffer: The maximum buffer size.
"""
- hs = attr.ib()
- host = attr.ib(type=str)
- port = attr.ib(type=int)
- format_event = attr.ib(type=Callable[[dict], str])
- maximum_buffer = attr.ib(type=int)
- _buffer = attr.ib(default=attr.Factory(deque), type=deque)
- _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
- _logger = attr.ib(default=attr.Factory(Logger))
- _producer = attr.ib(default=None, type=Optional[LogProducer])
-
- def start(self) -> None:
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ maximum_buffer: int = 1000,
+ level=logging.NOTSET,
+ _reactor=None,
+ ):
+ super().__init__(level=level)
+ self.host = host
+ self.port = port
+ self.maximum_buffer = maximum_buffer
+
+ self._buffer = deque() # type: Deque[logging.LogRecord]
+ self._connection_waiter = None # type: Optional[Deferred]
+ self._producer = None # type: Optional[LogProducer]
# Connect without DNS lookups if it's a direct IP.
+ if _reactor is None:
+ from twisted.internet import reactor
+
+ _reactor = reactor
+
try:
ip = ip_address(self.host)
if isinstance(ip, IPv4Address):
- endpoint = TCP4ClientEndpoint(
- self.hs.get_reactor(), self.host, self.port
- )
+ endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
elif isinstance(ip, IPv6Address):
- endpoint = TCP6ClientEndpoint(
- self.hs.get_reactor(), self.host, self.port
- )
+ endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
else:
raise ValueError("Unknown IP address provided: %s" % (self.host,))
except ValueError:
- endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
+ endpoint = HostnameEndpoint(_reactor, self.host, self.port)
factory = Factory.forProtocol(Protocol)
- self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
+ self._service = ClientService(endpoint, factory, clock=_reactor)
self._service.startService()
+ self._stopping = False
self._connect()
- def stop(self):
+ def close(self):
+ self._stopping = True
self._service.stopService()
def _connect(self) -> None:
"""
Triggers an attempt to connect then write to the remote if not already writing.
"""
+ # Do not attempt to open multiple connections.
if self._connection_waiter:
return
self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
- @self._connection_waiter.addErrback
- def fail(r):
- r.printTraceback(file=sys.__stderr__)
+ def fail(failure: Failure) -> None:
+ # If the Deferred was cancelled (e.g. during shutdown) do not try to
+ # reconnect (this will cause an infinite loop of errors).
+ if failure.check(CancelledError) and self._stopping:
+ return
+
+ # For a different error, print the traceback and re-connect.
+ failure.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()
- @self._connection_waiter.addCallback
- def writer(r):
+ def writer(result: Protocol) -> None:
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
- if self._producer and r.transport is self._producer.transport:
+ if self._producer and result.transport is self._producer.transport:
self._producer.resumeProducing()
self._connection_waiter = None
return
@@ -158,29 +174,29 @@ class TCPLogObserver:
# Make a new producer and start it.
self._producer = LogProducer(
- buffer=self._buffer,
- transport=r.transport,
- format_event=self.format_event,
+ buffer=self._buffer, transport=result.transport, format=self.format,
)
- r.transport.registerProducer(self._producer, True)
+ result.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None
+ self._connection_waiter.addCallbacks(writer, fail)
+
def _handle_pressure(self) -> None:
"""
- Handle backpressure by shedding events.
+ Handle backpressure by shedding records.
The buffer will shed records, in this order, until it is below the maximum:
- - Shed DEBUG events
- - Shed INFO events
- - Shed the middle 50% of the events.
+ - Shed DEBUG records.
+ - Shed INFO records.
+ - Shed the middle 50% of the records.
"""
if len(self._buffer) <= self.maximum_buffer:
return
# Strip out DEBUGs
self._buffer = deque(
- filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer)
+ filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
)
if len(self._buffer) <= self.maximum_buffer:
@@ -188,7 +204,7 @@ class TCPLogObserver:
# Strip out INFOs
self._buffer = deque(
- filter(lambda event: event["log_level"] != LogLevel.info, self._buffer)
+ filter(lambda record: record.levelno > logging.INFO, self._buffer)
)
if len(self._buffer) <= self.maximum_buffer:
@@ -209,17 +225,17 @@ class TCPLogObserver:
self._buffer.extend(reversed(end_buffer))
- def __call__(self, event: dict) -> None:
- self._buffer.append(event)
+ def emit(self, record: logging.LogRecord) -> None:
+ self._buffer.append(record)
# Handle backpressure, if it exists.
try:
self._handle_pressure()
except Exception:
- # If handling backpressure fails,clear the buffer and log the
+ # If handling backpressure fails, clear the buffer and log the
# exception.
self._buffer.clear()
- self._logger.failure("Failed clearing backpressure")
+ logger.warning("Failed clearing backpressure")
# Try and write immediately.
self._connect()
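The backpressure strategy is unchanged in spirit but now operates on LogRecord levels. A standalone sketch of the shedding order (drop DEBUG, then INFO, then the middle half), without the handler plumbing:

    import logging
    from collections import deque

    def shed_records(buffer: deque, maximum: int) -> deque:
        # Shed DEBUG records first.
        if len(buffer) > maximum:
            buffer = deque(r for r in buffer if r.levelno > logging.DEBUG)
        # Then shed INFO records.
        if len(buffer) > maximum:
            buffer = deque(r for r in buffer if r.levelno > logging.INFO)
        # Finally keep the oldest and newest halves of the allowance,
        # dropping everything in between.
        if len(buffer) > maximum:
            records = list(buffer)
            keep = maximum // 2
            buffer = deque(records[:keep] + records[-keep:])
        return buffer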
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
index 0fc2ea60..14d9c104 100644
--- a/synapse/logging/_structured.py
+++ b/synapse/logging/_structured.py
@@ -12,138 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
import os.path
-import sys
-import typing
-import warnings
-from typing import List
+from typing import Any, Dict, Generator, Optional, Tuple
-import attr
-from constantly import NamedConstant, Names, ValueConstant, Values
-from zope.interface import implementer
-
-from twisted.logger import (
- FileLogObserver,
- FilteringLogObserver,
- ILogObserver,
- LogBeginner,
- Logger,
- LogLevel,
- LogLevelFilterPredicate,
- LogPublisher,
- eventAsText,
- jsonFileLogObserver,
-)
+from constantly import NamedConstant, Names
from synapse.config._base import ConfigError
-from synapse.logging._terse_json import (
- TerseJSONToConsoleLogObserver,
- TerseJSONToTCPLogObserver,
-)
-from synapse.logging.context import current_context
-
-
-def stdlib_log_level_to_twisted(level: str) -> LogLevel:
- """
- Convert a stdlib log level to Twisted's log level.
- """
- lvl = level.lower().replace("warning", "warn")
- return LogLevel.levelWithName(lvl)
-
-
-@attr.s
-@implementer(ILogObserver)
-class LogContextObserver:
- """
- An ILogObserver which adds Synapse-specific log context information.
-
- Attributes:
- observer (ILogObserver): The target parent observer.
- """
-
- observer = attr.ib()
-
- def __call__(self, event: dict) -> None:
- """
- Consume a log event and emit it to the parent observer after filtering
- and adding log context information.
-
- Args:
- event (dict)
- """
- # Filter out some useless events that Twisted outputs
- if "log_text" in event:
- if event["log_text"].startswith("DNSDatagramProtocol starting on "):
- return
-
- if event["log_text"].startswith("(UDP Port "):
- return
-
- if event["log_text"].startswith("Timing out client") or event[
- "log_format"
- ].startswith("Timing out client"):
- return
-
- context = current_context()
-
- # Copy the context information to the log event.
- context.copy_to_twisted_log_entry(event)
-
- self.observer(event)
-
-
-class PythonStdlibToTwistedLogger(logging.Handler):
- """
- Transform a Python stdlib log message into a Twisted one.
- """
-
- def __init__(self, observer, *args, **kwargs):
- """
- Args:
- observer (ILogObserver): A Twisted logging observer.
- *args, **kwargs: Args/kwargs to be passed to logging.Handler.
- """
- self.observer = observer
- super().__init__(*args, **kwargs)
-
- def emit(self, record: logging.LogRecord) -> None:
- """
- Emit a record to Twisted's observer.
-
- Args:
- record (logging.LogRecord)
- """
-
- self.observer(
- {
- "log_time": record.created,
- "log_text": record.getMessage(),
- "log_format": "{log_text}",
- "log_namespace": record.name,
- "log_level": stdlib_log_level_to_twisted(record.levelname),
- }
- )
-
-
-def SynapseFileLogObserver(outFile: typing.IO[str]) -> FileLogObserver:
- """
- A log observer that formats events like the traditional log formatter and
- sends them to `outFile`.
-
- Args:
- outFile (file object): The file object to write to.
- """
-
- def formatEvent(_event: dict) -> str:
- event = dict(_event)
- event["log_level"] = event["log_level"].name.upper()
- event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
- event.get("log_format", "{log_text}") or "{log_text}"
- )
- return eventAsText(event, includeSystem=False) + "\n"
-
- return FileLogObserver(outFile, formatEvent)
class DrainType(Names):
@@ -155,30 +29,12 @@ class DrainType(Names):
NETWORK_JSON_TERSE = NamedConstant()
-class OutputPipeType(Values):
- stdout = ValueConstant(sys.__stdout__)
- stderr = ValueConstant(sys.__stderr__)
-
-
-@attr.s
-class DrainConfiguration:
- name = attr.ib()
- type = attr.ib()
- location = attr.ib()
- options = attr.ib(default=None)
-
-
-@attr.s
-class NetworkJSONTerseOptions:
- maximum_buffer = attr.ib(type=int)
-
-
-DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
+DEFAULT_LOGGERS = {"synapse": {"level": "info"}}
def parse_drain_configs(
drains: dict,
-) -> typing.Generator[DrainConfiguration, None, None]:
+) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
"""
Parse the drain configurations.
@@ -186,11 +42,12 @@ def parse_drain_configs(
drains (dict): A list of drain configurations.
Yields:
- DrainConfiguration instances.
+ (name, dict) pairs, where the dict describes a stdlib logging handler.
Raises:
ConfigError: If any of the drain configuration items are invalid.
"""
+
for name, config in drains.items():
if "type" not in config:
raise ConfigError("Logging drains require a 'type' key.")
@@ -202,6 +59,18 @@ def parse_drain_configs(
"%s is not a known logging drain type." % (config["type"],)
)
+ # Either use the default formatter or the tersejson one.
+ if logging_type in (DrainType.CONSOLE_JSON, DrainType.FILE_JSON,):
+ formatter = "json" # type: Optional[str]
+ elif logging_type in (
+ DrainType.CONSOLE_JSON_TERSE,
+ DrainType.NETWORK_JSON_TERSE,
+ ):
+ formatter = "tersejson"
+ else:
+ # A formatter of None implies using the default formatter.
+ formatter = None
+
if logging_type in [
DrainType.CONSOLE,
DrainType.CONSOLE_JSON,
@@ -217,9 +86,11 @@ def parse_drain_configs(
% (logging_type,)
)
- pipe = OutputPipeType.lookupByName(location).value
-
- yield DrainConfiguration(name=name, type=logging_type, location=pipe)
+ yield name, {
+ "class": "logging.StreamHandler",
+ "formatter": formatter,
+ "stream": "ext://sys." + location,
+ }
elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
if "location" not in config:
@@ -233,18 +104,25 @@ def parse_drain_configs(
"File paths need to be absolute, '%s' is a relative path"
% (location,)
)
- yield DrainConfiguration(name=name, type=logging_type, location=location)
+
+ yield name, {
+ "class": "logging.FileHandler",
+ "formatter": formatter,
+ "filename": location,
+ }
elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
host = config.get("host")
port = config.get("port")
maximum_buffer = config.get("maximum_buffer", 1000)
- yield DrainConfiguration(
- name=name,
- type=logging_type,
- location=(host, port),
- options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
- )
+
+ yield name, {
+ "class": "synapse.logging.RemoteHandler",
+ "formatter": formatter,
+ "host": host,
+ "port": port,
+ "maximum_buffer": maximum_buffer,
+ }
else:
raise ConfigError(
@@ -253,126 +131,29 @@ def parse_drain_configs(
)
-class StoppableLogPublisher(LogPublisher):
+def setup_structured_logging(log_config: dict,) -> dict:
"""
- A log publisher that can tell its observers to shut down any external
- communications.
- """
-
- def stop(self):
- for obs in self._observers:
- if hasattr(obs, "stop"):
- obs.stop()
-
-
-def setup_structured_logging(
- hs,
- config,
- log_config: dict,
- logBeginner: LogBeginner,
- redirect_stdlib_logging: bool = True,
-) -> LogPublisher:
- """
- Set up Twisted's structured logging system.
-
- Args:
- hs: The homeserver to use.
- config (HomeserverConfig): The configuration of the Synapse homeserver.
- log_config (dict): The log configuration to use.
+ Convert a legacy structured logging configuration (from Synapse < v1.23.0)
+ to one compatible with the new standard library handlers.
"""
- if config.no_redirect_stdio:
- raise ConfigError(
- "no_redirect_stdio cannot be defined using structured logging."
- )
-
- logger = Logger()
-
if "drains" not in log_config:
raise ConfigError("The logging configuration requires a list of drains.")
- observers = [] # type: List[ILogObserver]
-
- for observer in parse_drain_configs(log_config["drains"]):
- # Pipe drains
- if observer.type == DrainType.CONSOLE:
- logger.debug(
- "Starting up the {name} console logger drain", name=observer.name
- )
- observers.append(SynapseFileLogObserver(observer.location))
- elif observer.type == DrainType.CONSOLE_JSON:
- logger.debug(
- "Starting up the {name} JSON console logger drain", name=observer.name
- )
- observers.append(jsonFileLogObserver(observer.location))
- elif observer.type == DrainType.CONSOLE_JSON_TERSE:
- logger.debug(
- "Starting up the {name} terse JSON console logger drain",
- name=observer.name,
- )
- observers.append(
- TerseJSONToConsoleLogObserver(observer.location, metadata={})
- )
-
- # File drains
- elif observer.type == DrainType.FILE:
- logger.debug("Starting up the {name} file logger drain", name=observer.name)
- log_file = open(observer.location, "at", buffering=1, encoding="utf8")
- observers.append(SynapseFileLogObserver(log_file))
- elif observer.type == DrainType.FILE_JSON:
- logger.debug(
- "Starting up the {name} JSON file logger drain", name=observer.name
- )
- log_file = open(observer.location, "at", buffering=1, encoding="utf8")
- observers.append(jsonFileLogObserver(log_file))
-
- elif observer.type == DrainType.NETWORK_JSON_TERSE:
- metadata = {"server_name": hs.config.server_name}
- log_observer = TerseJSONToTCPLogObserver(
- hs=hs,
- host=observer.location[0],
- port=observer.location[1],
- metadata=metadata,
- maximum_buffer=observer.options.maximum_buffer,
- )
- log_observer.start()
- observers.append(log_observer)
- else:
- # We should never get here, but, just in case, throw an error.
- raise ConfigError("%s drain type cannot be configured" % (observer.type,))
-
- publisher = StoppableLogPublisher(*observers)
- log_filter = LogLevelFilterPredicate()
-
- for namespace, namespace_config in log_config.get(
- "loggers", DEFAULT_LOGGERS
- ).items():
- # Set the log level for twisted.logger.Logger namespaces
- log_filter.setLogLevelForNamespace(
- namespace,
- stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
- )
-
- # Also set the log levels for the stdlib logger namespaces, to prevent
- # them getting to PythonStdlibToTwistedLogger and having to be formatted
- if "level" in namespace_config:
- logging.getLogger(namespace).setLevel(namespace_config.get("level"))
-
- f = FilteringLogObserver(publisher, [log_filter])
- lco = LogContextObserver(f)
-
- if redirect_stdlib_logging:
- stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
- stdliblogger = logging.getLogger()
- stdliblogger.addHandler(stuff_into_twisted)
-
- # Always redirect standard I/O, otherwise other logging outputs might miss
- # it.
- logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
+ new_config = {
+ "version": 1,
+ "formatters": {
+ "json": {"class": "synapse.logging.JsonFormatter"},
+ "tersejson": {"class": "synapse.logging.TerseJsonFormatter"},
+ },
+ "handlers": {},
+ "loggers": log_config.get("loggers", DEFAULT_LOGGERS),
+ "root": {"handlers": []},
+ }
- return publisher
+ for handler_name, handler in parse_drain_configs(log_config["drains"]):
+ new_config["handlers"][handler_name] = handler
+ # Add each handler to the root logger.
+ new_config["root"]["handlers"].append(handler_name)
-def reload_structured_logging(*args, log_config=None) -> None:
- warnings.warn(
- "Currently the structured logging system can not be reloaded, doing nothing"
- )
+ return new_config
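In practice this means an old-style drains config is mapped onto stdlib handler dicts. A rough example of the correspondence; the drain-type spellings follow the legacy structured-logging documentation and are an assumption here, and the addresses are placeholders.

    legacy = {
        "drains": {
            "console": {"type": "console", "location": "stdout"},
            "aggregator": {
                "type": "network_json_terse",
                "host": "10.1.2.3",
                "port": 9999,
            },
        },
    }

    # setup_structured_logging(legacy) would produce handlers roughly like:
    #   "console":    {"class": "logging.StreamHandler",
    #                  "formatter": None, "stream": "ext://sys.stdout"}
    #   "aggregator": {"class": "synapse.logging.RemoteHandler",
    #                  "formatter": "tersejson", "host": "10.1.2.3",
    #                  "port": 9999, "maximum_buffer": 1000}
    # with both handlers attached to the root logger.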
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 9b46956c..2fbf5549 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -16,141 +16,65 @@
"""
Log formatters that output terse JSON.
"""
-
import json
-from typing import IO
-
-from twisted.logger import FileLogObserver
-
-from synapse.logging._remote import TCPLogObserver
+import logging
_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
-
-def flatten_event(event: dict, metadata: dict, include_time: bool = False):
- """
- Flatten a Twisted logging event to an dictionary capable of being sent
- as a log event to a logging aggregation system.
-
- The format is vastly simplified and is not designed to be a "human readable
- string" in the sense that traditional logs are. Instead, the structure is
- optimised for searchability and filtering, with human-understandable log
- keys.
-
- Args:
- event (dict): The Twisted logging event we are flattening.
- metadata (dict): Additional data to include with each log message. This
- can be information like the server name. Since the target log
- consumer does not know who we are other than by host IP, this
- allows us to forward through static information.
- include_time (bool): Should we include the `time` key? If False, the
- event time is stripped from the event.
- """
- new_event = {}
-
- # If it's a failure, make the new event's log_failure be the traceback text.
- if "log_failure" in event:
- new_event["log_failure"] = event["log_failure"].getTraceback()
-
- # If it's a warning, copy over a string representation of the warning.
- if "warning" in event:
- new_event["warning"] = str(event["warning"])
-
- # Stdlib logging events have "log_text" as their human-readable portion,
- # Twisted ones have "log_format". For now, include the log_format, so that
- # context only given in the log format (e.g. what is being logged) is
- # available.
- if "log_text" in event:
- new_event["log"] = event["log_text"]
- else:
- new_event["log"] = event["log_format"]
-
- # We want to include the timestamp when forwarding over the network, but
- # exclude it when we are writing to stdout. This is because the log ingester
- # (e.g. logstash, fluentd) can add its own timestamp.
- if include_time:
- new_event["time"] = round(event["log_time"], 2)
-
- # Convert the log level to a textual representation.
- new_event["level"] = event["log_level"].name.upper()
-
- # Ignore these keys, and do not transfer them over to the new log object.
- # They are either useless (isError), transferred manually above (log_time,
- # log_level, etc), or contain Python objects which are not useful for output
- # (log_logger, log_source).
- keys_to_delete = [
- "isError",
- "log_failure",
- "log_format",
- "log_level",
- "log_logger",
- "log_source",
- "log_system",
- "log_time",
- "log_text",
- "observer",
- "warning",
- ]
-
- # If it's from the Twisted legacy logger (twisted.python.log), it adds some
- # more keys we want to purge.
- if event.get("log_namespace") == "log_legacy":
- keys_to_delete.extend(["message", "system", "time"])
-
- # Rather than modify the dictionary in place, construct a new one with only
- # the content we want. The original event should be considered 'frozen'.
- for key in event.keys():
-
- if key in keys_to_delete:
- continue
-
- if isinstance(event[key], (str, int, bool, float)) or event[key] is None:
- # If it's a plain type, include it as is.
- new_event[key] = event[key]
- else:
- # If it's not one of those basic types, write out a string
- # representation. This should probably be a warning in development,
- # so that we are sure we are only outputting useful data.
- new_event[key] = str(event[key])
-
- # Add the metadata information to the event (e.g. the server_name).
- new_event.update(metadata)
-
- return new_event
-
-
-def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogObserver:
- """
- A log observer that formats events to a flattened JSON representation.
-
- Args:
- outFile: The file object to write to.
- metadata: Metadata to be added to each log object.
- """
-
- def formatEvent(_event: dict) -> str:
- flattened = flatten_event(_event, metadata)
- return _encoder.encode(flattened) + "\n"
-
- return FileLogObserver(outFile, formatEvent)
-
-
-def TerseJSONToTCPLogObserver(
- hs, host: str, port: int, metadata: dict, maximum_buffer: int
-) -> FileLogObserver:
- """
- A log observer that formats events to a flattened JSON representation.
-
- Args:
- hs (HomeServer): The homeserver that is being logged for.
- host: The host of the logging target.
- port: The logging target's port.
- metadata: Metadata to be added to each log object.
- maximum_buffer: The maximum buffer size.
- """
-
- def formatEvent(_event: dict) -> str:
- flattened = flatten_event(_event, metadata, include_time=True)
- return _encoder.encode(flattened) + "\n"
-
- return TCPLogObserver(hs, host, port, formatEvent, maximum_buffer)
+# The properties of a standard LogRecord.
+_LOG_RECORD_ATTRIBUTES = {
+ "args",
+ "asctime",
+ "created",
+ "exc_info",
+ # exc_text isn't a public attribute, but is used to cache the result of formatException.
+ "exc_text",
+ "filename",
+ "funcName",
+ "levelname",
+ "levelno",
+ "lineno",
+ "message",
+ "module",
+ "msecs",
+ "msg",
+ "name",
+ "pathname",
+ "process",
+ "processName",
+ "relativeCreated",
+ "stack_info",
+ "thread",
+ "threadName",
+}
+
+
+class JsonFormatter(logging.Formatter):
+ def format(self, record: logging.LogRecord) -> str:
+ event = {
+ "log": record.getMessage(),
+ "namespace": record.name,
+ "level": record.levelname,
+ }
+
+ return self._format(record, event)
+
+ def _format(self, record: logging.LogRecord, event: dict) -> str:
+ # Add any extra attributes to the event.
+ for key, value in record.__dict__.items():
+ if key not in _LOG_RECORD_ATTRIBUTES:
+ event[key] = value
+
+ return _encoder.encode(event)
+
+
+class TerseJsonFormatter(JsonFormatter):
+ def format(self, record: logging.LogRecord) -> str:
+ event = {
+ "log": record.getMessage(),
+ "namespace": record.name,
+ "level": record.levelname,
+ "time": round(record.created, 2),
+ }
+
+ return self._format(record, event)
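Because both formatters work on plain LogRecord objects, their output can be checked directly; extra attributes attached to a record (for example by the MetadataFilter added in the next file) are merged into the JSON. A small sketch, with the output shown approximately:

    import logging

    from synapse.logging import TerseJsonFormatter

    record = logging.LogRecord(
        name="synapse.demo",
        level=logging.INFO,
        pathname=__file__,
        lineno=1,
        msg="hello %s",
        args=("world",),
        exc_info=None,
    )
    record.server_name = "example.com"  # an "extra" attribute survives into the JSON

    print(TerseJsonFormatter().format(record))
    # e.g. {"log":"hello world","namespace":"synapse.demo","level":"INFO",
    #       "time":1605780000.0,"server_name":"example.com"}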
diff --git a/synapse/logging/filter.py b/synapse/logging/filter.py
new file mode 100644
index 00000000..1baf8dd6
--- /dev/null
+++ b/synapse/logging/filter.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from typing_extensions import Literal
+
+
+class MetadataFilter(logging.Filter):
+ """Logging filter that adds constant values to each record.
+
+ Args:
+ metadata: Key-value pairs to add to each record.
+ """
+
+ def __init__(self, metadata: dict):
+ self._metadata = metadata
+
+ def filter(self, record: logging.LogRecord) -> Literal[True]:
+ for key, value in self._metadata.items():
+ setattr(record, key, value)
+ return True
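The per-server metadata that the old TCP observer injected is now carried by each record via this filter. A short usage sketch attaching it to a handler; the server name is a placeholder:

    import logging

    from synapse.logging.filter import MetadataFilter

    handler = logging.StreamHandler()
    handler.addFilter(MetadataFilter({"server_name": "example.com"}))

    logger = logging.getLogger("synapse.demo")
    logger.addHandler(handler)
    logger.warning("every record emitted here now carries a server_name attribute")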
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index e58850fa..ab586c31 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -317,7 +317,7 @@ def ensure_active_span(message, ret=None):
@contextlib.contextmanager
-def _noop_context_manager(*args, **kwargs):
+def noop_context_manager(*args, **kwargs):
"""Does exactly what it says on the tin"""
yield
@@ -413,7 +413,7 @@ def start_active_span(
"""
if opentracing is None:
- return _noop_context_manager()
+ return noop_context_manager()
return opentracing.tracer.start_active_span(
operation_name,
@@ -428,7 +428,7 @@ def start_active_span(
def start_active_span_follows_from(operation_name, contexts):
if opentracing is None:
- return _noop_context_manager()
+ return noop_context_manager()
references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)
@@ -459,7 +459,7 @@ def start_active_span_from_request(
# Also, twisted uses byte arrays while opentracing expects strings.
if opentracing is None:
- return _noop_context_manager()
+ return noop_context_manager()
header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
@@ -497,7 +497,7 @@ def start_active_span_from_edu(
"""
if opentracing is None:
- return _noop_context_manager()
+ return noop_context_manager()
carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
"opentracing", {}
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index b8d2a8e8..cbf0dbb8 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -502,6 +502,16 @@ build_info.labels(
last_ticked = time.time()
+# 3PID send info
+threepid_send_requests = Histogram(
+ "synapse_threepid_send_requests_with_tries",
+ documentation="Number of requests for a 3pid token by try count. Note if"
+ " there is a request with try count of 4, then there would have been one"
+ " each for 1, 2 and 3",
+ buckets=(1, 2, 3, 4, 5, 10),
+ labelnames=("type", "reason"),
+)
+
class ReactorLastSeenMetric:
def collect(self):
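
This histogram is observed by the 3PID request-token servlets later in this patch; each observation records how many times the client has asked for the same token to be sent, e.g.:

    from synapse.metrics import threepid_send_requests

    # Second attempt at having a password-reset email sent; the label values
    # match those used in synapse/rest/client/v2_alpha/account.py below.
    threepid_send_requests.labels(type="email", reason="password_reset").observe(2)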
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index ea5f1c7b..658f6ecd 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -24,7 +24,7 @@ from prometheus_client.core import REGISTRY, Counter, Gauge
from twisted.internet import defer
from synapse.logging.context import LoggingContext, PreserveLoggingContext
-from synapse.logging.opentracing import start_active_span
+from synapse.logging.opentracing import noop_context_manager, start_active_span
if TYPE_CHECKING:
import resource
@@ -167,7 +167,7 @@ class _BackgroundProcess:
)
-def run_as_background_process(desc: str, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
@@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
Args:
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
+ bg_start_span: Whether to start an opentracing span. Defaults to True.
+ Should only be disabled for processes that will not log to or tag
+ a span.
args: positional args for func
kwargs: keyword args for func
@@ -199,7 +202,10 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
with BackgroundProcessLoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
try:
- with start_active_span(desc, tags={"request_id": context.request}):
+ ctx = noop_context_manager()
+ if bg_start_span:
+ ctx = start_active_span(desc, tags={"request_id": context.request})
+ with ctx:
result = func(*args, **kwargs)
if inspect.isawaitable(result):
@@ -266,7 +272,7 @@ class BackgroundProcessLoggingContext(LoggingContext):
super().__exit__(type, value, traceback)
- # The background process has finished. We explictly remove and manually
+ # The background process has finished. We explicitly remove and manually
# update the metrics here so that if nothing is scraping metrics the set
# doesn't infinitely grow.
with _bg_metrics_lock:
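
The new `bg_start_span` flag lets hot code paths skip span creation entirely, as the redis replication sender does later in this patch. A sketch (the coroutine name is made up for illustration):

    from synapse.metrics.background_process_metrics import run_as_background_process

    async def _send_heartbeat():
        ...  # hypothetical coroutine that neither logs to nor tags a span

    # Runs in its own logcontext with resource metrics, but without an
    # opentracing span, to cut per-call overhead.
    run_as_background_process("send-heartbeat", _send_heartbeat, bg_start_span=False)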
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 2e993411..a17352ef 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -28,6 +28,7 @@ from typing import (
Union,
)
+import attr
from prometheus_client import Counter
from twisted.internet import defer
@@ -40,7 +41,6 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
from synapse.types import (
Collection,
@@ -174,6 +174,17 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
return bool(self.events)
+@attr.s(slots=True, frozen=True)
+class _PendingRoomEventEntry:
+ event_pos = attr.ib(type=PersistedEventPosition)
+ extra_users = attr.ib(type=Collection[UserID])
+
+ room_id = attr.ib(type=str)
+ type = attr.ib(type=str)
+ state_key = attr.ib(type=Optional[str])
+ membership = attr.ib(type=Optional[str])
+
+
class Notifier:
""" This class is responsible for notifying any listeners when there are
new events available for it.
@@ -191,9 +202,7 @@ class Notifier:
self.storage = hs.get_storage()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
- self.pending_new_room_events = (
- []
- ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
+ self.pending_new_room_events = [] # type: List[_PendingRoomEventEntry]
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -256,7 +265,29 @@ class Notifier:
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
- """ Used by handlers to inform the notifier something has happened
+ """Unwraps event and calls `on_new_room_event_args`.
+ """
+ self.on_new_room_event_args(
+ event_pos=event_pos,
+ room_id=event.room_id,
+ event_type=event.type,
+ state_key=event.get("state_key"),
+ membership=event.content.get("membership"),
+ max_room_stream_token=max_room_stream_token,
+ extra_users=extra_users,
+ )
+
+ def on_new_room_event_args(
+ self,
+ room_id: str,
+ event_type: str,
+ state_key: Optional[str],
+ membership: Optional[str],
+ event_pos: PersistedEventPosition,
+ max_room_stream_token: RoomStreamToken,
+ extra_users: Collection[UserID] = [],
+ ):
+ """Used by handlers to inform the notifier something has happened
in the room, room event wise.
This triggers the notifier to wake up any listeners that are
@@ -267,7 +298,16 @@ class Notifier:
until all previous events have been persisted before notifying
the client streams.
"""
- self.pending_new_room_events.append((event_pos, event, extra_users))
+ self.pending_new_room_events.append(
+ _PendingRoomEventEntry(
+ event_pos=event_pos,
+ extra_users=extra_users,
+ room_id=room_id,
+ type=event_type,
+ state_key=state_key,
+ membership=membership,
+ )
+ )
self._notify_pending_new_room_events(max_room_stream_token)
self.notify_replication()
@@ -285,18 +325,19 @@ class Notifier:
users = set() # type: Set[UserID]
rooms = set() # type: Set[str]
- for event_pos, event, extra_users in pending:
- if event_pos.persisted_after(max_room_stream_token):
- self.pending_new_room_events.append((event_pos, event, extra_users))
+ for entry in pending:
+ if entry.event_pos.persisted_after(max_room_stream_token):
+ self.pending_new_room_events.append(entry)
else:
if (
- event.type == EventTypes.Member
- and event.membership == Membership.JOIN
+ entry.type == EventTypes.Member
+ and entry.membership == Membership.JOIN
+ and entry.state_key
):
- self._user_joined_room(event.state_key, event.room_id)
+ self._user_joined_room(entry.state_key, entry.room_id)
- users.update(extra_users)
- rooms.add(event.room_id)
+ users.update(entry.extra_users)
+ rooms.add(entry.room_id)
if users or rooms:
self.on_new_event(
@@ -310,44 +351,37 @@ class Notifier:
"""
# poke any interested application service.
- run_as_background_process(
- "_notify_app_services", self._notify_app_services, max_room_stream_token
- )
-
- run_as_background_process(
- "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
- )
+ self._notify_app_services(max_room_stream_token)
+ self._notify_pusher_pool(max_room_stream_token)
if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_token)
- async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
+ def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
- await self.appservice_handler.notify_interested_services(
- max_room_stream_token
- )
+ self.appservice_handler.notify_interested_services(max_room_stream_token)
except Exception:
logger.exception("Error notifying application services of event")
- async def _notify_app_services_ephemeral(
+ def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
- users: Collection[UserID] = [],
+ users: Collection[Union[str, UserID]] = [],
):
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
- await self.appservice_handler.notify_interested_services_ephemeral(
+ self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users
)
except Exception:
logger.exception("Error notifying application services of event")
- async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
+ def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
- await self._pusher_pool.on_new_notifications(max_room_stream_token)
+ self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception:
logger.exception("Error pusher pool of event")
@@ -384,16 +418,12 @@ class Notifier:
self.notify_replication()
# Notify appservices
- run_as_background_process(
- "_notify_app_services_ephemeral",
- self._notify_app_services_ephemeral,
- stream_key,
- new_token,
- users,
+ self._notify_app_services_ephemeral(
+ stream_key, new_token, users,
)
def on_new_replication_data(self) -> None:
- """Used to inform replication listeners that something has happend
+ """Used to inform replication listeners that something has happened
without waking up any of the normal user event streams"""
self.notify_replication()
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 8047873f..f5788c1d 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -37,7 +37,7 @@ def list_with_base_rules(rawrules, use_new_defaults=False):
modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0}
# Remove the modified base rules from the list, They'll be added back
- # in the default postions in the list.
+ # in the default positions in the list.
rawrules = [r for r in rawrules if r["priority_class"] >= 0]
# shove the server default rules for each kind onto the end of each
@@ -498,6 +498,30 @@ BASE_APPEND_UNDERRIDE_RULES = [
],
"actions": ["notify", {"set_tweak": "highlight", "value": False}],
},
+ {
+ "rule_id": "global/underride/.im.vector.jitsi",
+ "conditions": [
+ {
+ "kind": "event_match",
+ "key": "type",
+ "pattern": "im.vector.modular.widgets",
+ "_id": "_type_modular_widgets",
+ },
+ {
+ "kind": "event_match",
+ "key": "content.type",
+ "pattern": "jitsi",
+ "_id": "_content_type_jitsi",
+ },
+ {
+ "kind": "event_match",
+ "key": "state_key",
+ "pattern": "*",
+ "_id": "_is_state_event",
+ },
+ ],
+ "actions": ["notify", {"set_tweak": "highlight", "value": False}],
+ },
]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index a701defc..82a72dc3 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -15,8 +15,8 @@
# limitations under the License.
import logging
-from collections import namedtuple
+import attr
from prometheus_client import Counter
from synapse.api.constants import EventTypes, Membership, RelationTypes
@@ -26,7 +26,8 @@ from synapse.events.snapshot import EventContext
from synapse.state import POWER_KEY
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import register_cache
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import lru_cache
+from synapse.util.caches.lrucache import LruCache
from .push_rule_evaluator import PushRuleEvaluatorForEvent
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator:
dict of user_id -> push_rules
"""
room_id = event.room_id
- rules_for_room = await self._get_rules_for_room(room_id)
+ rules_for_room = self._get_rules_for_room(room_id)
rules_by_user = await rules_for_room.get_rules(event, context)
@@ -138,7 +139,7 @@ class BulkPushRuleEvaluator:
return rules_by_user
- @cached()
+ @lru_cache()
def _get_rules_for_room(self, room_id):
"""Get the current RulesForRoom object for the given room id
@@ -275,12 +276,14 @@ class RulesForRoom:
the entire cache for the room.
"""
- def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
+ def __init__(
+ self, hs, room_id, rules_for_room_cache: LruCache, room_push_rule_cache_metrics
+ ):
"""
Args:
hs (HomeServer)
room_id (str)
- rules_for_room_cache(Cache): The cache object that caches these
+ rules_for_room_cache: The cache object that caches these
RoomsForUser objects.
room_push_rule_cache_metrics (CacheMetric)
"""
@@ -390,12 +393,12 @@ class RulesForRoom:
continue
# If a user has left a room we remove their push rule. If they
- # joined then we readd it later in _update_rules_with_member_event_ids
+ # joined then we re-add it later in _update_rules_with_member_event_ids
ret_rules_by_user.pop(user_id, None)
missing_member_event_ids[user_id] = event_id
if missing_member_event_ids:
- # If we have some memebr events we haven't seen, look them up
+ # If we have some member events we haven't seen, look them up
# and fetch push rules for them if appropriate.
logger.debug("Found new member events %r", missing_member_event_ids)
await self._update_rules_with_member_event_ids(
@@ -489,13 +492,21 @@ class RulesForRoom:
self.state_group = state_group
-class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
- # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
- # which namedtuple does for us (i.e. two _CacheContext are the same if
- # their caches and keys match). This is important in particular to
- # dedupe when we add callbacks to lru cache nodes, otherwise the number
- # of callbacks would grow.
+@attr.attrs(slots=True, frozen=True)
+class _Invalidation:
+ # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules,
+ # which means that it is stored on the bulk_get_push_rules cache entry. In order
+ # to ensure that we don't accumulate lots of redundant callbacks on the cache entry,
+ # we need to ensure that two _Invalidation objects are "equal" if they refer to the
+ # same `cache` and `room_id`.
+ #
+ # attrs provides suitable __hash__ and __eq__ methods, provided we remember to
+ # set `frozen=True`.
+
+ cache = attr.ib(type=LruCache)
+ room_id = attr.ib(type=str)
+
def __call__(self):
- rules = self.cache.get_immediate(self.room_id, None, update_metrics=False)
+ rules = self.cache.get(self.room_id, None, update_metrics=False)
if rules:
rules.invalidate_all()
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 155791b7..38195c8e 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -24,7 +24,7 @@ from typing import Iterable, List, TypeVar
import bleach
import jinja2
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import StoreError
from synapse.config.emailconfig import EmailSubjectConfig
from synapse.logging.context import make_deferred_yieldable
@@ -317,9 +317,14 @@ class Mailer:
async def get_room_vars(
self, room_id, user_id, notifs, notif_events, room_state_ids
):
- my_member_event_id = room_state_ids[("m.room.member", user_id)]
- my_member_event = await self.store.get_event(my_member_event_id)
- is_invite = my_member_event.content["membership"] == "invite"
+ # Check if one of the notifs is an invite event for the user.
+ is_invite = False
+ for n in notifs:
+ ev = notif_events[n["event_id"]]
+ if ev.type == EventTypes.Member and ev.state_key == user_id:
+ if ev.content.get("membership") == Membership.INVITE:
+ is_invite = True
+ break
room_name = await calculate_room_name(self.store, room_state_ids, user_id)
@@ -461,16 +466,26 @@ class Mailer:
self.store, room_state_ids[room_id], user_id, fallback_to_members=False
)
- my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)]
- my_member_event = await self.store.get_event(my_member_event_id)
- if my_member_event.content["membership"] == "invite":
- inviter_member_event_id = room_state_ids[room_id][
- ("m.room.member", my_member_event.sender)
- ]
- inviter_member_event = await self.store.get_event(
- inviter_member_event_id
+ # See if one of the notifs is an invite event for the user
+ invite_event = None
+ for n in notifs_by_room[room_id]:
+ ev = notif_events[n["event_id"]]
+ if ev.type == EventTypes.Member and ev.state_key == user_id:
+ if ev.content.get("membership") == Membership.INVITE:
+ invite_event = ev
+ break
+
+ if invite_event:
+ inviter_member_event_id = room_state_ids[room_id].get(
+ ("m.room.member", invite_event.sender)
)
- inviter_name = name_from_member_event(inviter_member_event)
+ inviter_name = invite_event.sender
+ if inviter_member_event_id:
+ inviter_member_event = await self.store.get_event(
+ inviter_member_event_id, allow_none=True
+ )
+ if inviter_member_event:
+ inviter_name = name_from_member_event(inviter_member_event)
if room_name is None:
return self.email_subjects.invite_from_person % {
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0080c68c..f3259649 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Union
from prometheus_client import Gauge
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
@@ -187,7 +190,7 @@ class PusherPool:
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
- async def on_new_notifications(self, max_token: RoomStreamToken):
+ def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return
@@ -201,6 +204,17 @@ class PusherPool:
# Nothing to do
return
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._on_new_notifications(max_token)
+
+ @wrap_as_background_process("on_new_notifications")
+ async def _on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_id = max_token.stream
+
prev_stream_id = self._last_room_stream_id_seen
self._last_room_stream_id_seen = max_stream_id
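
`wrap_as_background_process` is the decorator form of `run_as_background_process`: each call to the decorated coroutine is kicked off as its own background process, as `_on_new_notifications` above shows. A minimal sketch (class and method names are illustrative):

    from synapse.metrics.background_process_metrics import wrap_as_background_process

    class Reaper:
        @wrap_as_background_process("prune_old_entries")
        async def prune_old_entries(self):
            # Runs in its own logcontext with background-process metrics,
            # just like an explicit run_as_background_process call.
            ...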
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 0ddead8a..aab77fc4 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -72,6 +72,10 @@ REQUIREMENTS = [
# prom-client has a history of breaking backwards compatibility between
# minor versions (https://github.com/prometheus/client_python/issues/317),
# so we also pin the minor version.
+ #
+ # Note that we replicate these constraints in the Synapse Dockerfile while
+ # pre-installing dependencies. If these constraints are updated here, the
+ # same change should be made in the Dockerfile.
"prometheus_client>=0.4.0,<0.9.0",
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index e7cc74a5..f0c37eaf 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -77,8 +77,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
requester = Requester.deserialize(self.store, content["requester"])
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
logger.info("remote_join: %s into room: %s", user_id, room_id)
@@ -142,8 +141,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
requester = Requester.deserialize(self.store, content["requester"])
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
# hopefully we're now on the master, so this won't recurse!
event_id, stream_id = await self.member_handler.remote_reject_invite(
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index fc129dba..8fa104c8 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -115,8 +115,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]]
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
+ request.requester = requester
logger.info(
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e27ee216..2618eb1e 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -141,21 +141,25 @@ class ReplicationDataHandler:
if row.type != EventsStreamEventRow.TypeId:
continue
assert isinstance(row, EventsStreamRow)
+ assert isinstance(row.data, EventsStreamEventRow)
- event = await self.store.get_event(
- row.data.event_id, allow_rejected=True
- )
- if event.rejected_reason:
+ if row.data.rejected:
continue
extra_users = () # type: Tuple[UserID, ...]
- if event.type == EventTypes.Member:
- extra_users = (UserID.from_string(event.state_key),)
+ if row.data.type == EventTypes.Member and row.data.state_key:
+ extra_users = (UserID.from_string(row.data.state_key),)
max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
- self.notifier.on_new_room_event(
- event, event_pos, max_token, extra_users
+ self.notifier.on_new_room_event_args(
+ event_pos=event_pos,
+ max_room_stream_token=max_token,
+ extra_users=extra_users,
+ room_id=row.data.room_id,
+ event_type=row.data.type,
+ state_key=row.data.state_key,
+ membership=row.data.membership,
)
# Notify any waiting deferreds. The list is ordered by position so we
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index de19705c..bc6ba709 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -166,7 +166,9 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
Args:
cmd (Command)
"""
- run_as_background_process("send-cmd", self._async_send_command, cmd)
+ run_as_background_process(
+ "send-cmd", self._async_send_command, cmd, bg_start_span=False
+ )
async def _async_send_command(self, cmd: Command):
"""Encode a replication command and send it over our outbound connection"""
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 666c13fd..1d4ceac0 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -117,6 +117,16 @@ class ReplicationStreamer:
stream.discard_updates_and_advance()
return
+ # We check up front to see if anything has actually changed, as we get
+ # poked because of changes that happened on other instances.
+ if all(
+ stream.last_token == stream.current_token(self._instance_name)
+ for stream in self.streams
+ ):
+ return
+
+ # If there are updates then we need to set this even if we're already
+ # looping, as the loop needs to know that it might need to loop again.
self.pending_updates = True
if self.is_looping:
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 82e9e0d6..86a62b71 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,12 +15,15 @@
# limitations under the License.
import heapq
from collections.abc import Iterable
-from typing import List, Tuple, Type
+from typing import TYPE_CHECKING, List, Optional, Tuple, Type
import attr
from ._base import Stream, StreamUpdateResult, Token
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
"""Handling of the 'events' replication stream
This stream contains rows of various types. Each row therefore contains a 'type'
@@ -81,12 +84,14 @@ class BaseEventsStreamRow:
class EventsStreamEventRow(BaseEventsStreamRow):
TypeId = "ev"
- event_id = attr.ib() # str
- room_id = attr.ib() # str
- type = attr.ib() # str
- state_key = attr.ib() # str, optional
- redacts = attr.ib() # str, optional
- relates_to = attr.ib() # str, optional
+ event_id = attr.ib(type=str)
+ room_id = attr.ib(type=str)
+ type = attr.ib(type=str)
+ state_key = attr.ib(type=Optional[str])
+ redacts = attr.ib(type=Optional[str])
+ relates_to = attr.ib(type=Optional[str])
+ membership = attr.ib(type=Optional[str])
+ rejected = attr.ib(type=bool)
@attr.s(slots=True, frozen=True)
@@ -113,7 +118,7 @@ class EventsStream(Stream):
NAME = "events"
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 789431ef..2a4f7a17 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -31,7 +31,10 @@ from synapse.rest.admin.devices import (
DeviceRestServlet,
DevicesRestServlet,
)
-from synapse.rest.admin.event_reports import EventReportsRestServlet
+from synapse.rest.admin.event_reports import (
+ EventReportDetailRestServlet,
+ EventReportsRestServlet,
+)
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
@@ -44,12 +47,15 @@ from synapse.rest.admin.rooms import (
ShutdownRoomRestServlet,
)
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
+from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
from synapse.rest.admin.users import (
AccountValidityRenewServlet,
DeactivateAccountRestServlet,
+ PushersRestServlet,
ResetPasswordRestServlet,
SearchUsersRestServlet,
UserAdminServlet,
+ UserMediaRestServlet,
UserMembershipRestServlet,
UserRegisterServlet,
UserRestServletV2,
@@ -215,13 +221,17 @@ def register_servlets(hs, http_server):
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)
UserAdminServlet(hs).register(http_server)
+ UserMediaRestServlet(hs).register(http_server)
UserMembershipRestServlet(hs).register(http_server)
UserRestServletV2(hs).register(http_server)
UsersRestServletV2(hs).register(http_server)
DeviceRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
DeleteDevicesRestServlet(hs).register(http_server)
+ UserMediaStatisticsRestServlet(hs).register(http_server)
+ EventReportDetailRestServlet(hs).register(http_server)
EventReportsRestServlet(hs).register(http_server)
+ PushersRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py
index a1638633..ffd3aa38 100644
--- a/synapse/rest/admin/devices.py
+++ b/synapse/rest/admin/devices.py
@@ -119,7 +119,7 @@ class DevicesRestServlet(RestServlet):
raise NotFoundError("Unknown user")
devices = await self.device_handler.get_devices_by_user(target_user.to_string())
- return 200, {"devices": devices}
+ return 200, {"devices": devices, "total": len(devices)}
class DeleteDevicesRestServlet(RestServlet):
diff --git a/synapse/rest/admin/event_reports.py b/synapse/rest/admin/event_reports.py
index 5b8d0594..fd482f0e 100644
--- a/synapse/rest/admin/event_reports.py
+++ b/synapse/rest/admin/event_reports.py
@@ -15,7 +15,7 @@
import logging
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
@@ -86,3 +86,47 @@ class EventReportsRestServlet(RestServlet):
ret["next_token"] = start + len(event_reports)
return 200, ret
+
+
+class EventReportDetailRestServlet(RestServlet):
+ """
+ Get a specific reported event that is known to the homeserver. Results are returned
+ in a dictionary containing report information.
+ The requester must have administrator access in Synapse.
+
+ GET /_synapse/admin/v1/event_reports/<report_id>
+ returns:
+ 200 OK with details of the report if successful, otherwise an error.
+
+ Args:
+ The parameter `report_id` is the ID of the event report in the database.
+ Returns:
+ JSON blob of information about the event report
+ """
+
+ PATTERNS = admin_patterns("/event_reports/(?P<report_id>[^/]*)$")
+
+ def __init__(self, hs):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+
+ async def on_GET(self, request, report_id):
+ await assert_requester_is_admin(self.auth, request)
+
+ message = (
+ "The report_id parameter must be a string representing a positive integer."
+ )
+ try:
+ report_id = int(report_id)
+ except ValueError:
+ raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
+
+ if report_id < 0:
+ raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
+
+ ret = await self.store.get_event_report(report_id)
+ if not ret:
+ raise NotFoundError("Event report not found")
+
+ return 200, ret
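
For reference, the new endpoint can be exercised as below; the hostname and access token are placeholders, and the path follows the GET /_synapse/admin/v1/event_reports/<report_id> form given in the docstring:

    import requests

    resp = requests.get(
        "https://matrix.example.com/_synapse/admin/v1/event_reports/5",
        headers={"Authorization": "Bearer <admin_access_token>"},
    )
    # 200 with the stored report, 400 for a non-integer/negative report_id,
    # or 404 if no such report exists.
    print(resp.status_code, resp.json())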
diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
index ee75095c..ba50cb87 100644
--- a/synapse/rest/admin/media.py
+++ b/synapse/rest/admin/media.py
@@ -16,9 +16,10 @@
import logging
-from synapse.api.errors import AuthError
-from synapse.http.servlet import RestServlet, parse_integer
+from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.http.servlet import RestServlet, parse_boolean, parse_integer
from synapse.rest.admin._base import (
+ admin_patterns,
assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns,
@@ -150,6 +151,80 @@ class PurgeMediaCacheRestServlet(RestServlet):
return 200, ret
+class DeleteMediaByID(RestServlet):
+ """Delete local media by a given ID. Removes it from this server.
+ """
+
+ PATTERNS = admin_patterns("/media/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)")
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+ self.server_name = hs.hostname
+ self.media_repository = hs.get_media_repository()
+
+ async def on_DELETE(self, request, server_name: str, media_id: str):
+ await assert_requester_is_admin(self.auth, request)
+
+ if self.server_name != server_name:
+ raise SynapseError(400, "Can only delete local media")
+
+ if await self.store.get_local_media(media_id) is None:
+ raise NotFoundError("Unknown media")
+
+ logging.info("Deleting local media by ID: %s", media_id)
+
+ deleted_media, total = await self.media_repository.delete_local_media(media_id)
+ return 200, {"deleted_media": deleted_media, "total": total}
+
+
+class DeleteMediaByDateSize(RestServlet):
+ """Delete local media and local copies of remote media by
+ timestamp and size.
+ """
+
+ PATTERNS = admin_patterns("/media/(?P<server_name>[^/]+)/delete")
+
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+ self.server_name = hs.hostname
+ self.media_repository = hs.get_media_repository()
+
+ async def on_POST(self, request, server_name: str):
+ await assert_requester_is_admin(self.auth, request)
+
+ before_ts = parse_integer(request, "before_ts", required=True)
+ size_gt = parse_integer(request, "size_gt", default=0)
+ keep_profiles = parse_boolean(request, "keep_profiles", default=True)
+
+ if before_ts < 0:
+ raise SynapseError(
+ 400,
+ "Query parameter before_ts must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+ if size_gt < 0:
+ raise SynapseError(
+ 400,
+ "Query parameter size_gt must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ if self.server_name != server_name:
+ raise SynapseError(400, "Can only delete local media")
+
+ logging.info(
+ "Deleting local media by timestamp: %s, size larger than: %s, keep profile media: %s"
+ % (before_ts, size_gt, keep_profiles)
+ )
+
+ deleted_media, total = await self.media_repository.delete_old_local_media(
+ before_ts, size_gt, keep_profiles
+ )
+ return 200, {"deleted_media": deleted_media, "total": total}
+
+
def register_servlets_for_media_repo(hs, http_server):
"""
Media repo specific APIs.
@@ -159,3 +234,5 @@ def register_servlets_for_media_repo(hs, http_server):
QuarantineMediaByID(hs).register(http_server)
QuarantineMediaByUser(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
+ DeleteMediaByID(hs).register(http_server)
+ DeleteMediaByDateSize(hs).register(http_server)
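
A sketch of calling the new date/size deletion endpoint; the base URL, server name and token are placeholders, and the /_synapse/admin/v1 prefix is assumed from admin_patterns:

    import requests

    resp = requests.post(
        "https://matrix.example.com/_synapse/admin/v1"
        "/media/matrix.example.com/delete",
        params={"before_ts": 1600000000000, "size_gt": 0, "keep_profiles": "true"},
        headers={"Authorization": "Bearer <admin_access_token>"},
    )
    print(resp.json())  # {"deleted_media": [...], "total": ...}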
diff --git a/synapse/rest/admin/statistics.py b/synapse/rest/admin/statistics.py
new file mode 100644
index 00000000..f2490e38
--- /dev/null
+++ b/synapse/rest/admin/statistics.py
@@ -0,0 +1,122 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 Dirk Klimpel
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.api.errors import Codes, SynapseError
+from synapse.http.servlet import RestServlet, parse_integer, parse_string
+from synapse.http.site import SynapseRequest
+from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
+from synapse.storage.databases.main.stats import UserSortOrder
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class UserMediaStatisticsRestServlet(RestServlet):
+ """
+ Get statistics about uploaded media by users.
+ """
+
+ PATTERNS = admin_patterns("/statistics/users/media$")
+
+ def __init__(self, hs: "HomeServer"):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+
+ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self.auth, request)
+
+ order_by = parse_string(
+ request, "order_by", default=UserSortOrder.USER_ID.value
+ )
+ if order_by not in (
+ UserSortOrder.MEDIA_LENGTH.value,
+ UserSortOrder.MEDIA_COUNT.value,
+ UserSortOrder.USER_ID.value,
+ UserSortOrder.DISPLAYNAME.value,
+ ):
+ raise SynapseError(
+ 400,
+ "Unknown value for order_by: %s" % (order_by,),
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ start = parse_integer(request, "from", default=0)
+ if start < 0:
+ raise SynapseError(
+ 400,
+ "Query parameter from must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ limit = parse_integer(request, "limit", default=100)
+ if limit < 0:
+ raise SynapseError(
+ 400,
+ "Query parameter limit must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ from_ts = parse_integer(request, "from_ts", default=0)
+ if from_ts < 0:
+ raise SynapseError(
+ 400,
+ "Query parameter from_ts must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ until_ts = parse_integer(request, "until_ts")
+ if until_ts is not None:
+ if until_ts < 0:
+ raise SynapseError(
+ 400,
+ "Query parameter until_ts must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+ if until_ts <= from_ts:
+ raise SynapseError(
+ 400,
+ "Query parameter until_ts must be greater than from_ts.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ search_term = parse_string(request, "search_term")
+ if search_term == "":
+ raise SynapseError(
+ 400,
+ "Query parameter search_term cannot be an empty string.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ direction = parse_string(request, "dir", default="f")
+ if direction not in ("f", "b"):
+ raise SynapseError(
+ 400, "Unknown direction: %s" % (direction,), errcode=Codes.INVALID_PARAM
+ )
+
+ users_media, total = await self.store.get_users_media_usage_paginate(
+ start, limit, from_ts, until_ts, order_by, direction, search_term
+ )
+ ret = {"users": users_media, "total": total}
+ if (start + limit) < total:
+ ret["next_token"] = start + len(users_media)
+
+ return 200, ret
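
An illustrative request against the new statistics endpoint; the hostname and token are placeholders, the /_synapse/admin/v1 prefix is assumed from admin_patterns, and order_by (if given) must be one of the UserSortOrder values checked above:

    import requests

    resp = requests.get(
        "https://matrix.example.com/_synapse/admin/v1/statistics/users/media",
        params={"from": 0, "limit": 10, "dir": "f"},
        headers={"Authorization": "Bearer <admin_access_token>"},
    )
    # {"users": [...], "total": ...}, plus "next_token" when more pages remain.
    print(resp.json())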
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 8efefbc0..3638e219 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -16,6 +16,7 @@ import hashlib
import hmac
import logging
from http import HTTPStatus
+from typing import Tuple
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, NotFoundError, SynapseError
@@ -27,16 +28,28 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
+from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import (
admin_patterns,
assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns,
)
-from synapse.types import UserID
+from synapse.types import JsonDict, UserID
logger = logging.getLogger(__name__)
+_GET_PUSHERS_ALLOWED_KEYS = {
+ "app_display_name",
+ "app_id",
+ "data",
+ "device_display_name",
+ "kind",
+ "lang",
+ "profile_tag",
+ "pushkey",
+}
+
class UsersRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
@@ -399,6 +412,7 @@ class UserRegisterServlet(RestServlet):
admin = body.get("admin", None)
user_type = body.get("user_type", None)
+ displayname = body.get("displayname", None)
if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
raise SynapseError(400, "Invalid user type")
@@ -435,6 +449,7 @@ class UserRegisterServlet(RestServlet):
password_hash=password_hash,
admin=bool(admin),
user_type=user_type,
+ default_display_name=displayname,
by_admin=True,
)
@@ -702,9 +717,114 @@ class UserMembershipRestServlet(RestServlet):
if not self.is_mine(UserID.from_string(user_id)):
raise SynapseError(400, "Can only lookup local users")
+ user = await self.store.get_user_by_id(user_id)
+ if user is None:
+ raise NotFoundError("Unknown user")
+
room_ids = await self.store.get_rooms_for_user(user_id)
- if not room_ids:
+ ret = {"joined_rooms": list(room_ids), "total": len(room_ids)}
+ return 200, ret
+
+
+class PushersRestServlet(RestServlet):
+ """
+ Gets information about all pushers for a specific `user_id`.
+
+ Example:
+ http://localhost:8008/_synapse/admin/v1/users/
+ @user:server/pushers
+
+ Returns:
+ pushers: Dictionary containing pushers information.
+ total: Number of pushers in dictionary `pushers`.
+ """
+
+ PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/pushers$")
+
+ def __init__(self, hs):
+ self.is_mine = hs.is_mine
+ self.store = hs.get_datastore()
+ self.auth = hs.get_auth()
+
+ async def on_GET(
+ self, request: SynapseRequest, user_id: str
+ ) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self.auth, request)
+
+ if not self.is_mine(UserID.from_string(user_id)):
+ raise SynapseError(400, "Can only lookup local users")
+
+ if not await self.store.get_user_by_id(user_id):
raise NotFoundError("User not found")
- ret = {"joined_rooms": list(room_ids), "total": len(room_ids)}
+ pushers = await self.store.get_pushers_by_user_id(user_id)
+
+ filtered_pushers = [
+ {k: v for k, v in p.items() if k in _GET_PUSHERS_ALLOWED_KEYS}
+ for p in pushers
+ ]
+
+ return 200, {"pushers": filtered_pushers, "total": len(filtered_pushers)}
+
+
+class UserMediaRestServlet(RestServlet):
+ """
+ Gets information about all uploaded local media for a specific `user_id`.
+
+ Example:
+ http://localhost:8008/_synapse/admin/v1/users/
+ @user:server/media
+
+ Args:
+ The parameters `from` and `limit` are required for pagination.
+ By default, a `limit` of 100 is used.
+ Returns:
+ A list of media and an integer representing the total number of
+ media that exist for this user
+ """
+
+ PATTERNS = admin_patterns("/users/(?P<user_id>[^/]+)/media$")
+
+ def __init__(self, hs):
+ self.is_mine = hs.is_mine
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+
+ async def on_GET(
+ self, request: SynapseRequest, user_id: str
+ ) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self.auth, request)
+
+ if not self.is_mine(UserID.from_string(user_id)):
+ raise SynapseError(400, "Can only lookup local users")
+
+ user = await self.store.get_user_by_id(user_id)
+ if user is None:
+ raise NotFoundError("Unknown user")
+
+ start = parse_integer(request, "from", default=0)
+ limit = parse_integer(request, "limit", default=100)
+
+ if start < 0:
+ raise SynapseError(
+ 400,
+ "Query parameter from must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ if limit < 0:
+ raise SynapseError(
+ 400,
+ "Query parameter limit must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ media, total = await self.store.get_local_media_by_user_paginate(
+ start, limit, user_id
+ )
+
+ ret = {"media": media, "total": total}
+ if (start + limit) < total:
+ ret["next_token"] = start + len(media)
+
return 200, ret
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index 1ecb77aa..6de40782 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -67,9 +67,6 @@ class EventStreamRestServlet(RestServlet):
return 200, chunk
- def on_OPTIONS(self, request):
- return 200, {}
-
class EventRestServlet(RestServlet):
PATTERNS = client_patterns("/events/(?P<event_id>[^/]*)$", v1=True)
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index b82a4e97..94452fcb 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -114,9 +114,6 @@ class LoginRestServlet(RestServlet):
return 200, {"flows": flows}
- def on_OPTIONS(self, request: SynapseRequest):
- return 200, {}
-
async def on_POST(self, request: SynapseRequest):
self._address_ratelimiter.ratelimit(request.getClientIP())
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index f792b50c..ad8cea49 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -30,9 +30,6 @@ class LogoutRestServlet(RestServlet):
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
- def on_OPTIONS(self, request):
- return 200, {}
-
async def on_POST(self, request):
requester = await self.auth.get_user_by_req(request, allow_expired=True)
@@ -58,9 +55,6 @@ class LogoutAllRestServlet(RestServlet):
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
- def on_OPTIONS(self, request):
- return 200, {}
-
async def on_POST(self, request):
requester = await self.auth.get_user_by_req(request, allow_expired=True)
user_id = requester.user.to_string()
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index 79d8e305..23a529f8 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -86,9 +86,6 @@ class PresenceStatusRestServlet(RestServlet):
return 200, {}
- def on_OPTIONS(self, request):
- return 200, {}
-
def register_servlets(hs, http_server):
PresenceStatusRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index e7fcd2b1..85a66458 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -67,9 +67,6 @@ class ProfileDisplaynameRestServlet(RestServlet):
return 200, {}
- def on_OPTIONS(self, request, user_id):
- return 200, {}
-
class ProfileAvatarURLRestServlet(RestServlet):
PATTERNS = client_patterns("/profile/(?P<user_id>[^/]*)/avatar_url", v1=True)
@@ -118,9 +115,6 @@ class ProfileAvatarURLRestServlet(RestServlet):
return 200, {}
- def on_OPTIONS(self, request, user_id):
- return 200, {}
-
class ProfileRestServlet(RestServlet):
PATTERNS = client_patterns("/profile/(?P<user_id>[^/]*)", v1=True)
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index f9eecb7c..241e5359 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -155,9 +155,6 @@ class PushRuleRestServlet(RestServlet):
else:
raise UnrecognizedRequestError()
- def on_OPTIONS(self, request, path):
- return 200, {}
-
def notify_user(self, user_id):
stream_id = self.store.get_max_push_rules_stream_id()
self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id])
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 28dabf1c..8fe83f32 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -60,9 +60,6 @@ class PushersRestServlet(RestServlet):
return 200, {"pushers": filtered_pushers}
- def on_OPTIONS(self, _):
- return 200, {}
-
class PushersSetRestServlet(RestServlet):
PATTERNS = client_patterns("/pushers/set$", v1=True)
@@ -140,9 +137,6 @@ class PushersSetRestServlet(RestServlet):
return 200, {}
- def on_OPTIONS(self, _):
- return 200, {}
-
class PushersRemoveRestServlet(RestServlet):
"""
@@ -182,9 +176,6 @@ class PushersRemoveRestServlet(RestServlet):
)
return None
- def on_OPTIONS(self, _):
- return 200, {}
-
def register_servlets(hs, http_server):
PushersRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 00b43970..25d3cc61 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -72,20 +72,6 @@ class RoomCreateRestServlet(TransactionRestServlet):
def register(self, http_server):
PATTERNS = "/createRoom"
register_txn_path(self, PATTERNS, http_server)
- # define CORS for all of /rooms in RoomCreateRestServlet for simplicity
- http_server.register_paths(
- "OPTIONS",
- client_patterns("/rooms(?:/.*)?$", v1=True),
- self.on_OPTIONS,
- self.__class__.__name__,
- )
- # define CORS for /createRoom[/txnid]
- http_server.register_paths(
- "OPTIONS",
- client_patterns("/createRoom(?:/.*)?$", v1=True),
- self.on_OPTIONS,
- self.__class__.__name__,
- )
def on_PUT(self, request, txn_id):
set_tag("txn_id", txn_id)
@@ -104,9 +90,6 @@ class RoomCreateRestServlet(TransactionRestServlet):
user_supplied_config = parse_json_object_from_request(request)
return user_supplied_config
- def on_OPTIONS(self, request):
- return 200, {}
-
# TODO: Needs unit testing for generic events
class RoomStateEventRestServlet(TransactionRestServlet):
diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py
index b8d491ca..d07ca2c4 100644
--- a/synapse/rest/client/v1/voip.py
+++ b/synapse/rest/client/v1/voip.py
@@ -69,9 +69,6 @@ class VoipRestServlet(RestServlet):
},
)
- def on_OPTIONS(self, request):
- return 200, {}
-
def register_servlets(hs, http_server):
VoipRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index e857cff1..a54e1011 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -38,6 +38,7 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
+from synapse.metrics import threepid_send_requests
from synapse.push.mailer import Mailer
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.stringutils import assert_valid_client_secret, random_string
@@ -143,6 +144,10 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
# Wrap the session id in a JSON object
ret = {"sid": sid}
+ threepid_send_requests.labels(type="email", reason="password_reset").observe(
+ send_attempt
+ )
+
return 200, ret
@@ -268,9 +273,6 @@ class PasswordRestServlet(RestServlet):
return 200, {}
- def on_OPTIONS(self, _):
- return 200, {}
-
class DeactivateAccountRestServlet(RestServlet):
PATTERNS = client_patterns("/account/deactivate$")
@@ -414,6 +416,10 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
# Wrap the session id in a JSON object
ret = {"sid": sid}
+ threepid_send_requests.labels(type="email", reason="add_threepid").observe(
+ send_attempt
+ )
+
return 200, ret
@@ -484,6 +490,10 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
next_link,
)
+ threepid_send_requests.labels(type="msisdn", reason="add_threepid").observe(
+ send_attempt
+ )
+
return 200, ret
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 5fbfae59..fab07774 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -176,9 +176,6 @@ class AuthRestServlet(RestServlet):
respond_with_html(request, 200, html)
return None
- def on_OPTIONS(self, _):
- return 200, {}
-
def register_servlets(hs, http_server):
AuthRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 395b6a82..ea681140 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -45,6 +45,7 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
+from synapse.metrics import threepid_send_requests
from synapse.push.mailer import Mailer
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.ratelimitutils import FederationRateLimiter
@@ -163,6 +164,10 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
# Wrap the session id in a JSON object
ret = {"sid": sid}
+ threepid_send_requests.labels(type="email", reason="register").observe(
+ send_attempt
+ )
+
return 200, ret
@@ -234,6 +239,10 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
next_link,
)
+ threepid_send_requests.labels(type="msisdn", reason="register").observe(
+ send_attempt
+ )
+
return 200, ret
@@ -642,9 +651,6 @@ class RegisterRestServlet(RestServlet):
return 200, return_dict
- def on_OPTIONS(self, _):
- return 200, {}
-
async def _do_appservice_registration(self, username, as_token, body):
user_id = await self.registration_handler.appservice_register(
username, as_token
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index 7447eeae..9e079f67 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -69,6 +69,23 @@ class MediaFilePaths:
local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
+ def local_media_thumbnail_dir(self, media_id: str) -> str:
+ """
+ Retrieve the local store path of the thumbnails for a given media_id
+
+ Args:
+ media_id: The media ID to query.
+ Returns:
+ The directory containing the local thumbnails for the given media_id
+ """
+ return os.path.join(
+ self.base_path,
+ "local_thumbnails",
+ media_id[0:2],
+ media_id[2:4],
+ media_id[4:],
+ )
+
def remote_media_filepath_rel(self, server_name, file_id):
return os.path.join(
"remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:]
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index e1192b47..9cac74eb 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -18,7 +18,7 @@ import errno
import logging
import os
import shutil
-from typing import IO, Dict, Optional, Tuple
+from typing import IO, Dict, List, Optional, Tuple
import twisted.internet.error
import twisted.web.http
@@ -305,15 +305,12 @@ class MediaRepository:
# file_id is the ID we use to track the file locally. If we've already
# seen the file then reuse the existing ID, otherwise genereate a new
# one.
- if media_info:
- file_id = media_info["filesystem_id"]
- else:
- file_id = random_string(24)
-
- file_info = FileInfo(server_name, file_id)
# If we have an entry in the DB, try and look for it
if media_info:
+ file_id = media_info["filesystem_id"]
+ file_info = FileInfo(server_name, file_id)
+
if media_info["quarantined_by"]:
logger.info("Media is quarantined")
raise NotFoundError()
@@ -324,14 +321,34 @@ class MediaRepository:
# Failed to find the file anywhere, lets download it.
- media_info = await self._download_remote_file(server_name, media_id, file_id)
+ try:
+ media_info = await self._download_remote_file(server_name, media_id,)
+ except SynapseError:
+ raise
+ except Exception as e:
+ # An exception may be because we downloaded media in another
+ # process, so let's check if we magically have the media.
+ media_info = await self.store.get_cached_remote_media(server_name, media_id)
+ if not media_info:
+ raise e
+
+ file_id = media_info["filesystem_id"]
+ file_info = FileInfo(server_name, file_id)
+
+ # We generate thumbnails even if another process downloaded the media
+ # as a) it's conceivable that the other download request dies before it
+ # generates thumbnails, but mainly b) we want to be sure the thumbnails
+ # have finished being generated before responding to the client,
+ # otherwise they'll request thumbnails and get a 404 if they're not
+ # ready yet.
+ await self._generate_thumbnails(
+ server_name, media_id, file_id, media_info["media_type"]
+ )
responder = await self.media_storage.fetch_media(file_info)
return responder, media_info
- async def _download_remote_file(
- self, server_name: str, media_id: str, file_id: str
- ) -> dict:
+ async def _download_remote_file(self, server_name: str, media_id: str,) -> dict:
"""Attempt to download the remote file from the given server name,
using the given file_id as the local id.
@@ -346,6 +363,8 @@ class MediaRepository:
The media info of the file.
"""
+ file_id = random_string(24)
+
file_info = FileInfo(server_name=server_name, file_id=file_id)
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
@@ -401,22 +420,32 @@ class MediaRepository:
await finish()
- media_type = headers[b"Content-Type"][0].decode("ascii")
- upload_name = get_filename_from_headers(headers)
- time_now_ms = self.clock.time_msec()
+ media_type = headers[b"Content-Type"][0].decode("ascii")
+ upload_name = get_filename_from_headers(headers)
+ time_now_ms = self.clock.time_msec()
+
+ # Multiple remote media download requests can race (when using
+ # multiple media repos), so this may throw a constraint violation
+ # exception. If it does we'll delete the newly downloaded file from
+ # disk (as we're in the ctx manager).
+ #
+ # However: we've already called `finish()` so we may have also
+ # written to the storage providers. This is preferable to the
+ # alternative where we call `finish()` *after* this, where we could
+ # end up having an entry in the DB but fail to write the files to
+ # the storage providers.
+ await self.store.store_cached_remote_media(
+ origin=server_name,
+ media_id=media_id,
+ media_type=media_type,
+ time_now_ms=self.clock.time_msec(),
+ upload_name=upload_name,
+ media_length=length,
+ filesystem_id=file_id,
+ )
logger.info("Stored remote media in file %r", fname)
- await self.store.store_cached_remote_media(
- origin=server_name,
- media_id=media_id,
- media_type=media_type,
- time_now_ms=self.clock.time_msec(),
- upload_name=upload_name,
- media_length=length,
- filesystem_id=file_id,
- )
-
media_info = {
"media_type": media_type,
"media_length": length,
@@ -425,8 +454,6 @@ class MediaRepository:
"filesystem_id": file_id,
}
- await self._generate_thumbnails(server_name, media_id, file_id, media_type)
-
return media_info
def _get_thumbnail_requirements(self, media_type):
@@ -692,42 +719,60 @@ class MediaRepository:
if not t_byte_source:
continue
- try:
- file_info = FileInfo(
- server_name=server_name,
- file_id=file_id,
- thumbnail=True,
- thumbnail_width=t_width,
- thumbnail_height=t_height,
- thumbnail_method=t_method,
- thumbnail_type=t_type,
- url_cache=url_cache,
- )
-
- output_path = await self.media_storage.store_file(
- t_byte_source, file_info
- )
- finally:
- t_byte_source.close()
-
- t_len = os.path.getsize(output_path)
+ file_info = FileInfo(
+ server_name=server_name,
+ file_id=file_id,
+ thumbnail=True,
+ thumbnail_width=t_width,
+ thumbnail_height=t_height,
+ thumbnail_method=t_method,
+ thumbnail_type=t_type,
+ url_cache=url_cache,
+ )
- # Write to database
- if server_name:
- await self.store.store_remote_media_thumbnail(
- server_name,
- media_id,
- file_id,
- t_width,
- t_height,
- t_type,
- t_method,
- t_len,
- )
- else:
- await self.store.store_local_thumbnail(
- media_id, t_width, t_height, t_type, t_method, t_len
- )
+ with self.media_storage.store_into_file(file_info) as (f, fname, finish):
+ try:
+ await self.media_storage.write_to_file(t_byte_source, f)
+ await finish()
+ finally:
+ t_byte_source.close()
+
+ t_len = os.path.getsize(fname)
+
+ # Write to database
+ if server_name:
+ # Multiple remote media download requests can race (when
+ # using multiple media repos), so this may throw a constraint
+ # violation exception. If it does, we'll delete the newly
+ # generated thumbnail from disk (as we're in the ctx
+ # manager).
+ #
+ # However: we've already called `finish()` so we may have
+ # also written to the storage providers. This is preferable
+ # to the alternative where we call `finish()` *after* this,
+ # where we could end up having an entry in the DB but fail
+ # to write the files to the storage providers.
+ try:
+ await self.store.store_remote_media_thumbnail(
+ server_name,
+ media_id,
+ file_id,
+ t_width,
+ t_height,
+ t_type,
+ t_method,
+ t_len,
+ )
+ except Exception as e:
+ thumbnail_exists = await self.store.get_remote_media_thumbnail(
+ server_name, media_id, t_width, t_height, t_type,
+ )
+ if not thumbnail_exists:
+ raise e
+ else:
+ await self.store.store_local_thumbnail(
+ media_id, t_width, t_height, t_type, t_method, t_len
+ )
return {"width": m_width, "height": m_height}
@@ -767,6 +812,76 @@ class MediaRepository:
return {"deleted": deleted}
+ async def delete_local_media(self, media_id: str) -> Tuple[List[str], int]:
+ """
+ Delete the given local media ID from this server.
+
+ Args:
+ media_id: The media ID to delete.
+ Returns:
+ A tuple of (list of deleted media IDs, total number of deleted media IDs).
+ """
+ return await self._remove_local_media_from_disk([media_id])
+
+ async def delete_old_local_media(
+ self, before_ts: int, size_gt: int = 0, keep_profiles: bool = True,
+ ) -> Tuple[List[str], int]:
+ """
+ Delete local media from this server by size and timestamp. Removes
+ media files, any thumbnails and cached URLs.
+
+ Args:
+ before_ts: Unix timestamp in ms.
+ Files that were last used before this timestamp will be deleted
+ size_gt: Size of the media in bytes. Files that are larger will be deleted
+ keep_profiles: If true, keep files that are still referenced by image data
+ (e.g. user profiles, room and group avatars); if false, these files
+ will be deleted as well.
+ Returns:
+ A tuple of (list of deleted media IDs, total number of deleted media IDs).
+ """
+ old_media = await self.store.get_local_media_before(
+ before_ts, size_gt, keep_profiles,
+ )
+ return await self._remove_local_media_from_disk(old_media)
+
+ async def _remove_local_media_from_disk(
+ self, media_ids: List[str]
+ ) -> Tuple[List[str], int]:
+ """
+ Delete local media from this server. Removes media files,
+ any thumbnails and cached URLs.
+
+ Args:
+ media_ids: List of media_id to delete
+ Returns:
+ A tuple of (list of deleted media IDs, total number of deleted media IDs).
+ """
+ removed_media = []
+ for media_id in media_ids:
+ logger.info("Deleting media with ID '%s'", media_id)
+ full_path = self.filepaths.local_media_filepath(media_id)
+ try:
+ os.remove(full_path)
+ except OSError as e:
+ logger.warning("Failed to remove file: %r: %s", full_path, e)
+ if e.errno == errno.ENOENT:
+ pass
+ else:
+ continue
+
+ thumbnail_dir = self.filepaths.local_media_thumbnail_dir(media_id)
+ shutil.rmtree(thumbnail_dir, ignore_errors=True)
+
+ await self.store.delete_remote_media(self.server_name, media_id)
+
+ await self.store.delete_url_cache((media_id,))
+ await self.store.delete_url_cache_media((media_id,))
+
+ removed_media.append(media_id)
+
+ return removed_media, len(removed_media)
+
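A hedged usage sketch of the purge helpers added above: delete local media that was last accessed more than 30 days ago and is larger than 1 MiB, keeping anything still referenced by profiles or avatars. `media_repo` is assumed to be the `MediaRepository` of a running homeserver.

    import time

    async def purge_stale_media(media_repo):
        thirty_days_ms = 30 * 24 * 60 * 60 * 1000
        before_ts = int(time.time() * 1000) - thirty_days_ms
        deleted_ids, total = await media_repo.delete_old_local_media(
            before_ts=before_ts,
            size_gt=1024 * 1024,   # only media larger than 1 MiB
            keep_profiles=True,    # keep avatars / profile images
        )
        return deleted_ids, total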
class MediaRepositoryResource(Resource):
"""File uploading and downloading.
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index a9586fb0..268e0c8f 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -52,6 +52,7 @@ class MediaStorage:
storage_providers: Sequence["StorageProviderWrapper"],
):
self.hs = hs
+ self.reactor = hs.get_reactor()
self.local_media_directory = local_media_directory
self.filepaths = filepaths
self.storage_providers = storage_providers
@@ -70,13 +71,16 @@ class MediaStorage:
with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
- await defer_to_thread(
- self.hs.get_reactor(), _write_file_synchronously, source, f
- )
+ await self.write_to_file(source, f)
await finish_cb()
return fname
+ async def write_to_file(self, source: IO, output: IO):
+ """Asynchronously write the `source` to `output`.
+ """
+ await defer_to_thread(self.reactor, _write_file_synchronously, source, output)
+
@contextlib.contextmanager
def store_into_file(self, file_info: FileInfo):
"""Context manager used to get a file like object to write into, as
@@ -112,14 +116,20 @@ class MediaStorage:
finished_called = [False]
- async def finish():
- for provider in self.storage_providers:
- await provider.store_file(path, file_info)
-
- finished_called[0] = True
-
try:
with open(fname, "wb") as f:
+
+ async def finish():
+ # Ensure that all writes have been flushed and close the
+ # file.
+ f.flush()
+ f.close()
+
+ for provider in self.storage_providers:
+ await provider.store_file(path, file_info)
+
+ finished_called[0] = True
+
yield f, fname, finish
except Exception:
try:
@@ -210,7 +220,7 @@ class MediaStorage:
if res:
with res:
consumer = BackgroundFileConsumer(
- open(local_path, "wb"), self.hs.get_reactor()
+ open(local_path, "wb"), self.reactor
)
await res.write_to_consumer(consumer)
await consumer.wait()
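Taken together, the `MediaStorage` changes give callers a small, consistent pattern: open a file via the context manager, write to it off the reactor thread with `write_to_file`, then call `finish()` to flush, close, and replicate to the configured storage providers. A sketch of the caller side, assuming `media_storage` and `file_info` come from a configured media repository:

    async def store_upload(media_storage, file_info, source):
        with media_storage.store_into_file(file_info) as (f, fname, finish):
            # Writes happen in a background thread via defer_to_thread.
            await media_storage.write_to_file(source, f)
            # finish() flushes/closes the file and pushes it to storage providers.
            await finish()
        return fname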
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index 3673e7f4..9137c4ed 100644
--- a/synapse/server_notices/consent_server_notices.py
+++ b/synapse/server_notices/consent_server_notices.py
@@ -104,7 +104,7 @@ class ConsentServerNotices:
def copy_with_str_subst(x: Any, substitutions: Any) -> Any:
- """Deep-copy a structure, carrying out string substitions on any strings
+ """Deep-copy a structure, carrying out string substitutions on any strings
Args:
x (object): structure to be copied
diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py
index 0422d4c7..d464c75c 100644
--- a/synapse/server_notices/server_notices_manager.py
+++ b/synapse/server_notices/server_notices_manager.py
@@ -119,7 +119,7 @@ class ServerNoticesManager:
# manages to invite the system user to a room, that doesn't make it
# the server notices room.
user_ids = await self._store.get_users_in_room(room.room_id)
- if self.server_notices_mxid in user_ids:
+ if len(user_ids) <= 2 and self.server_notices_mxid in user_ids:
# we found a room which our user shares with the system notice
# user
logger.info(
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 5b0900aa..1fa3b280 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -547,7 +547,7 @@ class StateResolutionHandler:
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
- used as a starting point fof finding the state we need; any missing
+ used as a starting point for finding the state we need; any missing
events will be requested via state_res_store.
If None, all events will be fetched via state_res_store.
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index a493279c..85edae05 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -56,7 +56,7 @@ async def resolve_events_with_store(
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
- used as a starting point fof finding the state we need; any missing
+ used as a starting point for finding the state we need; any missing
events will be requested via state_map_factory.
If None, all events will be fetched via state_map_factory.
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index edf94e7a..f57df0d7 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -69,7 +69,7 @@ async def resolve_events_with_store(
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
- used as a starting point fof finding the state we need; any missing
+ used as a starting point for finding the state we need; any missing
events will be requested via state_res_store.
If None, all events will be fetched via state_res_store.
diff --git a/synapse/static/client/login/js/login.js b/synapse/static/client/login/js/login.js
index 3678670e..744800ec 100644
--- a/synapse/static/client/login/js/login.js
+++ b/synapse/static/client/login/js/login.js
@@ -182,7 +182,7 @@ matrixLogin.passwordLogin = function() {
};
/*
- * The onLogin function gets called after a succesful login.
+ * The onLogin function gets called after a successful login.
*
* It is expected that implementations override this to be notified when the
* login is complete. The response to the login call is provided as the single
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0217e631..d1b5760c 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -88,13 +88,18 @@ def make_pool(
"""Get the connection pool for the database.
"""
+ # By default enable `cp_reconnect`. We need to fiddle with db_args in case
+ # someone has explicitly set `cp_reconnect`.
+ db_args = dict(db_config.config.get("args", {}))
+ db_args.setdefault("cp_reconnect", True)
+
return adbapi.ConnectionPool(
db_config.config["name"],
cp_reactor=reactor,
cp_openfun=lambda conn: engine.on_new_connection(
LoggingDatabaseConnection(conn, engine, "on_new_connection")
),
- **db_config.config.get("args", {})
+ **db_args,
)
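The effect of the `cp_reconnect` change can be illustrated with a tiny merge helper: the admin's configured args always win, and the flag is only injected when it was not set explicitly. A self-contained sketch:

    def merge_pool_args(configured_args):
        # Copy so the original config mapping is left untouched.
        db_args = dict(configured_args)
        db_args.setdefault("cp_reconnect", True)
        return db_args

    assert merge_pool_args({})["cp_reconnect"] is True
    assert merge_pool_args({"cp_reconnect": False})["cp_reconnect"] is False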
@@ -632,7 +637,7 @@ class DatabasePool:
func,
*args,
db_autocommit=db_autocommit,
- **kwargs
+ **kwargs,
)
for after_callback, after_args, after_kwargs in after_callbacks:
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 9b16f45f..43660ec4 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -146,7 +146,6 @@ class DataStore(
db_conn, "e2e_cross_signing_keys", "stream_id"
)
- self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 637a938b..e550cbc8 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -15,21 +15,31 @@
# limitations under the License.
import logging
import re
-from typing import List
+from typing import TYPE_CHECKING, List, Optional, Pattern, Tuple
-from synapse.appservice import ApplicationService, AppServiceTransaction
+from synapse.appservice import (
+ ApplicationService,
+ ApplicationServiceState,
+ AppServiceTransaction,
+)
from synapse.config.appservice import load_appservices
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.storage.types import Connection
from synapse.types import JsonDict
from synapse.util import json_encoder
+if TYPE_CHECKING:
+ from synapse.app.homeserver import HomeServer
+
logger = logging.getLogger(__name__)
-def _make_exclusive_regex(services_cache):
+def _make_exclusive_regex(
+ services_cache: List[ApplicationService],
+) -> Optional[Pattern]:
# We precompile a regex constructed from all the regexes that the AS's
# have registered for exclusive users.
exclusive_user_regexes = [
@@ -39,17 +49,19 @@ def _make_exclusive_regex(services_cache):
]
if exclusive_user_regexes:
exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
- exclusive_user_regex = re.compile(exclusive_user_regex)
+ exclusive_user_pattern = re.compile(
+ exclusive_user_regex
+ ) # type: Optional[Pattern]
else:
# We handle this case specially otherwise the constructed regex
# will always match
- exclusive_user_regex = None
+ exclusive_user_pattern = None
- return exclusive_user_regex
+ return exclusive_user_pattern
class ApplicationServiceWorkerStore(SQLBaseStore):
- def __init__(self, database: DatabasePool, db_conn, hs):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
self.services_cache = load_appservices(
hs.hostname, hs.config.app_service_config_files
)
@@ -60,7 +72,7 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
def get_app_services(self):
return self.services_cache
- def get_if_app_services_interested_in_user(self, user_id):
+ def get_if_app_services_interested_in_user(self, user_id: str) -> bool:
"""Check if the user is one associated with an app service (exclusively)
"""
if self.exclusive_user_regex:
@@ -68,7 +80,7 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
else:
return False
- def get_app_service_by_user_id(self, user_id):
+ def get_app_service_by_user_id(self, user_id: str) -> Optional[ApplicationService]:
"""Retrieve an application service from their user ID.
All application services have associated with them a particular user ID.
@@ -77,35 +89,35 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
a user ID to an application service.
Args:
- user_id(str): The user ID to see if it is an application service.
+ user_id: The user ID to see if it is an application service.
Returns:
- synapse.appservice.ApplicationService or None.
+ The application service or None.
"""
for service in self.services_cache:
if service.sender == user_id:
return service
return None
- def get_app_service_by_token(self, token):
+ def get_app_service_by_token(self, token: str) -> Optional[ApplicationService]:
"""Get the application service with the given appservice token.
Args:
- token (str): The application service token.
+ token: The application service token.
Returns:
- synapse.appservice.ApplicationService or None.
+ The application service or None.
"""
for service in self.services_cache:
if service.token == token:
return service
return None
- def get_app_service_by_id(self, as_id):
+ def get_app_service_by_id(self, as_id: str) -> Optional[ApplicationService]:
"""Get the application service with the given appservice ID.
Args:
- as_id (str): The application service ID.
+ as_id: The application service ID.
Returns:
- synapse.appservice.ApplicationService or None.
+ The application service or None.
"""
for service in self.services_cache:
if service.id == as_id:
@@ -124,11 +136,13 @@ class ApplicationServiceStore(ApplicationServiceWorkerStore):
class ApplicationServiceTransactionWorkerStore(
ApplicationServiceWorkerStore, EventsWorkerStore
):
- async def get_appservices_by_state(self, state):
+ async def get_appservices_by_state(
+ self, state: ApplicationServiceState
+ ) -> List[ApplicationService]:
"""Get a list of application services based on their state.
Args:
- state(ApplicationServiceState): The state to filter on.
+ state: The state to filter on.
Returns:
A list of ApplicationServices, which may be empty.
"""
@@ -145,13 +159,15 @@ class ApplicationServiceTransactionWorkerStore(
services.append(service)
return services
- async def get_appservice_state(self, service):
+ async def get_appservice_state(
+ self, service: ApplicationService
+ ) -> Optional[ApplicationServiceState]:
"""Get the application service state.
Args:
- service(ApplicationService): The service whose state to set.
+ service: The service whose state to set.
Returns:
- An ApplicationServiceState.
+ An ApplicationServiceState or None.
"""
result = await self.db_pool.simple_select_one(
"application_services_state",
@@ -164,12 +180,14 @@ class ApplicationServiceTransactionWorkerStore(
return result.get("state")
return None
- async def set_appservice_state(self, service, state) -> None:
+ async def set_appservice_state(
+ self, service: ApplicationService, state: ApplicationServiceState
+ ) -> None:
"""Set the application service state.
Args:
- service(ApplicationService): The service whose state to set.
- state(ApplicationServiceState): The connectivity state to apply.
+ service: The service whose state to set.
+ state: The connectivity state to apply.
"""
await self.db_pool.simple_upsert(
"application_services_state", {"as_id": service.id}, {"state": state}
@@ -226,13 +244,14 @@ class ApplicationServiceTransactionWorkerStore(
"create_appservice_txn", _create_appservice_txn
)
- async def complete_appservice_txn(self, txn_id, service) -> None:
+ async def complete_appservice_txn(
+ self, txn_id: int, service: ApplicationService
+ ) -> None:
"""Completes an application service transaction.
Args:
- txn_id(str): The transaction ID being completed.
- service(ApplicationService): The application service which was sent
- this transaction.
+ txn_id: The transaction ID being completed.
+ service: The application service which was sent this transaction.
"""
txn_id = int(txn_id)
@@ -272,12 +291,13 @@ class ApplicationServiceTransactionWorkerStore(
"complete_appservice_txn", _complete_appservice_txn
)
- async def get_oldest_unsent_txn(self, service):
- """Get the oldest transaction which has not been sent for this
- service.
+ async def get_oldest_unsent_txn(
+ self, service: ApplicationService
+ ) -> Optional[AppServiceTransaction]:
+ """Get the oldest transaction which has not been sent for this service.
Args:
- service(ApplicationService): The app service to get the oldest txn.
+ service: The app service to get the oldest txn.
Returns:
An AppServiceTransaction or None.
"""
@@ -313,7 +333,7 @@ class ApplicationServiceTransactionWorkerStore(
service=service, id=entry["txn_id"], events=events, ephemeral=[]
)
- def _get_last_txn(self, txn, service_id):
+ def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
txn.execute(
"SELECT last_txn FROM application_services_state WHERE as_id=?",
(service_id,),
@@ -324,7 +344,7 @@ class ApplicationServiceTransactionWorkerStore(
else:
return int(last_txn_id[0]) # select 'last_txn' col
- async def set_appservice_last_pos(self, pos) -> None:
+ async def set_appservice_last_pos(self, pos: int) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
@@ -334,7 +354,9 @@ class ApplicationServiceTransactionWorkerStore(
"set_appservice_last_pos", set_appservice_last_pos_txn
)
- async def get_new_events_for_appservice(self, current_id, limit):
+ async def get_new_events_for_appservice(
+ self, current_id: int, limit: int
+ ) -> Tuple[int, List[EventBase]]:
"""Get all new events for an appservice"""
def get_new_events_for_appservice_txn(txn):
@@ -394,7 +416,7 @@ class ApplicationServiceTransactionWorkerStore(
)
async def set_type_stream_id_for_appservice(
- self, service: ApplicationService, type: str, pos: int
+ self, service: ApplicationService, type: str, pos: Optional[int]
) -> None:
if type not in ("read_receipt", "presence"):
raise ValueError(
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 44159094..4d1b92d1 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -24,7 +24,7 @@ from twisted.enterprise.adbapi import Connection
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, db_to_json
-from synapse.storage.database import make_in_list_sql_clause
+from synapse.storage.database import DatabasePool, make_in_list_sql_clause
from synapse.storage.types import Cursor
from synapse.types import JsonDict
from synapse.util import json_encoder
@@ -33,6 +33,7 @@ from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
from synapse.handlers.e2e_keys import SignatureListItem
+ from synapse.server import HomeServer
@attr.s(slots=True)
@@ -47,7 +48,20 @@ class DeviceKeyLookupResult:
keys = attr.ib(type=Optional[JsonDict])
-class EndToEndKeyWorkerStore(SQLBaseStore):
+class EndToEndKeyBackgroundStore(SQLBaseStore):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+ super().__init__(database, db_conn, hs)
+
+ self.db_pool.updates.register_background_index_update(
+ "e2e_cross_signing_keys_idx",
+ index_name="e2e_cross_signing_keys_stream_idx",
+ table="e2e_cross_signing_keys",
+ columns=["stream_id"],
+ unique=True,
+ )
+
+
+class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
async def get_e2e_device_keys_for_federation_query(
self, user_id: str
) -> Tuple[int, List[JsonDict]]:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index a6279a6c..2e07c373 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -26,6 +26,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.types import Collection
from synapse.util.caches.descriptors import cached
+from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
logger = logging.getLogger(__name__)
@@ -40,6 +41,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)
+ # Cache of event ID to list of auth event IDs and their depths.
+ self._event_auth_cache = LruCache(
+ 500000, "_event_auth_cache", size_callback=len
+ ) # type: LruCache[str, List[Tuple[str, int]]]
+
async def get_auth_chain(
self, event_ids: Collection[str], include_given: bool = False
) -> List[EventBase]:
@@ -84,17 +90,45 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
else:
results = set()
- base_sql = "SELECT DISTINCT auth_id FROM event_auth WHERE "
+ # We pull out the depth simply so that we can populate the
+ # `_event_auth_cache` cache.
+ base_sql = """
+ SELECT a.event_id, auth_id, depth
+ FROM event_auth AS a
+ INNER JOIN events AS e ON (e.event_id = a.auth_id)
+ WHERE
+ """
front = set(event_ids)
while front:
new_front = set()
for chunk in batch_iter(front, 100):
- clause, args = make_in_list_sql_clause(
- txn.database_engine, "event_id", chunk
- )
- txn.execute(base_sql + clause, args)
- new_front.update(r[0] for r in txn)
+ # Pull the auth events either from the cache or DB.
+ to_fetch = [] # Event IDs to fetch from DB # type: List[str]
+ for event_id in chunk:
+ res = self._event_auth_cache.get(event_id)
+ if res is None:
+ to_fetch.append(event_id)
+ else:
+ new_front.update(auth_id for auth_id, depth in res)
+
+ if to_fetch:
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "a.event_id", to_fetch
+ )
+ txn.execute(base_sql + clause, args)
+
+ # Note we need to batch up the results by event ID before
+ # adding to the cache.
+ to_cache = {}
+ for event_id, auth_event_id, auth_event_depth in txn:
+ to_cache.setdefault(event_id, []).append(
+ (auth_event_id, auth_event_depth)
+ )
+ new_front.add(auth_event_id)
+
+ for event_id, auth_events in to_cache.items():
+ self._event_auth_cache.set(event_id, auth_events)
new_front -= results
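The new `_event_auth_cache` follows the same check-cache / batch-fetch-the-misses / batch-and-populate loop in both call sites. A self-contained sketch of that loop, using a plain dict in place of `LruCache` and a callable in place of the SQL query:

    from typing import Callable, Dict, Iterable, List, Set, Tuple

    def expand_auth_chain(
        cache: Dict[str, List[Tuple[str, int]]],
        fetch_from_db: Callable[[List[str]], Iterable[Tuple[str, str, int]]],
        chunk: Iterable[str],
    ) -> Set[str]:
        new_front: Set[str] = set()
        to_fetch: List[str] = []

        for event_id in chunk:
            cached = cache.get(event_id)
            if cached is None:
                to_fetch.append(event_id)
            else:
                new_front.update(auth_id for auth_id, _depth in cached)

        if to_fetch:
            # Batch the rows by event ID so only complete lists enter the cache.
            to_cache: Dict[str, List[Tuple[str, int]]] = {}
            for event_id, auth_id, depth in fetch_from_db(to_fetch):
                to_cache.setdefault(event_id, []).append((auth_id, depth))
                new_front.add(auth_id)
            cache.update(to_cache)

        return new_front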
@@ -213,14 +247,38 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
break
# Fetch the auth events and their depths of the N last events we're
- # currently walking
+ # currently walking, either from cache or DB.
search, chunk = search[:-100], search[-100:]
- clause, args = make_in_list_sql_clause(
- txn.database_engine, "a.event_id", [e_id for _, e_id in chunk]
- )
- txn.execute(base_sql + clause, args)
- for event_id, auth_event_id, auth_event_depth in txn:
+ found = [] # Results found # type: List[Tuple[str, str, int]]
+ to_fetch = [] # Event IDs to fetch from DB # type: List[str]
+ for _, event_id in chunk:
+ res = self._event_auth_cache.get(event_id)
+ if res is None:
+ to_fetch.append(event_id)
+ else:
+ found.extend((event_id, auth_id, depth) for auth_id, depth in res)
+
+ if to_fetch:
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "a.event_id", to_fetch
+ )
+ txn.execute(base_sql + clause, args)
+
+ # We parse the results and add them to the `found` set and the
+ # cache (note we need to batch up the results by event ID before
+ # adding to the cache).
+ to_cache = {}
+ for event_id, auth_event_id, auth_event_depth in txn:
+ to_cache.setdefault(event_id, []).append(
+ (auth_event_id, auth_event_depth)
+ )
+ found.append((event_id, auth_event_id, auth_event_depth))
+
+ for event_id, auth_events in to_cache.items():
+ self._event_auth_cache.set(event_id, auth_events)
+
+ for event_id, auth_event_id, auth_event_depth in found:
event_to_auth_events.setdefault(event_id, set()).add(auth_event_id)
sets = event_to_missing_sets.get(auth_event_id)
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 5e4af2eb..97b67548 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -92,6 +92,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
where_clause="NOT have_censored",
)
+ self.db_pool.updates.register_background_index_update(
+ "users_have_local_media",
+ index_name="users_have_local_media",
+ table="local_media_repository",
+ columns=["user_id", "created_ts"],
+ )
+
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6e7f16f3..4732685f 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -31,6 +31,7 @@ from synapse.api.room_versions import (
RoomVersions,
)
from synapse.events import EventBase, make_event_from_dict
+from synapse.events.snapshot import EventContext
from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.metrics.background_process_metrics import (
@@ -44,7 +45,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
-from synapse.types import Collection, get_domain_from_id
+from synapse.types import Collection, JsonDict, get_domain_from_id
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@@ -525,6 +526,57 @@ class EventsWorkerStore(SQLBaseStore):
return event_map
+ async def get_stripped_room_state_from_event_context(
+ self,
+ context: EventContext,
+ state_types_to_include: List[EventTypes],
+ membership_user_id: Optional[str] = None,
+ ) -> List[JsonDict]:
+ """
+ Retrieve the stripped state from a room, given an event context to retrieve state
+ from as well as the state types to include. Optionally, include the membership
+ events from a specific user.
+
+ "Stripped" state means that only the `type`, `state_key`, `content` and `sender` keys
+ are included from each state event.
+
+ Args:
+ context: The event context to retrieve state of the room from.
+ state_types_to_include: The types of state events to include.
+ membership_user_id: An optional user ID to include the stripped membership state
+ events of. This is useful when generating the stripped state of a room for
+ invites. We want to send membership events of the inviter, so that the
+ invitee can display the inviter's profile information if the room lacks any.
+
+ Returns:
+ A list of dictionaries, each representing a stripped state event from the room.
+ """
+ current_state_ids = await context.get_current_state_ids()
+
+ # We know this event is not an outlier, so this must be
+ # non-None.
+ assert current_state_ids is not None
+
+ # The state to include
+ state_to_include_ids = [
+ e_id
+ for k, e_id in current_state_ids.items()
+ if k[0] in state_types_to_include
+ or (membership_user_id and k == (EventTypes.Member, membership_user_id))
+ ]
+
+ state_to_include = await self.get_events(state_to_include_ids)
+
+ return [
+ {
+ "type": e.type,
+ "state_key": e.state_key,
+ "content": e.content,
+ "sender": e.sender,
+ }
+ for e in state_to_include.values()
+ ]
+
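A hedged usage sketch of `get_stripped_room_state_from_event_context` as it might be used when building an invite: request a handful of state event types plus the inviter's own membership. `store` and `context` are assumed to be an `EventsWorkerStore` and the invite's `EventContext`.

    from synapse.api.constants import EventTypes

    async def stripped_state_for_invite(store, context, inviter_user_id):
        return await store.get_stripped_room_state_from_event_context(
            context,
            state_types_to_include=[
                EventTypes.Create,
                EventTypes.Name,
                EventTypes.CanonicalAlias,
                EventTypes.JoinRules,
            ],
            membership_user_id=inviter_user_id,
        )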
def _do_fetch(self, conn):
"""Takes a database connection and waits for requests for events from
the _event_fetch_list queue.
@@ -1065,11 +1117,13 @@ class EventsWorkerStore(SQLBaseStore):
def get_all_new_forward_event_rows(txn):
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts, relates_to_id"
+ " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
+ " LEFT JOIN room_memberships USING (event_id)"
+ " LEFT JOIN rejections USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
@@ -1100,12 +1154,14 @@ class EventsWorkerStore(SQLBaseStore):
def get_ex_outlier_stream_rows_txn(txn):
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts, relates_to_id"
+ " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
+ " LEFT JOIN room_memberships USING (event_id)"
+ " LEFT JOIN rejections USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" AND out.instance_name = ?"
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index cc538c5c..4b2f2247 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -93,6 +93,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
+ self.server_name = hs.hostname
async def get_local_media(self, media_id: str) -> Optional[Dict[str, Any]]:
"""Get the metadata for a local piece of media
@@ -115,6 +116,109 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="get_local_media",
)
+ async def get_local_media_by_user_paginate(
+ self, start: int, limit: int, user_id: str
+ ) -> Tuple[List[Dict[str, Any]], int]:
+ """Get a paginated list of metadata for a local piece of media
+ which a given user_id has uploaded
+
+ Args:
+ start: offset in the list
+ limit: maximum amount of media_ids to retrieve
+ user_id: fully-qualified user id
+ Returns:
+ A paginated list of all metadata of user's media,
+ plus the total count of all the user's media
+ """
+
+ def get_local_media_by_user_paginate_txn(txn):
+
+ args = [user_id]
+ sql = """
+ SELECT COUNT(*) as total_media
+ FROM local_media_repository
+ WHERE user_id = ?
+ """
+ txn.execute(sql, args)
+ count = txn.fetchone()[0]
+
+ sql = """
+ SELECT
+ "media_id",
+ "media_type",
+ "media_length",
+ "upload_name",
+ "created_ts",
+ "last_access_ts",
+ "quarantined_by",
+ "safe_from_quarantine"
+ FROM local_media_repository
+ WHERE user_id = ?
+ ORDER BY created_ts DESC, media_id DESC
+ LIMIT ? OFFSET ?
+ """
+
+ args += [limit, start]
+ txn.execute(sql, args)
+ media = self.db_pool.cursor_to_dict(txn)
+ return media, count
+
+ return await self.db_pool.runInteraction(
+ "get_local_media_by_user_paginate_txn", get_local_media_by_user_paginate_txn
+ )
+
+ async def get_local_media_before(
+ self, before_ts: int, size_gt: int, keep_profiles: bool,
+ ) -> Optional[List[str]]:
+
+ # to find files that have never been accessed (last_access_ts IS NULL)
+ # compare with `created_ts`
+ sql = """
+ SELECT media_id
+ FROM local_media_repository AS lmr
+ WHERE
+ ( last_access_ts < ?
+ OR ( created_ts < ? AND last_access_ts IS NULL ) )
+ AND media_length > ?
+ """
+
+ if keep_profiles:
+ sql_keep = """
+ AND (
+ NOT EXISTS
+ (SELECT 1
+ FROM profiles
+ WHERE profiles.avatar_url = '{media_prefix}' || lmr.media_id)
+ AND NOT EXISTS
+ (SELECT 1
+ FROM groups
+ WHERE groups.avatar_url = '{media_prefix}' || lmr.media_id)
+ AND NOT EXISTS
+ (SELECT 1
+ FROM room_memberships
+ WHERE room_memberships.avatar_url = '{media_prefix}' || lmr.media_id)
+ AND NOT EXISTS
+ (SELECT 1
+ FROM user_directory
+ WHERE user_directory.avatar_url = '{media_prefix}' || lmr.media_id)
+ AND NOT EXISTS
+ (SELECT 1
+ FROM room_stats_state
+ WHERE room_stats_state.avatar = '{media_prefix}' || lmr.media_id)
+ )
+ """.format(
+ media_prefix="mxc://%s/" % (self.server_name,),
+ )
+ sql += sql_keep
+
+ def _get_local_media_before_txn(txn):
+ txn.execute(sql, (before_ts, before_ts, size_gt))
+ return [row[0] for row in txn]
+
+ return await self.db_pool.runInteraction(
+ "get_local_media_before", _get_local_media_before_txn
+ )
+
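The `keep_profiles` sub-queries above work by reconstructing the mxc URI this server would have handed out for each media_id and comparing it against stored avatar URLs. A quick, stand-alone check of that construction (server name and media ID are illustrative):

    server_name = "example.com"            # assumed homeserver name
    media_id = "abcdefghijklmnopqrstuvwx"  # 24-char random ID, as generated above
    mxc_uri = "mxc://%s/%s" % (server_name, media_id)
    assert mxc_uri == "mxc://example.com/abcdefghijklmnopqrstuvwx"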
async def store_local_media(
self,
media_id,
@@ -348,6 +452,33 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="get_remote_media_thumbnails",
)
+ async def get_remote_media_thumbnail(
+ self, origin: str, media_id: str, t_width: int, t_height: int, t_type: str,
+ ) -> Optional[Dict[str, Any]]:
+ """Fetch the thumbnail info of given width, height and type.
+ """
+
+ return await self.db_pool.simple_select_one(
+ table="remote_media_cache_thumbnails",
+ keyvalues={
+ "media_origin": origin,
+ "media_id": media_id,
+ "thumbnail_width": t_width,
+ "thumbnail_height": t_height,
+ "thumbnail_type": t_type,
+ },
+ retcols=(
+ "thumbnail_width",
+ "thumbnail_height",
+ "thumbnail_method",
+ "thumbnail_type",
+ "thumbnail_length",
+ "filesystem_id",
+ ),
+ allow_none=True,
+ desc="get_remote_media_thumbnail",
+ )
+
async def store_remote_media_thumbnail(
self,
origin,
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index a6d1eb90..0e25ca3d 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -39,7 +39,7 @@ class ProfileWorkerStore(SQLBaseStore):
avatar_url=profile["avatar_url"], display_name=profile["displayname"]
)
- async def get_profile_displayname(self, user_localpart: str) -> str:
+ async def get_profile_displayname(self, user_localpart: str) -> Optional[str]:
return await self.db_pool.simple_select_one_onecol(
table="profiles",
keyvalues={"user_id": user_localpart},
@@ -47,7 +47,7 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_profile_displayname",
)
- async def get_profile_avatar_url(self, user_localpart: str) -> str:
+ async def get_profile_avatar_url(self, user_localpart: str) -> Optional[str]:
return await self.db_pool.simple_select_one_onecol(
table="profiles",
keyvalues={"user_id": user_localpart},
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 4c843b76..e5d07ce7 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -16,29 +16,64 @@
# limitations under the License.
import logging
import re
-from typing import Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+
+import attr
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
-from synapse.storage.types import Cursor
+from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.databases.main.stats import StatsStore
+from synapse.storage.types import Connection, Cursor
+from synapse.storage.util.id_generators import IdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import UserID
from synapse.util.caches.descriptors import cached
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
logger = logging.getLogger(__name__)
-class RegistrationWorkerStore(SQLBaseStore):
- def __init__(self, database: DatabasePool, db_conn, hs):
+@attr.s(frozen=True, slots=True)
+class TokenLookupResult:
+ """Result of looking up an access token.
+
+ Attributes:
+ user_id: The user that this token authenticates as
+ is_guest
+ shadow_banned
+ token_id: The ID of the access token looked up
+ device_id: The device associated with the token, if any.
+ valid_until_ms: The timestamp the token expires, if any.
+ token_owner: The "owner" of the token. This is either the same as the
+ user, or a server admin who is logged in as the user.
+ """
+
+ user_id = attr.ib(type=str)
+ is_guest = attr.ib(type=bool, default=False)
+ shadow_banned = attr.ib(type=bool, default=False)
+ token_id = attr.ib(type=Optional[int], default=None)
+ device_id = attr.ib(type=Optional[str], default=None)
+ valid_until_ms = attr.ib(type=Optional[int], default=None)
+ token_owner = attr.ib(type=str)
+
+ # Make the token owner default to the user ID, which is the common case.
+ @token_owner.default
+ def _default_token_owner(self):
+ return self.user_id
+
+
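`TokenLookupResult` uses an attrs idiom worth calling out: a field whose default is computed from another field on the same instance. A self-contained illustration (the class name and fields are trimmed down, not the full result type):

    from typing import Optional

    import attr

    @attr.s(frozen=True, slots=True)
    class TokenInfo:
        user_id = attr.ib(type=str)
        device_id = attr.ib(type=Optional[str], default=None)
        token_owner = attr.ib(type=str)

        # Default the owner to the authenticated user, the common (non-puppeted) case.
        @token_owner.default
        def _default_token_owner(self):
            return self.user_id

    assert TokenInfo(user_id="@alice:example.com").token_owner == "@alice:example.com"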
+class RegistrationWorkerStore(CacheInvalidationWorkerStore):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
self.config = hs.config
- self.clock = hs.get_clock()
# Note: we don't check this sequence for consistency as we'd have to
# call `find_max_generated_user_id_localpart` each time, which is
@@ -55,7 +90,7 @@ class RegistrationWorkerStore(SQLBaseStore):
# Create a background job for culling expired 3PID validity tokens
if hs.config.run_background_tasks:
- self.clock.looping_call(
+ self._clock.looping_call(
self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
)
@@ -92,21 +127,19 @@ class RegistrationWorkerStore(SQLBaseStore):
if not info:
return False
- now = self.clock.time_msec()
+ now = self._clock.time_msec()
trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000
is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
return is_trial
@cached()
- async def get_user_by_access_token(self, token: str) -> Optional[dict]:
+ async def get_user_by_access_token(self, token: str) -> Optional[TokenLookupResult]:
"""Get a user from the given access token.
Args:
token: The access token of a user.
Returns:
- None, if the token did not match, otherwise dict
- including the keys `name`, `is_guest`, `device_id`, `token_id`,
- `valid_until_ms`.
+ None, if the token did not match, otherwise a `TokenLookupResult`
"""
return await self.db_pool.runInteraction(
"get_user_by_access_token", self._query_for_auth, token
@@ -236,13 +269,13 @@ class RegistrationWorkerStore(SQLBaseStore):
desc="get_renewal_token_for_user",
)
- async def get_users_expiring_soon(self) -> List[Dict[str, int]]:
+ async def get_users_expiring_soon(self) -> List[Dict[str, Any]]:
"""Selects users whose account will expire in the [now, now + renew_at] time
window (see configuration for account_validity for information on what renew_at
refers to).
Returns:
- A list of dictionaries mapping user ID to expiration time (in milliseconds).
+ A list of dictionaries, each with a user ID and expiration time (in milliseconds).
"""
def select_users_txn(txn, now_ms, renew_at):
@@ -257,7 +290,7 @@ class RegistrationWorkerStore(SQLBaseStore):
return await self.db_pool.runInteraction(
"get_users_expiring_soon",
select_users_txn,
- self.clock.time_msec(),
+ self._clock.time_msec(),
self.config.account_validity.renew_at,
)
@@ -327,19 +360,24 @@ class RegistrationWorkerStore(SQLBaseStore):
await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
- def _query_for_auth(self, txn, token):
- sql = (
- "SELECT users.name, users.is_guest, users.shadow_banned, access_tokens.id as token_id,"
- " access_tokens.device_id, access_tokens.valid_until_ms"
- " FROM users"
- " INNER JOIN access_tokens on users.name = access_tokens.user_id"
- " WHERE token = ?"
- )
+ def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]:
+ sql = """
+ SELECT users.name as user_id,
+ users.is_guest,
+ users.shadow_banned,
+ access_tokens.id as token_id,
+ access_tokens.device_id,
+ access_tokens.valid_until_ms,
+ access_tokens.user_id as token_owner
+ FROM users
+ INNER JOIN access_tokens on users.name = COALESCE(puppets_user_id, access_tokens.user_id)
+ WHERE token = ?
+ """
txn.execute(sql, (token,))
rows = self.db_pool.cursor_to_dict(txn)
if rows:
- return rows[0]
+ return TokenLookupResult(**rows[0])
return None
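The COALESCE in the rewritten auth query is what makes the new puppet tokens work: a token with `puppets_user_id` set resolves to the puppeted user, while `token_owner` keeps pointing at the admin who actually owns the token. A stand-alone sqlite3 demonstration of just that join (table shapes are trimmed to the relevant columns):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE users (name TEXT);
        CREATE TABLE access_tokens (token TEXT, user_id TEXT, puppets_user_id TEXT);
        INSERT INTO users VALUES ('@admin:test'), ('@alice:test');
        INSERT INTO access_tokens VALUES ('tok1', '@admin:test', '@alice:test');
        """
    )
    row = conn.execute(
        """
        SELECT users.name AS user_id, access_tokens.user_id AS token_owner
        FROM users
        INNER JOIN access_tokens ON users.name = COALESCE(puppets_user_id, access_tokens.user_id)
        WHERE token = ?
        """,
        ("tok1",),
    ).fetchone()
    assert row == ("@alice:test", "@admin:test")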
@@ -803,7 +841,7 @@ class RegistrationWorkerStore(SQLBaseStore):
await self.db_pool.runInteraction(
"cull_expired_threepid_validation_tokens",
cull_expired_threepid_validation_tokens_txn,
- self.clock.time_msec(),
+ self._clock.time_msec(),
)
@wrap_as_background_process("account_validity_set_expiration_dates")
@@ -890,10 +928,10 @@ class RegistrationWorkerStore(SQLBaseStore):
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
- def __init__(self, database: DatabasePool, db_conn, hs):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
- self.clock = hs.get_clock()
+ self._clock = hs.get_clock()
self.config = hs.config
self.db_pool.updates.register_background_index_update(
@@ -1016,13 +1054,56 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
return 1
+ async def set_user_deactivated_status(
+ self, user_id: str, deactivated: bool
+ ) -> None:
+ """Set the `deactivated` property for the provided user to the provided value.
+
+ Args:
+ user_id: The ID of the user to set the status for.
+ deactivated: The value to set for `deactivated`.
+ """
-class RegistrationStore(RegistrationBackgroundUpdateStore):
- def __init__(self, database: DatabasePool, db_conn, hs):
+ await self.db_pool.runInteraction(
+ "set_user_deactivated_status",
+ self.set_user_deactivated_status_txn,
+ user_id,
+ deactivated,
+ )
+
+ def set_user_deactivated_status_txn(self, txn, user_id: str, deactivated: bool):
+ self.db_pool.simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,)
+ )
+ txn.call_after(self.is_guest.invalidate, (user_id,))
+
+ @cached()
+ async def is_guest(self, user_id: str) -> bool:
+ res = await self.db_pool.simple_select_one_onecol(
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="is_guest",
+ allow_none=True,
+ desc="is_guest",
+ )
+
+ return res if res else False
+
+
+class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
+ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
super().__init__(database, db_conn, hs)
self._ignore_unknown_session_error = hs.config.request_token_inhibit_3pid_errors
+ self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
+
async def add_access_token_to_user(
self,
user_id: str,
@@ -1138,19 +1219,19 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
def _register_user(
self,
txn,
- user_id,
- password_hash,
- was_guest,
- make_guest,
- appservice_id,
- create_profile_with_displayname,
- admin,
- user_type,
- shadow_banned,
+ user_id: str,
+ password_hash: Optional[str],
+ was_guest: bool,
+ make_guest: bool,
+ appservice_id: Optional[str],
+ create_profile_with_displayname: Optional[str],
+ admin: bool,
+ user_type: Optional[str],
+ shadow_banned: bool,
):
user_id_obj = UserID.from_string(user_id)
- now = int(self.clock.time())
+ now = int(self._clock.time())
try:
if was_guest:
@@ -1374,18 +1455,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
await self.db_pool.runInteraction("delete_access_token", f)
- @cached()
- async def is_guest(self, user_id: str) -> bool:
- res = await self.db_pool.simple_select_one_onecol(
- table="users",
- keyvalues={"name": user_id},
- retcol="is_guest",
- allow_none=True,
- desc="is_guest",
- )
-
- return res if res else False
-
async def add_user_pending_deactivation(self, user_id: str) -> None:
"""
Adds a user to the table of users who need to be parted from all the rooms they're
@@ -1479,7 +1548,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
txn,
table="threepid_validation_session",
keyvalues={"session_id": session_id},
- updatevalues={"validated_at": self.clock.time_msec()},
+ updatevalues={"validated_at": self._clock.time_msec()},
)
return next_link
@@ -1547,35 +1616,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
start_or_continue_validation_session_txn,
)
- async def set_user_deactivated_status(
- self, user_id: str, deactivated: bool
- ) -> None:
- """Set the `deactivated` property for the provided user to the provided value.
-
- Args:
- user_id: The ID of the user to set the status for.
- deactivated: The value to set for `deactivated`.
- """
-
- await self.db_pool.runInteraction(
- "set_user_deactivated_status",
- self.set_user_deactivated_status_txn,
- user_id,
- deactivated,
- )
-
- def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
- self.db_pool.simple_update_one_txn(
- txn=txn,
- table="users",
- keyvalues={"name": user_id},
- updatevalues={"deactivated": 1 if deactivated else 0},
- )
- self._invalidate_cache_and_stream(
- txn, self.get_user_deactivated_status, (user_id,)
- )
- txn.call_after(self.is_guest.invalidate, (user_id,))
-
def find_max_generated_user_id_localpart(cur: Cursor) -> int:
"""
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index e83d961c..dc0c4b54 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1411,6 +1411,65 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
desc="add_event_report",
)
+ async def get_event_report(self, report_id: int) -> Optional[Dict[str, Any]]:
+ """Retrieve an event report
+
+ Args:
+ report_id: ID of reported event in database
+ Returns:
+ event_report: json dict of information from the event report, or None
+ """
+
+ def _get_event_report_txn(txn, report_id):
+
+ sql = """
+ SELECT
+ er.id,
+ er.received_ts,
+ er.room_id,
+ er.event_id,
+ er.user_id,
+ er.content,
+ events.sender,
+ room_stats_state.canonical_alias,
+ room_stats_state.name,
+ event_json.json AS event_json
+ FROM event_reports AS er
+ LEFT JOIN events
+ ON events.event_id = er.event_id
+ JOIN event_json
+ ON event_json.event_id = er.event_id
+ JOIN room_stats_state
+ ON room_stats_state.room_id = er.room_id
+ WHERE er.id = ?
+ """
+
+ txn.execute(sql, [report_id])
+ row = txn.fetchone()
+
+ if not row:
+ return None
+
+ event_report = {
+ "id": row[0],
+ "received_ts": row[1],
+ "room_id": row[2],
+ "event_id": row[3],
+ "user_id": row[4],
+ "score": db_to_json(row[5]).get("score"),
+ "reason": db_to_json(row[5]).get("reason"),
+ "sender": row[6],
+ "canonical_alias": row[7],
+ "name": row[8],
+ "event_json": db_to_json(row[9]),
+ }
+
+ return event_report
+
+ return await self.db_pool.runInteraction(
+ "get_event_report", _get_event_report_txn, report_id
+ )
+
async def get_event_reports_paginate(
self,
start: int,
@@ -1468,18 +1527,15 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
er.room_id,
er.event_id,
er.user_id,
- er.reason,
er.content,
events.sender,
- room_aliases.room_alias,
- event_json.json AS event_json
+ room_stats_state.canonical_alias,
+ room_stats_state.name
FROM event_reports AS er
- LEFT JOIN room_aliases
- ON room_aliases.room_id = er.room_id
- JOIN events
+ LEFT JOIN events
ON events.event_id = er.event_id
- JOIN event_json
- ON event_json.event_id = er.event_id
+ JOIN room_stats_state
+ ON room_stats_state.room_id = er.room_id
{where_clause}
ORDER BY er.received_ts {order}
LIMIT ?
@@ -1490,15 +1546,29 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
args += [limit, start]
txn.execute(sql, args)
- event_reports = self.db_pool.cursor_to_dict(txn)
-
- if count > 0:
- for row in event_reports:
- try:
- row["content"] = db_to_json(row["content"])
- row["event_json"] = db_to_json(row["event_json"])
- except Exception:
- continue
+
+ event_reports = []
+ for row in txn:
+ try:
+ s = db_to_json(row[5]).get("score")
+ r = db_to_json(row[5]).get("reason")
+ except Exception:
+ logger.error("Unable to parse json from event_reports: %s", row[0])
+ continue
+ event_reports.append(
+ {
+ "id": row[0],
+ "received_ts": row[1],
+ "room_id": row[2],
+ "event_id": row[3],
+ "user_id": row[4],
+ "score": s,
+ "reason": r,
+ "sender": row[6],
+ "canonical_alias": row[7],
+ "name": row[8],
+ }
+ )
return event_reports, count
diff --git a/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql b/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql
new file mode 100644
index 00000000..00a9431a
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Whether the access token is an admin token for controlling another user.
+ALTER TABLE access_tokens ADD COLUMN puppets_user_id TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql
new file mode 100644
index 00000000..a2842687
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql
@@ -0,0 +1,2 @@
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('users_have_local_media', '{}');
\ No newline at end of file
diff --git a/synapse/storage/databases/main/schema/delta/58/23e2e_cross_signing_keys_idx.sql b/synapse/storage/databases/main/schema/delta/58/23e2e_cross_signing_keys_idx.sql
new file mode 100644
index 00000000..61c558db
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/23e2e_cross_signing_keys_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('e2e_cross_signing_keys_idx', '{}');
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 5beb302b..0cdb3ec1 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -16,15 +16,18 @@
import logging
from collections import Counter
+from enum import Enum
from itertools import chain
from typing import Any, Dict, List, Optional, Tuple
from twisted.internet.defer import DeferredLock
from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import StoreError
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.engines import PostgresEngine
+from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
@@ -59,6 +62,23 @@ TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user
TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
+class UserSortOrder(Enum):
+ """
+ Enum to define the sorting method used when returning users
+ with get_users_media_usage_paginate
+
+ MEDIA_LENGTH = ordered by size of uploaded media. Smallest to largest.
+ MEDIA_COUNT = ordered by number of uploaded media. Smallest to largest.
+ USER_ID = ordered alphabetically by `user_id`.
+ DISPLAYNAME = ordered alphabetically by `displayname`.
+ """
+
+ MEDIA_LENGTH = "media_length"
+ MEDIA_COUNT = "media_count"
+ USER_ID = "user_id"
+ DISPLAYNAME = "displayname"
+
+
class StatsStore(StateDeltasStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
@@ -882,3 +902,110 @@ class StatsStore(StateDeltasStore):
complete_with_stream_id=pos,
absolute_field_overrides={"joined_rooms": joined_rooms},
)
+
+ async def get_users_media_usage_paginate(
+ self,
+ start: int,
+ limit: int,
+ from_ts: Optional[int] = None,
+ until_ts: Optional[int] = None,
+ order_by: Optional[UserSortOrder] = UserSortOrder.USER_ID.value,
+ direction: Optional[str] = "f",
+ search_term: Optional[str] = None,
+ ) -> Tuple[List[JsonDict], Dict[str, int]]:
+ """Function to retrieve a paginated list of users and their uploaded local media
+ (size and number). This will return a json list of users and the
+ total number of users matching the filter criteria.
+
+ Args:
+ start: offset to begin the query from
+ limit: number of rows to retrieve
+ from_ts: request only media that are created later than this timestamp (ms)
+ until_ts: request only media that are created earlier than this timestamp (ms)
+ order_by: the sort order of the returned list
+ direction: sort ascending or descending
+ search_term: a string to filter user names by
+ Returns:
+ A list of user dicts and an integer representing the total number of
+ users that exist given this query
+ """
+
+ def get_users_media_usage_paginate_txn(txn):
+ filters = []
+ args = [self.hs.config.server_name]
+
+ if search_term:
+ filters.append("(lmr.user_id LIKE ? OR displayname LIKE ?)")
+ args.extend(["@%" + search_term + "%:%", "%" + search_term + "%"])
+
+ if from_ts:
+ filters.append("created_ts >= ?")
+ args.extend([from_ts])
+ if until_ts:
+ filters.append("created_ts <= ?")
+ args.extend([until_ts])
+
+ # Set ordering
+ if UserSortOrder(order_by) == UserSortOrder.MEDIA_LENGTH:
+ order_by_column = "media_length"
+ elif UserSortOrder(order_by) == UserSortOrder.MEDIA_COUNT:
+ order_by_column = "media_count"
+ elif UserSortOrder(order_by) == UserSortOrder.USER_ID:
+ order_by_column = "lmr.user_id"
+ elif UserSortOrder(order_by) == UserSortOrder.DISPLAYNAME:
+ order_by_column = "displayname"
+ else:
+ raise StoreError(
+ 500, "Incorrect value for order_by provided: %s" % order_by
+ )
+
+ if direction == "b":
+ order = "DESC"
+ else:
+ order = "ASC"
+
+ where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""
+
+ sql_base = """
+ FROM local_media_repository as lmr
+ LEFT JOIN profiles AS p ON lmr.user_id = '@' || p.user_id || ':' || ?
+ {}
+ GROUP BY lmr.user_id, displayname
+ """.format(
+ where_clause
+ )
+
+ # SQLite does not support SELECT COUNT(*) OVER()
+ sql = """
+ SELECT COUNT(*) FROM (
+ SELECT lmr.user_id
+ {sql_base}
+ ) AS count_user_ids
+ """.format(
+ sql_base=sql_base,
+ )
+ txn.execute(sql, args)
+ count = txn.fetchone()[0]
+
+ sql = """
+ SELECT
+ lmr.user_id,
+ displayname,
+ COUNT(lmr.user_id) as media_count,
+ SUM(media_length) as media_length
+ {sql_base}
+ ORDER BY {order_by_column} {order}
+ LIMIT ? OFFSET ?
+ """.format(
+ sql_base=sql_base, order_by_column=order_by_column, order=order,
+ )
+
+ args += [limit, start]
+ txn.execute(sql, args)
+ users = self.db_pool.cursor_to_dict(txn)
+
+ return users, count
+
+ return await self.db_pool.runInteraction(
+ "get_users_media_usage_paginate_txn", get_users_media_usage_paginate_txn
+ )
diff --git a/synapse/types.py b/synapse/types.py
index 5bde67cc..66bb5bac 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -29,6 +29,7 @@ from typing import (
Tuple,
Type,
TypeVar,
+ Union,
)
import attr
@@ -38,6 +39,7 @@ from unpaddedbase64 import decode_base64
from synapse.api.errors import Codes, SynapseError
if TYPE_CHECKING:
+ from synapse.appservice.api import ApplicationService
from synapse.storage.databases.main import DataStore
# define a version of typing.Collection that works on python 3.5
@@ -74,6 +76,7 @@ class Requester(
"shadow_banned",
"device_id",
"app_service",
+ "authenticated_entity",
],
)
):
@@ -104,6 +107,7 @@ class Requester(
"shadow_banned": self.shadow_banned,
"device_id": self.device_id,
"app_server_id": self.app_service.id if self.app_service else None,
+ "authenticated_entity": self.authenticated_entity,
}
@staticmethod
@@ -129,16 +133,18 @@ class Requester(
shadow_banned=input["shadow_banned"],
device_id=input["device_id"],
app_service=appservice,
+ authenticated_entity=input["authenticated_entity"],
)
def create_requester(
- user_id,
- access_token_id=None,
- is_guest=False,
- shadow_banned=False,
- device_id=None,
- app_service=None,
+ user_id: Union[str, "UserID"],
+ access_token_id: Optional[int] = None,
+ is_guest: Optional[bool] = False,
+ shadow_banned: Optional[bool] = False,
+ device_id: Optional[str] = None,
+ app_service: Optional["ApplicationService"] = None,
+ authenticated_entity: Optional[str] = None,
):
"""
Create a new ``Requester`` object
@@ -151,14 +157,27 @@ def create_requester(
shadow_banned (bool): True if the user making this request is shadow-banned.
device_id (str|None): device_id which was set at authentication time
app_service (ApplicationService|None): the AS requesting on behalf of the user
+ authenticated_entity: The entity that authenticated when making the request.
+ This is different to the user_id when an admin user or the server is
+ "puppeting" the user.
Returns:
Requester
"""
if not isinstance(user_id, UserID):
user_id = UserID.from_string(user_id)
+
+ if authenticated_entity is None:
+ authenticated_entity = user_id.to_string()
+
return Requester(
- user_id, access_token_id, is_guest, shadow_banned, device_id, app_service
+ user_id,
+ access_token_id,
+ is_guest,
+ shadow_banned,
+ device_id,
+ app_service,
+ authenticated_entity,
)
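The net effect of the create_requester() change: the entity that actually authenticated is now carried on the Requester, and it only differs from user_id when an admin user or the server is "puppeting" someone else. A small hedged sketch (the Matrix IDs below are invented):

    # Hypothetical usage of the updated create_requester(); the IDs are made up.
    from synapse.types import create_requester

    # Ordinary request: authenticated_entity defaults to the user themselves.
    requester = create_requester("@alice:example.com")
    assert requester.authenticated_entity == "@alice:example.com"

    # Puppeting: the target user becomes requester.user, while the entity that
    # actually presented the credentials is recorded separately.
    puppeted = create_requester(
        "@target-user:example.com",
        authenticated_entity="@admin:example.com",
    )
    assert puppeted.user.to_string() == "@target-user:example.com"
    assert puppeted.authenticated_entity == "@admin:example.com"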
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 5d7fffee..a924140c 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -13,10 +13,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import enum
import functools
import inspect
import logging
-from typing import Any, Callable, Generic, Optional, Tuple, TypeVar, Union, cast
+from typing import (
+ Any,
+ Callable,
+ Generic,
+ Iterable,
+ Mapping,
+ Optional,
+ Sequence,
+ Tuple,
+ TypeVar,
+ Union,
+ cast,
+)
from weakref import WeakValueDictionary
from twisted.internet import defer
@@ -24,6 +37,7 @@ from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.util import unwrapFirstError
from synapse.util.caches.deferred_cache import DeferredCache
+from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
@@ -48,7 +62,7 @@ class _CachedFunction(Generic[F]):
class _CacheDescriptorBase:
- def __init__(self, orig: _CachedFunction, num_args, cache_context=False):
+ def __init__(self, orig: Callable[..., Any], num_args, cache_context=False):
self.orig = orig
arg_spec = inspect.getfullargspec(orig)
@@ -97,8 +111,107 @@ class _CacheDescriptorBase:
self.add_cache_context = cache_context
+ self.cache_key_builder = get_cache_key_builder(
+ self.arg_names, self.arg_defaults
+ )
+
+
+class _LruCachedFunction(Generic[F]):
+ cache = None # type: LruCache[CacheKey, Any]
+ __call__ = None # type: F
+
+
+def lru_cache(
+ max_entries: int = 1000, cache_context: bool = False,
+) -> Callable[[F], _LruCachedFunction[F]]:
+ """A method decorator that applies a memoizing cache around the function.
+
+ This is more-or-less a drop-in equivalent to functools.lru_cache, although note
+ that the signature is slightly different.
+
+ The main differences with functools.lru_cache are:
+ (a) the size of the cache can be controlled via the cache_factor mechanism
+ (b) the wrapped function can request a "cache_context" which provides a
+ callback mechanism to indicate that the result is no longer valid
+ (c) prometheus metrics are exposed automatically.
+
+ The function should take zero or more arguments, which are used as the key for the
+ cache. Single-argument functions use that argument as the cache key; otherwise the
+ arguments are built into a tuple.
+
+ Cached functions can be "chained" (i.e. a cached function can call other cached
+ functions and get appropriately invalidated when the caches they call are
+ invalidated) by adding a special "cache_context" argument to the function
+ and passing that as a kwarg to all caches called. For example:
+
+ @lru_cache(cache_context=True)
+ def foo(self, key, cache_context):
+ r1 = self.bar1(key, on_invalidate=cache_context.invalidate)
+ r2 = self.bar2(key, on_invalidate=cache_context.invalidate)
+ return r1 + r2
+
+ The wrapped function also has a 'cache' property which offers direct access to the
+ underlying LruCache.
+ """
+
+ def func(orig: F) -> _LruCachedFunction[F]:
+ desc = LruCacheDescriptor(
+ orig, max_entries=max_entries, cache_context=cache_context,
+ )
+ return cast(_LruCachedFunction[F], desc)
+
+ return func
+
+
+class LruCacheDescriptor(_CacheDescriptorBase):
+ """Helper for @lru_cache"""
+
+ class _Sentinel(enum.Enum):
+ sentinel = object()
+
+ def __init__(
+ self, orig, max_entries: int = 1000, cache_context: bool = False,
+ ):
+ super().__init__(orig, num_args=None, cache_context=cache_context)
+ self.max_entries = max_entries
+
+ def __get__(self, obj, owner):
+ cache = LruCache(
+ cache_name=self.orig.__name__, max_size=self.max_entries,
+ ) # type: LruCache[CacheKey, Any]
+
+ get_cache_key = self.cache_key_builder
+ sentinel = LruCacheDescriptor._Sentinel.sentinel
+
+ @functools.wraps(self.orig)
+ def _wrapped(*args, **kwargs):
+ invalidate_callback = kwargs.pop("on_invalidate", None)
+ callbacks = (invalidate_callback,) if invalidate_callback else ()
+
+ cache_key = get_cache_key(args, kwargs)
-class CacheDescriptor(_CacheDescriptorBase):
+ ret = cache.get(cache_key, default=sentinel, callbacks=callbacks)
+ if ret != sentinel:
+ return ret
+
+ # Add our own `cache_context` to argument list if the wrapped function
+ # has asked for one
+ if self.add_cache_context:
+ kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key)
+
+ ret2 = self.orig(obj, *args, **kwargs)
+ cache.set(cache_key, ret2, callbacks=callbacks)
+
+ return ret2
+
+ wrapped = cast(_CachedFunction, _wrapped)
+ wrapped.cache = cache
+ obj.__dict__[self.orig.__name__] = wrapped
+
+ return wrapped
+
+
+class DeferredCacheDescriptor(_CacheDescriptorBase):
""" A method decorator that applies a memoizing cache around the function.
This caches deferreds, rather than the results themselves. Deferreds that
@@ -141,7 +254,6 @@ class CacheDescriptor(_CacheDescriptorBase):
cache_context=False,
iterable=False,
):
-
super().__init__(orig, num_args=num_args, cache_context=cache_context)
self.max_entries = max_entries
@@ -157,41 +269,7 @@ class CacheDescriptor(_CacheDescriptorBase):
iterable=self.iterable,
) # type: DeferredCache[CacheKey, Any]
- def get_cache_key_gen(args, kwargs):
- """Given some args/kwargs return a generator that resolves into
- the cache_key.
-
- We loop through each arg name, looking up if its in the `kwargs`,
- otherwise using the next argument in `args`. If there are no more
- args then we try looking the arg name up in the defaults
- """
- pos = 0
- for nm in self.arg_names:
- if nm in kwargs:
- yield kwargs[nm]
- elif pos < len(args):
- yield args[pos]
- pos += 1
- else:
- yield self.arg_defaults[nm]
-
- # By default our cache key is a tuple, but if there is only one item
- # then don't bother wrapping in a tuple. This is to save memory.
- if self.num_args == 1:
- nm = self.arg_names[0]
-
- def get_cache_key(args, kwargs):
- if nm in kwargs:
- return kwargs[nm]
- elif len(args):
- return args[0]
- else:
- return self.arg_defaults[nm]
-
- else:
-
- def get_cache_key(args, kwargs):
- return tuple(get_cache_key_gen(args, kwargs))
+ get_cache_key = self.cache_key_builder
@functools.wraps(self.orig)
def _wrapped(*args, **kwargs):
@@ -223,7 +301,6 @@ class CacheDescriptor(_CacheDescriptorBase):
wrapped.prefill = lambda key, val: cache.prefill(key[0], val)
else:
wrapped.invalidate = cache.invalidate
- wrapped.invalidate_all = cache.invalidate_all
wrapped.invalidate_many = cache.invalidate_many
wrapped.prefill = cache.prefill
@@ -236,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
return wrapped
-class CacheListDescriptor(_CacheDescriptorBase):
+class DeferredCacheListDescriptor(_CacheDescriptorBase):
"""Wraps an existing cache to support bulk fetching of keys.
Given a list of keys it looks in the cache to find any hits, then passes
@@ -382,11 +459,13 @@ class _CacheContext:
on a lower level.
"""
+ Cache = Union[DeferredCache, LruCache]
+
_cache_context_objects = (
WeakValueDictionary()
- ) # type: WeakValueDictionary[Tuple[DeferredCache, CacheKey], _CacheContext]
+ ) # type: WeakValueDictionary[Tuple[_CacheContext.Cache, CacheKey], _CacheContext]
- def __init__(self, cache, cache_key): # type: (DeferredCache, CacheKey) -> None
+ def __init__(self, cache: "_CacheContext.Cache", cache_key: CacheKey) -> None:
self._cache = cache
self._cache_key = cache_key
@@ -396,8 +475,8 @@ class _CacheContext:
@classmethod
def get_instance(
- cls, cache, cache_key
- ): # type: (DeferredCache, CacheKey) -> _CacheContext
+ cls, cache: "_CacheContext.Cache", cache_key: CacheKey
+ ) -> "_CacheContext":
"""Returns an instance constructed with the given arguments.
A new instance is only created if none already exists.
@@ -418,7 +497,7 @@ def cached(
cache_context: bool = False,
iterable: bool = False,
) -> Callable[[F], _CachedFunction[F]]:
- func = lambda orig: CacheDescriptor(
+ func = lambda orig: DeferredCacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
@@ -460,7 +539,7 @@ def cachedList(
def batch_do_something(self, first_arg, second_args):
...
"""
- func = lambda orig: CacheListDescriptor(
+ func = lambda orig: DeferredCacheListDescriptor(
orig,
cached_method_name=cached_method_name,
list_name=list_name,
@@ -468,3 +547,65 @@ def cachedList(
)
return cast(Callable[[F], _CachedFunction[F]], func)
+
+
+def get_cache_key_builder(
+ param_names: Sequence[str], param_defaults: Mapping[str, Any]
+) -> Callable[[Sequence[Any], Mapping[str, Any]], CacheKey]:
+ """Construct a function which will build cache keys suitable for a cached function
+
+ Args:
+ param_names: list of formal parameter names for the cached function
+ param_defaults: a mapping from parameter name to default value for that param
+
+ Returns:
+ A function which will take an (args, kwargs) pair and return a cache key
+ """
+
+ # By default our cache key is a tuple, but if there is only one item
+ # then don't bother wrapping in a tuple. This is to save memory.
+
+ if len(param_names) == 1:
+ nm = param_names[0]
+
+ def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
+ if nm in kwargs:
+ return kwargs[nm]
+ elif len(args):
+ return args[0]
+ else:
+ return param_defaults[nm]
+
+ else:
+
+ def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
+ return tuple(_get_cache_key_gen(param_names, param_defaults, args, kwargs))
+
+ return get_cache_key
+
+
+def _get_cache_key_gen(
+ param_names: Iterable[str],
+ param_defaults: Mapping[str, Any],
+ args: Sequence[Any],
+ kwargs: Mapping[str, Any],
+) -> Iterable[Any]:
+ """Given some args/kwargs return a generator that resolves into
+ the cache_key.
+
+ This is essentially the same operation as `inspect.getcallargs`, but optimised so
+ that we don't need to inspect the target function for each call.
+ """
+
+ # We loop through each arg name, checking whether it is in `kwargs`,
+ # otherwise using the next argument in `args`. If there are no more
+ # args then we try looking the arg name up in the defaults.
+ pos = 0
+ for nm in param_names:
+ if nm in kwargs:
+ yield kwargs[nm]
+ elif pos < len(args):
+ yield args[pos]
+ pos += 1
+ else:
+ yield param_defaults[nm]
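To make the new pieces concrete, here is a hedged sketch of the @lru_cache decorator together with the shared key-building behaviour: a single named argument is used as the cache key directly, while several are packed into a tuple, as get_cache_key_builder describes. The store class and method names below are invented for illustration.

    # Illustrative only: the class and methods are made up, but the decorator
    # behaviour follows the code added above.
    from synapse.util.caches.descriptors import lru_cache


    class ExampleStore:
        @lru_cache(max_entries=100)
        def combine(self, left: str, right: str) -> str:
            # Two named arguments, so this entry is keyed on the tuple (left, right).
            return left + right

        @lru_cache(cache_context=True)
        def shout(self, key: str, cache_context) -> str:
            # Single argument: keyed on the bare value of `key`, not a 1-tuple.
            # Passing cache_context.invalidate chains the caches, so evicting
            # combine()'s entry also invalidates this one.
            return self.combine(key, "!", on_invalidate=cache_context.invalidate).upper()


    store = ExampleStore()
    assert store.shout("a") == "A!"   # miss: computes and populates both caches
    assert store.shout("a") == "A!"   # hit: served straight from the LruCache

Unlike @cached, the wrapped methods here are synchronous and return plain values rather than Deferreds, which is the point of the new descriptor.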
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index a5cc9d05..4ab379e4 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -110,7 +110,7 @@ async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **k
failure_ts,
retry_interval,
backoff_on_failure=backoff_on_failure,
- **kwargs
+ **kwargs,
)