summaryrefslogtreecommitdiff
path: root/synapse/storage
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2021-05-11 16:41:38 +0200
committerAndrej Shadura <andrewsh@debian.org>2021-05-11 16:41:38 +0200
commit8b13141dd36d5784fe820afa06aa21a0c5735155 (patch)
tree12ac731328b325992ab80b1c8cecf4ed79f7935b /synapse/storage
parent55e3da6770137d9858028903525a99764b39e8b3 (diff)
New upstream version 1.33.2
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py1
-rw-r--r--synapse/storage/_base.py7
-rw-r--r--synapse/storage/background_updates.py1
-rw-r--r--synapse/storage/database.py109
-rw-r--r--synapse/storage/databases/__init__.py1
-rw-r--r--synapse/storage/databases/main/__init__.py74
-rw-r--r--synapse/storage/databases/main/account_data.py1
-rw-r--r--synapse/storage/databases/main/appservice.py1
-rw-r--r--synapse/storage/databases/main/cache.py1
-rw-r--r--synapse/storage/databases/main/censor_events.py1
-rw-r--r--synapse/storage/databases/main/client_ips.py2
-rw-r--r--synapse/storage/databases/main/deviceinbox.py1
-rw-r--r--synapse/storage/databases/main/devices.py26
-rw-r--r--synapse/storage/databases/main/directory.py1
-rw-r--r--synapse/storage/databases/main/e2e_room_keys.py1
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py1
-rw-r--r--synapse/storage/databases/main/event_federation.py4
-rw-r--r--synapse/storage/databases/main/event_push_actions.py1
-rw-r--r--synapse/storage/databases/main/events.py72
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py2
-rw-r--r--synapse/storage/databases/main/events_forward_extremities.py1
-rw-r--r--synapse/storage/databases/main/events_worker.py16
-rw-r--r--synapse/storage/databases/main/filtering.py1
-rw-r--r--synapse/storage/databases/main/group_server.py9
-rw-r--r--synapse/storage/databases/main/keys.py1
-rw-r--r--synapse/storage/databases/main/media_repository.py22
-rw-r--r--synapse/storage/databases/main/metrics.py1
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py1
-rw-r--r--synapse/storage/databases/main/presence.py93
-rw-r--r--synapse/storage/databases/main/profile.py1
-rw-r--r--synapse/storage/databases/main/purge_events.py1
-rw-r--r--synapse/storage/databases/main/push_rule.py1
-rw-r--r--synapse/storage/databases/main/pusher.py1
-rw-r--r--synapse/storage/databases/main/receipts.py1
-rw-r--r--synapse/storage/databases/main/registration.py63
-rw-r--r--synapse/storage/databases/main/rejections.py1
-rw-r--r--synapse/storage/databases/main/relations.py1
-rw-r--r--synapse/storage/databases/main/room.py65
-rw-r--r--synapse/storage/databases/main/roommember.py201
-rw-r--r--synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py1
-rw-r--r--synapse/storage/databases/main/schema/delta/57/local_current_membership.py1
-rw-r--r--synapse/storage/databases/main/schema/delta/59/11drop_thumbnail_constraint.sql.postgres22
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres20
-rw-r--r--synapse/storage/databases/main/search.py4
-rw-r--r--synapse/storage/databases/main/signatures.py1
-rw-r--r--synapse/storage/databases/main/state.py7
-rw-r--r--synapse/storage/databases/main/state_deltas.py1
-rw-r--r--synapse/storage/databases/main/stats.py26
-rw-r--r--synapse/storage/databases/main/stream.py5
-rw-r--r--synapse/storage/databases/main/tags.py1
-rw-r--r--synapse/storage/databases/main/transactions.py1
-rw-r--r--synapse/storage/databases/main/ui_auth.py1
-rw-r--r--synapse/storage/databases/main/user_directory.py1
-rw-r--r--synapse/storage/databases/main/user_erasure_store.py1
-rw-r--r--synapse/storage/databases/state/__init__.py1
-rw-r--r--synapse/storage/databases/state/bg_updates.py6
-rw-r--r--synapse/storage/databases/state/store.py6
-rw-r--r--synapse/storage/engines/__init__.py1
-rw-r--r--synapse/storage/engines/_base.py9
-rw-r--r--synapse/storage/engines/postgres.py12
-rw-r--r--synapse/storage/engines/sqlite.py16
-rw-r--r--synapse/storage/keys.py1
-rw-r--r--synapse/storage/persist_events.py4
-rw-r--r--synapse/storage/prepare_database.py15
-rw-r--r--synapse/storage/purge_events.py1
-rw-r--r--synapse/storage/push_rule.py1
-rw-r--r--synapse/storage/relations.py1
-rw-r--r--synapse/storage/roommember.py1
-rw-r--r--synapse/storage/state.py27
-rw-r--r--synapse/storage/types.py1
-rw-r--r--synapse/storage/util/__init__.py1
-rw-r--r--synapse/storage/util/id_generators.py12
-rw-r--r--synapse/storage/util/sequence.py1
75 files changed, 626 insertions, 409 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0b9007e5..105e4e1f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018,2019 New Vector Ltd
#
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 24090532..6b68d872 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017-2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
@@ -17,13 +16,13 @@
import logging
import random
from abc import ABCMeta
-from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
+from typing import TYPE_CHECKING, Any, Collection, Iterable, Optional, Union
from synapse.storage.database import LoggingTransaction # noqa: F401
from synapse.storage.database import make_in_list_sql_clause # noqa: F401
from synapse.storage.database import DatabasePool
from synapse.storage.types import Connection
-from synapse.types import Collection, StreamToken, get_domain_from_id
+from synapse.types import StreamToken, get_domain_from_id
from synapse.util import json_decoder
if TYPE_CHECKING:
@@ -115,7 +114,7 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
db_content = db_content.tobytes()
# Decode it to a Unicode string before feeding it to the JSON decoder, since
- # Python 3.5 does not support deserializing bytes.
+ # it only supports handling strings
if isinstance(db_content, (bytes, bytearray)):
db_content = db_content.decode("utf8")
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index ccb06aab..142787fd 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 94590e7b..bd39c095 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017-2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
@@ -21,6 +20,7 @@ from time import monotonic as monotonic_time
from typing import (
Any,
Callable,
+ Collection,
Dict,
Iterable,
Iterator,
@@ -49,7 +49,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Connection, Cursor
-from synapse.types import Collection
# python 3 does not have a maximum int value
MAX_TXN_ID = 2 ** 63 - 1
@@ -172,10 +171,7 @@ class LoggingDatabaseConnection:
# The type of entry which goes on our after_callbacks and exception_callbacks lists.
-#
-# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so
-# that mypy sees the type but the runtime python doesn't.
-_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]]
+_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]]
R = TypeVar("R")
@@ -222,7 +218,7 @@ class LoggingTransaction:
self.after_callbacks = after_callbacks
self.exception_callbacks = exception_callbacks
- def call_after(self, callback: "Callable[..., None]", *args: Any, **kwargs: Any):
+ def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any):
"""Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the
correct thread.
@@ -234,7 +230,7 @@ class LoggingTransaction:
self.after_callbacks.append((callback, args, kwargs))
def call_on_exception(
- self, callback: "Callable[..., None]", *args: Any, **kwargs: Any
+ self, callback: Callable[..., None], *args: Any, **kwargs: Any
):
# if self.exception_callbacks is None, that means that whatever constructed the
# LoggingTransaction isn't expecting there to be any callbacks; assert that
@@ -486,9 +482,9 @@ class DatabasePool:
desc: str,
after_callbacks: List[_CallbackListEntry],
exception_callbacks: List[_CallbackListEntry],
- func: "Callable[..., R]",
+ func: Callable[..., R],
*args: Any,
- **kwargs: Any
+ **kwargs: Any,
) -> R:
"""Start a new database transaction with the given connection.
@@ -619,10 +615,10 @@ class DatabasePool:
async def runInteraction(
self,
desc: str,
- func: "Callable[..., R]",
+ func: Callable[..., R],
*args: Any,
db_autocommit: bool = False,
- **kwargs: Any
+ **kwargs: Any,
) -> R:
"""Starts a transaction on the database and runs a given function
@@ -679,10 +675,10 @@ class DatabasePool:
async def runWithConnection(
self,
- func: "Callable[..., R]",
+ func: Callable[..., R],
*args: Any,
db_autocommit: bool = False,
- **kwargs: Any
+ **kwargs: Any,
) -> R:
"""Wraps the .runWithConnection() method on the underlying db_pool.
@@ -775,7 +771,7 @@ class DatabasePool:
desc: str,
decoder: Optional[Callable[[Cursor], R]],
query: str,
- *args: Any
+ *args: Any,
) -> R:
"""Runs a single query for a result set.
@@ -900,7 +896,7 @@ class DatabasePool:
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
- insertion_values: Dict[str, Any] = {},
+ insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert",
lock: bool = True,
) -> Optional[bool]:
@@ -927,6 +923,8 @@ class DatabasePool:
Native upserts always return None. Emulated upserts return True if a
new entry was created, False if an existing one was updated.
"""
+ insertion_values = insertion_values or {}
+
attempts = 0
while True:
try:
@@ -964,7 +962,7 @@ class DatabasePool:
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
- insertion_values: Dict[str, Any] = {},
+ insertion_values: Optional[Dict[str, Any]] = None,
lock: bool = True,
) -> Optional[bool]:
"""
@@ -982,6 +980,8 @@ class DatabasePool:
Native upserts always return None. Emulated upserts return True if a
new entry was created, False if an existing one was updated.
"""
+ insertion_values = insertion_values or {}
+
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
@@ -1003,7 +1003,7 @@ class DatabasePool:
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
- insertion_values: Dict[str, Any] = {},
+ insertion_values: Optional[Dict[str, Any]] = None,
lock: bool = True,
) -> bool:
"""
@@ -1017,6 +1017,8 @@ class DatabasePool:
Returns True if a new entry was created, False if an existing
one was updated.
"""
+ insertion_values = insertion_values or {}
+
# We need to lock the table :(, unless we're *really* careful
if lock:
self.engine.lock_table(txn, table)
@@ -1077,7 +1079,7 @@ class DatabasePool:
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
- insertion_values: Dict[str, Any] = {},
+ insertion_values: Optional[Dict[str, Any]] = None,
) -> None:
"""
Use the native UPSERT functionality in recent PostgreSQL versions.
@@ -1090,7 +1092,7 @@ class DatabasePool:
"""
allvalues = {} # type: Dict[str, Any]
allvalues.update(keyvalues)
- allvalues.update(insertion_values)
+ allvalues.update(insertion_values or {})
if not values:
latter = "NOTHING"
@@ -1513,7 +1515,7 @@ class DatabasePool:
column: str,
iterable: Iterable[Any],
retcols: Iterable[str],
- keyvalues: Dict[str, Any] = {},
+ keyvalues: Optional[Dict[str, Any]] = None,
desc: str = "simple_select_many_batch",
batch_size: int = 100,
) -> List[Any]:
@@ -1531,6 +1533,8 @@ class DatabasePool:
desc: description of the transaction, for logging and metrics
batch_size: the number of rows for each select query
"""
+ keyvalues = keyvalues or {}
+
results = [] # type: List[Dict[str, Any]]
if not iterable:
@@ -2059,69 +2063,18 @@ def make_in_list_sql_clause(
KV = TypeVar("KV")
-def make_tuple_comparison_clause(
- database_engine: BaseDatabaseEngine, keys: List[Tuple[str, KV]]
-) -> Tuple[str, List[KV]]:
+def make_tuple_comparison_clause(keys: List[Tuple[str, KV]]) -> Tuple[str, List[KV]]:
"""Returns a tuple comparison SQL clause
- Depending what the SQL engine supports, builds a SQL clause that looks like either
- "(a, b) > (?, ?)", or "(a > ?) OR (a == ? AND b > ?)".
+ Builds a SQL clause that looks like "(a, b) > (?, ?)"
Args:
- database_engine
keys: A set of (column, value) pairs to be compared.
Returns:
A tuple of SQL query and the args
"""
- if database_engine.supports_tuple_comparison:
- return (
- "(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)),
- [k[1] for k in keys],
- )
-
- # we want to build a clause
- # (a > ?) OR
- # (a == ? AND b > ?) OR
- # (a == ? AND b == ? AND c > ?)
- # ...
- # (a == ? AND b == ? AND ... AND z > ?)
- #
- # or, equivalently:
- #
- # (a > ? OR (a == ? AND
- # (b > ? OR (b == ? AND
- # ...
- # (y > ? OR (y == ? AND
- # z > ?
- # ))
- # ...
- # ))
- # ))
- #
- # which itself is equivalent to (and apparently easier for the query optimiser):
- #
- # (a >= ? AND (a > ? OR
- # (b >= ? AND (b > ? OR
- # ...
- # (y >= ? AND (y > ? OR
- # z > ?
- # ))
- # ...
- # ))
- # ))
- #
- #
-
- clause = ""
- args = [] # type: List[KV]
- for k, v in keys[:-1]:
- clause = clause + "(%s >= ? AND (%s > ? OR " % (k, k)
- args.extend([v, v])
-
- (k, v) = keys[-1]
- clause += "%s > ?" % (k,)
- args.append(v)
-
- clause += "))" * (len(keys) - 1)
- return clause, args
+ return (
+ "(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)),
+ [k[1] for k in keys],
+ )
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 379c78bb..20b75505 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 1d44c3aa..49c7606d 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
@@ -18,9 +17,9 @@
import logging
from typing import List, Optional, Tuple
-from synapse.api.constants import PresenceState
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import DatabasePool
+from synapse.storage.databases.main.stats import UserSortOrder
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
IdGenerator,
@@ -51,7 +50,7 @@ from .media_repository import MediaRepositoryStore
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
from .openid import OpenIdStore
-from .presence import PresenceStore, UserPresenceState
+from .presence import PresenceStore
from .profile import ProfileStore
from .purge_events import PurgeEventsStore
from .push_rule import PushRuleStore
@@ -126,9 +125,6 @@ class DataStore(
self._clock = hs.get_clock()
self.database_engine = database.engine
- self._presence_id_gen = StreamIdGenerator(
- db_conn, "presence_stream", "stream_id"
- )
self._public_room_id_gen = StreamIdGenerator(
db_conn, "public_room_list_stream", "stream_id"
)
@@ -177,21 +173,6 @@ class DataStore(
super().__init__(database, db_conn, hs)
- self._presence_on_startup = self._get_active_presence(db_conn)
-
- presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
- db_conn,
- "presence_stream",
- entity_column="user_id",
- stream_column="stream_id",
- max_value=self._presence_id_gen.get_current_token(),
- )
- self.presence_stream_cache = StreamChangeCache(
- "PresenceStreamChangeCache",
- min_presence_val,
- prefilled_cache=presence_cache_prefill,
- )
-
device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
"DeviceListStreamChangeCache", device_list_max
@@ -238,32 +219,6 @@ class DataStore(
def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token()
- def take_presence_startup_info(self):
- active_on_startup = self._presence_on_startup
- self._presence_on_startup = None
- return active_on_startup
-
- def _get_active_presence(self, db_conn):
- """Fetch non-offline presence from the database so that we can register
- the appropriate time outs.
- """
-
- sql = (
- "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
- " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
- " WHERE state != ?"
- )
-
- txn = db_conn.cursor()
- txn.execute(sql, (PresenceState.OFFLINE,))
- rows = self.db_pool.cursor_to_dict(txn)
- txn.close()
-
- for row in rows:
- row["currently_active"] = bool(row["currently_active"])
-
- return [UserPresenceState(**row) for row in rows]
-
async def get_users(self) -> List[JsonDict]:
"""Function to retrieve a list of users in users table.
@@ -292,6 +247,8 @@ class DataStore(
name: Optional[str] = None,
guests: bool = True,
deactivated: bool = False,
+ order_by: UserSortOrder = UserSortOrder.USER_ID.value,
+ direction: str = "f",
) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users and the
@@ -304,6 +261,8 @@ class DataStore(
name: search for local part of user_id or display name
guests: whether to in include guest users
deactivated: whether to include deactivated users
+ order_by: the sort order of the returned list
+ direction: sort ascending or descending
Returns:
A tuple of a list of mappings from user to information and a count of total users.
"""
@@ -312,6 +271,14 @@ class DataStore(
filters = []
args = [self.hs.config.server_name]
+ # Set ordering
+ order_by_column = UserSortOrder(order_by).value
+
+ if direction == "b":
+ order = "DESC"
+ else:
+ order = "ASC"
+
# `name` is in database already in lower case
if name:
filters.append("(name LIKE ? OR LOWER(displayname) LIKE ?)")
@@ -339,10 +306,15 @@ class DataStore(
txn.execute(sql, args)
count = txn.fetchone()[0]
- sql = (
- "SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url "
- + sql_base
- + " ORDER BY u.name LIMIT ? OFFSET ?"
+ sql = """
+ SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url
+ {sql_base}
+ ORDER BY {order_by_column} {order}, u.name ASC
+ LIMIT ? OFFSET ?
+ """.format(
+ sql_base=sql_base,
+ order_by_column=order_by_column,
+ order=order,
)
args += [limit, start]
txn.execute(sql, args)
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index a277a1ef..1d02795f 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 85bb853d..9f182c2a 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 1e7637a6..ecc1f935 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py
index 3e26d5ba..f22c1f24 100644
--- a/synapse/storage/databases/main/censor_events.py
+++ b/synapse/storage/databases/main/censor_events.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 6d18e692..d60010e9 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -298,7 +297,6 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
# times, which is fine.
where_clause, where_args = make_tuple_comparison_clause(
- self.database_engine,
[("user_id", last_user_id), ("device_id", last_device_id)],
)
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 691080ce..7c9d1f74 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index d327e9aa..c9346de3 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
# Copyright 2019,2020 The Matrix.org Foundation C.I.C.
@@ -16,7 +15,7 @@
# limitations under the License.
import abc
import logging
-from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
+from typing import Any, Collection, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api.errors import Codes, StoreError
from synapse.logging.opentracing import (
@@ -32,7 +31,7 @@ from synapse.storage.database import (
LoggingTransaction,
make_tuple_comparison_clause,
)
-from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key
+from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
@@ -718,7 +717,15 @@ class DeviceWorkerStore(SQLBaseStore):
keyvalues={"user_id": user_id},
values={},
insertion_values={"added_ts": self._clock.time_msec()},
- desc="make_remote_user_device_cache_as_stale",
+ desc="mark_remote_user_device_cache_as_stale",
+ )
+
+ async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None:
+ # Remove the database entry that says we need to resync devices, after a resync
+ await self.db_pool.simple_delete(
+ table="device_lists_remote_resync",
+ keyvalues={"user_id": user_id},
+ desc="mark_remote_user_device_cache_as_valid",
)
async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
@@ -985,7 +992,7 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
def _txn(txn):
clause, args = make_tuple_comparison_clause(
- self.db_pool.engine, [(x, last_row[x]) for x in KEY_COLS]
+ [(x, last_row[x]) for x in KEY_COLS]
)
sql = """
SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
@@ -1290,15 +1297,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
lock=False,
)
- # If we're replacing the remote user's device list cache presumably
- # we've done a full resync, so we remove the entry that says we need
- # to resync
- self.db_pool.simple_delete_txn(
- txn,
- table="device_lists_remote_resync",
- keyvalues={"user_id": user_id},
- )
-
async def add_device_change_to_streams(
self, user_id: str, device_ids: Collection[str], hosts: List[str]
):
diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py
index 267b9483..86075bc5 100644
--- a/synapse/storage/databases/main/directory.py
+++ b/synapse/storage/databases/main/directory.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index 12ceccee..b15fb71e 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
# Copyright 2019 Matrix.org Foundation C.I.C.
#
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index f1e7859d..88afe97c 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
# Copyright 2019,2020 The Matrix.org Foundation C.I.C.
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index a956be49..ff81d5cd 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,7 +14,7 @@
import itertools
import logging
from queue import Empty, PriorityQueue
-from typing import Dict, Iterable, List, Set, Tuple
+from typing import Collection, Dict, Iterable, List, Set, Tuple
from synapse.api.errors import StoreError
from synapse.events import EventBase
@@ -26,7 +25,6 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
-from synapse.types import Collection
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 78245ad5..58453221 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 98dac19a..fd25c811 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
@@ -171,7 +170,7 @@ class PersistEventsStore:
)
async with stream_ordering_manager as stream_orderings:
- for (event, context), stream in zip(events_and_contexts, stream_orderings):
+ for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
await self.db_pool.runInteraction(
@@ -298,7 +297,7 @@ class PersistEventsStore:
txn.execute(sql + clause, args)
to_recursively_check = []
- for event_id, prev_event_id, metadata, rejected in txn:
+ for _, prev_event_id, metadata, rejected in txn:
if prev_event_id in existing_prevs:
continue
@@ -320,8 +319,8 @@ class PersistEventsStore:
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
- state_delta_for_room: Dict[str, DeltaState] = {},
- new_forward_extremeties: Dict[str, List[str]] = {},
+ state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
+ new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
):
"""Insert some number of room events into the necessary database tables.
@@ -342,6 +341,9 @@ class PersistEventsStore:
extremities.
"""
+ state_delta_for_room = state_delta_for_room or {}
+ new_forward_extremeties = new_forward_extremeties or {}
+
all_events_and_contexts = events_and_contexts
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
@@ -1125,7 +1127,7 @@ class PersistEventsStore:
def _update_forward_extremities_txn(
self, txn, new_forward_extremities, max_stream_order
):
- for room_id, new_extrem in new_forward_extremities.items():
+ for room_id in new_forward_extremities.keys():
self.db_pool.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
@@ -1376,24 +1378,28 @@ class PersistEventsStore:
],
)
- for event, _ in events_and_contexts:
- if not event.internal_metadata.is_redacted():
- # If we're persisting an unredacted event we go and ensure
- # that we mark any redactions that reference this event as
- # requiring censoring.
- self.db_pool.simple_update_txn(
- txn,
- table="redactions",
- keyvalues={"redacts": event.event_id},
- updatevalues={"have_censored": False},
+ # If we're persisting an unredacted event we go and ensure
+ # that we mark any redactions that reference this event as
+ # requiring censoring.
+ sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?"
+ txn.execute_batch(
+ sql,
+ (
+ (
+ False,
+ event.event_id,
)
+ for event, _ in events_and_contexts
+ if not event.internal_metadata.is_redacted()
+ ),
+ )
state_events_and_contexts = [
ec for ec in events_and_contexts if ec[0].is_state()
]
state_values = []
- for event, context in state_events_and_contexts:
+ for event, _ in state_events_and_contexts:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
@@ -1462,7 +1468,7 @@ class PersistEventsStore:
# nothing to do here
return
- for event, context in events_and_contexts:
+ for event, _ in events_and_contexts:
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the
# redacted event.
@@ -1879,20 +1885,28 @@ class PersistEventsStore:
),
)
- for event, _ in events_and_contexts:
- user_ids = self.db_pool.simple_select_onecol_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={"event_id": event.event_id},
- retcol="user_id",
- )
+ room_to_event_ids = {} # type: Dict[str, List[str]]
+ for e, _ in events_and_contexts:
+ room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
- for uid in user_ids:
- txn.call_after(
- self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid),
+ for room_id, event_ids in room_to_event_ids.items():
+ rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="event_push_actions_staging",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=("user_id",),
)
+ user_ids = {row["user_id"] for row in rows}
+
+ for user_id in user_ids:
+ txn.call_after(
+ self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (room_id, user_id),
+ )
+
# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 78367ea5..cbe4be14 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -838,7 +837,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
# We want to do a `(topological_ordering, stream_ordering) > (?,?)`
# comparison, but that is not supported on older SQLite versions
tuple_clause, tuple_args = make_tuple_comparison_clause(
- self.database_engine,
[
("events.room_id", last_room_id),
("topological_ordering", last_depth),
diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py
index b3703ae1..6d2688d7 100644
--- a/synapse/storage/databases/main/events_forward_extremities.py
+++ b/synapse/storage/databases/main/events_forward_extremities.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 952d4969..2c823e09 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,7 +15,16 @@
import logging
import threading
from collections import namedtuple
-from typing import Dict, Iterable, List, Optional, Tuple, overload
+from typing import (
+ Collection,
+ Container,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Tuple,
+ overload,
+)
from constantly import NamedConstant, Names
from typing_extensions import Literal
@@ -46,7 +54,7 @@ from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
-from synapse.types import Collection, JsonDict, get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@@ -544,7 +552,7 @@ class EventsWorkerStore(SQLBaseStore):
async def get_stripped_room_state_from_event_context(
self,
context: EventContext,
- state_types_to_include: List[EventTypes],
+ state_types_to_include: Container[str],
membership_user_id: Optional[str] = None,
) -> List[JsonDict]:
"""
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index d2f5b9a5..bb244a03 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index ac07e019..66ad363b 100644
--- a/synapse/storage/databases/main/group_server.py
+++ b/synapse/storage/databases/main/group_server.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
@@ -1027,8 +1026,8 @@ class GroupServerStore(GroupServerWorkerStore):
user_id: str,
is_admin: bool = False,
is_public: bool = True,
- local_attestation: dict = None,
- remote_attestation: dict = None,
+ local_attestation: Optional[dict] = None,
+ remote_attestation: Optional[dict] = None,
) -> None:
"""Add a user to the group server.
@@ -1171,7 +1170,7 @@ class GroupServerStore(GroupServerWorkerStore):
user_id: str,
membership: str,
is_admin: bool = False,
- content: JsonDict = {},
+ content: Optional[JsonDict] = None,
local_attestation: Optional[dict] = None,
remote_attestation: Optional[dict] = None,
is_publicised: bool = False,
@@ -1192,6 +1191,8 @@ class GroupServerStore(GroupServerWorkerStore):
is_publicised: Whether this should be publicised.
"""
+ content = content or {}
+
def _register_user_group_membership_txn(txn, next_id):
# TODO: Upsert?
self.db_pool.simple_delete_txn(
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index d504323b..0e868078 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd.
#
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 4f3d1925..c5848681 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020-2021 The Matrix.org Foundation C.I.C.
#
@@ -22,6 +21,9 @@ from synapse.storage.database import DatabasePool
BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD = (
"media_repository_drop_index_wo_method"
)
+BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
+ "media_repository_drop_index_wo_method_2"
+)
class MediaSortOrder(Enum):
@@ -85,23 +87,35 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
unique=True,
)
+ # the original impl of _drop_media_index_without_method was broken (see
+ # https://github.com/matrix-org/synapse/issues/8649), so we replace the original
+ # impl with a no-op and run the fixed migration as
+ # media_repository_drop_index_wo_method_2.
+ self.db_pool.updates.register_noop_background_update(
+ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD
+ )
self.db_pool.updates.register_background_update_handler(
- BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD,
+ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2,
self._drop_media_index_without_method,
)
async def _drop_media_index_without_method(self, progress, batch_size):
+ """background update handler which removes the old constraints.
+
+ Note that this is only run on postgres.
+ """
+
def f(txn):
txn.execute(
"ALTER TABLE local_media_repository_thumbnails DROP CONSTRAINT IF EXISTS local_media_repository_thumbn_media_id_thumbnail_width_thum_key"
)
txn.execute(
- "ALTER TABLE remote_media_cache_thumbnails DROP CONSTRAINT IF EXISTS remote_media_repository_thumbn_media_id_thumbnail_width_thum_key"
+ "ALTER TABLE remote_media_cache_thumbnails DROP CONSTRAINT IF EXISTS remote_media_cache_thumbnails_media_origin_media_id_thumbna_key"
)
await self.db_pool.runInteraction("drop_media_indices_without_method", f)
await self.db_pool.updates._end_background_update(
- BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD
+ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2
)
return 1
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 614a418a..c3f551d3 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 757da3d5..fe256382 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2018 New Vector
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 0ff693a3..db22fab2 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,16 +12,69 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Dict, List, Tuple
+from typing import TYPE_CHECKING, Dict, List, Tuple
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import PresenceState, UserPresenceState
+from synapse.replication.tcp.streams import PresenceStream
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage.database import DatabasePool
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.types import Connection
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
class PresenceStore(SQLBaseStore):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: Connection,
+ hs: "HomeServer",
+ ):
+ super().__init__(database, db_conn, hs)
+
+ self._can_persist_presence = (
+ hs.get_instance_name() in hs.config.worker.writers.presence
+ )
+
+ if isinstance(database.engine, PostgresEngine):
+ self._presence_id_gen = MultiWriterIdGenerator(
+ db_conn=db_conn,
+ db=database,
+ stream_name="presence_stream",
+ instance_name=self._instance_name,
+ tables=[("presence_stream", "instance_name", "stream_id")],
+ sequence_name="presence_stream_sequence",
+ writers=hs.config.worker.writers.to_device,
+ )
+ else:
+ self._presence_id_gen = StreamIdGenerator(
+ db_conn, "presence_stream", "stream_id"
+ )
+
+ self._presence_on_startup = self._get_active_presence(db_conn)
+
+ presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
+ db_conn,
+ "presence_stream",
+ entity_column="user_id",
+ stream_column="stream_id",
+ max_value=self._presence_id_gen.get_current_token(),
+ )
+ self.presence_stream_cache = StreamChangeCache(
+ "PresenceStreamChangeCache",
+ min_presence_val,
+ prefilled_cache=presence_cache_prefill,
+ )
+
async def update_presence(self, presence_states):
+ assert self._can_persist_presence
+
stream_ordering_manager = self._presence_id_gen.get_next_mult(
len(presence_states)
)
@@ -58,6 +110,7 @@ class PresenceStore(SQLBaseStore):
"last_user_sync_ts": state.last_user_sync_ts,
"status_msg": state.status_msg,
"currently_active": state.currently_active,
+ "instance_name": self._instance_name,
}
for stream_id, state in zip(stream_orderings, presence_states)
],
@@ -217,3 +270,37 @@ class PresenceStore(SQLBaseStore):
def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()
+
+ def _get_active_presence(self, db_conn: Connection):
+ """Fetch non-offline presence from the database so that we can register
+ the appropriate time outs.
+ """
+
+ sql = (
+ "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
+ " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
+ " WHERE state != ?"
+ )
+
+ txn = db_conn.cursor()
+ txn.execute(sql, (PresenceState.OFFLINE,))
+ rows = self.db_pool.cursor_to_dict(txn)
+ txn.close()
+
+ for row in rows:
+ row["currently_active"] = bool(row["currently_active"])
+
+ return [UserPresenceState(**row) for row in rows]
+
+ def take_presence_startup_info(self):
+ active_on_startup = self._presence_on_startup
+ self._presence_on_startup = None
+ return active_on_startup
+
+ def process_replication_rows(self, stream_name, instance_name, token, rows):
+ if stream_name == PresenceStream.NAME:
+ self._presence_id_gen.advance(instance_name, token)
+ for row in rows:
+ self.presence_stream_cache.entity_has_changed(row.user_id, token)
+ self._get_presence_for_user.invalidate((row.user_id,))
+ return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index ba01d310..9b4e95e1 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 41f4fe7f..8f83748b 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 9e58dc0e..db521763 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index c65558c2..b48fe086 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 43c852c9..3647276a 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 90a8f664..6e5ee557 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017-2018 New Vector Ltd
# Copyright 2019,2020 The Matrix.org Foundation C.I.C.
@@ -92,13 +91,25 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
id_column=None,
)
- self._account_validity = hs.config.account_validity
- if hs.config.run_background_tasks and self._account_validity.enabled:
- self._clock.call_later(
- 0.0,
- self._set_expiration_date_when_missing,
+ self._account_validity_enabled = (
+ hs.config.account_validity.account_validity_enabled
+ )
+ self._account_validity_period = None
+ self._account_validity_startup_job_max_delta = None
+ if self._account_validity_enabled:
+ self._account_validity_period = (
+ hs.config.account_validity.account_validity_period
+ )
+ self._account_validity_startup_job_max_delta = (
+ hs.config.account_validity.account_validity_startup_job_max_delta
)
+ if hs.config.run_background_tasks:
+ self._clock.call_later(
+ 0.0,
+ self._set_expiration_date_when_missing,
+ )
+
# Create a background job for culling expired 3PID validity tokens
if hs.config.run_background_tasks:
self._clock.looping_call(
@@ -195,6 +206,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
expiration_ts: int,
email_sent: bool,
renewal_token: Optional[str] = None,
+ token_used_ts: Optional[int] = None,
) -> None:
"""Updates the account validity properties of the given account, with the
given values.
@@ -208,6 +220,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
period.
renewal_token: Renewal token the user can use to extend the validity
of their account. Defaults to no token.
+ token_used_ts: A timestamp of when the current token was used to renew
+ the account.
"""
def set_account_validity_for_user_txn(txn):
@@ -219,6 +233,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"expiration_ts_ms": expiration_ts,
"email_sent": email_sent,
"renewal_token": renewal_token,
+ "token_used_ts_ms": token_used_ts,
},
)
self._invalidate_cache_and_stream(
@@ -232,7 +247,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
async def set_renewal_token_for_user(
self, user_id: str, renewal_token: str
) -> None:
- """Defines a renewal token for a given user.
+ """Defines a renewal token for a given user, and clears the token_used timestamp.
Args:
user_id: ID of the user to set the renewal token for.
@@ -245,26 +260,40 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
await self.db_pool.simple_update_one(
table="account_validity",
keyvalues={"user_id": user_id},
- updatevalues={"renewal_token": renewal_token},
+ updatevalues={"renewal_token": renewal_token, "token_used_ts_ms": None},
desc="set_renewal_token_for_user",
)
- async def get_user_from_renewal_token(self, renewal_token: str) -> str:
- """Get a user ID from a renewal token.
+ async def get_user_from_renewal_token(
+ self, renewal_token: str
+ ) -> Tuple[str, int, Optional[int]]:
+ """Get a user ID and renewal status from a renewal token.
Args:
renewal_token: The renewal token to perform the lookup with.
Returns:
- The ID of the user to which the token belongs.
+ A tuple of containing the following values:
+ * The ID of a user to which the token belongs.
+ * An int representing the user's expiry timestamp as milliseconds since the
+ epoch, or 0 if the token was invalid.
+ * An optional int representing the timestamp of when the user renewed their
+ account timestamp as milliseconds since the epoch. None if the account
+ has not been renewed using the current token yet.
"""
- return await self.db_pool.simple_select_one_onecol(
+ ret_dict = await self.db_pool.simple_select_one(
table="account_validity",
keyvalues={"renewal_token": renewal_token},
- retcol="user_id",
+ retcols=["user_id", "expiration_ts_ms", "token_used_ts_ms"],
desc="get_user_from_renewal_token",
)
+ return (
+ ret_dict["user_id"],
+ ret_dict["expiration_ts_ms"],
+ ret_dict["token_used_ts_ms"],
+ )
+
async def get_renewal_token_for_user(self, user_id: str) -> str:
"""Get the renewal token associated with a given user ID.
@@ -303,7 +332,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"get_users_expiring_soon",
select_users_txn,
self._clock.time_msec(),
- self.config.account_validity.renew_at,
+ self.config.account_validity_renew_at,
)
async def set_renewal_mail_status(self, user_id: str, email_sent: bool) -> None:
@@ -965,11 +994,11 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
delta equal to 10% of the validity period.
"""
now_ms = self._clock.time_msec()
- expiration_ts = now_ms + self._account_validity.period
+ expiration_ts = now_ms + self._account_validity_period
if use_delta:
expiration_ts = self.rand.randrange(
- expiration_ts - self._account_validity.startup_job_max_delta,
+ expiration_ts - self._account_validity_startup_job_max_delta,
expiration_ts,
)
@@ -1413,7 +1442,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
except self.database_engine.module.IntegrityError:
raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
- if self._account_validity.enabled:
+ if self._account_validity_enabled:
self.set_expiration_date_for_user_txn(txn, user_id)
if create_profile_with_displayname:
diff --git a/synapse/storage/databases/main/rejections.py b/synapse/storage/databases/main/rejections.py
index 1e361aaa..167318b3 100644
--- a/synapse/storage/databases/main/rejections.py
+++ b/synapse/storage/databases/main/rejections.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 5cd61547..2bbf6d6a 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 9cbcd530..5f38634f 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
@@ -521,13 +520,11 @@ class RoomWorkerStore(SQLBaseStore):
)
@cached(max_entries=10000)
- async def get_ratelimit_for_user(self, user_id):
- """Check if there are any overrides for ratelimiting for the given
- user
+ async def get_ratelimit_for_user(self, user_id: str) -> Optional[RatelimitOverride]:
+ """Check if there are any overrides for ratelimiting for the given user
Args:
- user_id (str)
-
+ user_id: user ID of the user
Returns:
RatelimitOverride if there is an override, else None. If the contents
of RatelimitOverride are None or 0 then ratelimitng has been
@@ -549,6 +546,62 @@ class RoomWorkerStore(SQLBaseStore):
else:
return None
+ async def set_ratelimit_for_user(
+ self, user_id: str, messages_per_second: int, burst_count: int
+ ) -> None:
+ """Sets whether a user is set an overridden ratelimit.
+ Args:
+ user_id: user ID of the user
+ messages_per_second: The number of actions that can be performed in a second.
+ burst_count: How many actions that can be performed before being limited.
+ """
+
+ def set_ratelimit_txn(txn):
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="ratelimit_override",
+ keyvalues={"user_id": user_id},
+ values={
+ "messages_per_second": messages_per_second,
+ "burst_count": burst_count,
+ },
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_ratelimit_for_user, (user_id,)
+ )
+
+ await self.db_pool.runInteraction("set_ratelimit", set_ratelimit_txn)
+
+ async def delete_ratelimit_for_user(self, user_id: str) -> None:
+ """Delete an overridden ratelimit for a user.
+ Args:
+ user_id: user ID of the user
+ """
+
+ def delete_ratelimit_txn(txn):
+ row = self.db_pool.simple_select_one_txn(
+ txn,
+ table="ratelimit_override",
+ keyvalues={"user_id": user_id},
+ retcols=["user_id"],
+ allow_none=True,
+ )
+
+ if not row:
+ return
+
+ # They are there, delete them.
+ self.db_pool.simple_delete_one_txn(
+ txn, "ratelimit_override", keyvalues={"user_id": user_id}
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_ratelimit_for_user, (user_id,)
+ )
+
+ await self.db_pool.runInteraction("delete_ratelimit", delete_ratelimit_txn)
+
@cached()
async def get_retention_policy_for_room(self, room_id):
"""Get the retention policy for a given room.
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index a9216ca9..2a8532f8 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
@@ -14,7 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ Collection,
+ Dict,
+ FrozenSet,
+ Iterable,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+)
+
+import attr
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
@@ -34,7 +46,7 @@ from synapse.storage.roommember import (
ProfileInfo,
RoomsForUser,
)
-from synapse.types import Collection, PersistedEventPosition, get_domain_from_id
+from synapse.types import PersistedEventPosition, StateMap, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -54,6 +66,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
+ # Used by `_get_joined_hosts` to ensure only one thing mutates the cache
+ # at a time. Keyed by room_id.
+ self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
+
# Is the current_state_events.membership up to date? Or is the
# background update still running?
self._current_state_events_membership_up_to_date = False
@@ -174,6 +190,33 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id, Membership.JOIN))
return [r[0] for r in txn]
+ @cached(max_entries=100000, iterable=True)
+ async def get_users_in_room_with_profiles(
+ self, room_id: str
+ ) -> Dict[str, ProfileInfo]:
+ """Get a mapping from user ID to profile information for all users in a given room.
+
+ Args:
+ room_id: The ID of the room to retrieve the users of.
+
+ Returns:
+ A mapping from user ID to ProfileInfo.
+ """
+
+ def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]:
+ sql = """
+ SELECT user_id, display_name, avatar_url FROM room_memberships
+ WHERE room_id = ? AND membership = ?
+ """
+ txn.execute(sql, (room_id, Membership.JOIN))
+
+ return {r[0]: ProfileInfo(display_name=r[1], avatar_url=r[2]) for r in txn}
+
+ return await self.db_pool.runInteraction(
+ "get_users_in_room_with_profiles",
+ _get_users_in_room_with_profiles,
+ )
+
@cached(max_entries=100000)
async def get_room_summary(self, room_id: str) -> Dict[str, MemberSummary]:
"""Get the details of a room roughly suitable for use by the room
@@ -704,19 +747,82 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(num_args=2, max_entries=10000, iterable=True)
async def _get_joined_hosts(
- self, room_id, state_group, current_state_ids, state_entry
- ):
- # We don't use `state_group`, its there so that we can cache based
- # on it. However, its important that its never None, since two current_state's
- # with a state_group of None are likely to be different.
+ self,
+ room_id: str,
+ state_group: int,
+ current_state_ids: StateMap[str],
+ state_entry: "_StateCacheEntry",
+ ) -> FrozenSet[str]:
+        # We don't use `state_group`, it's there so that we can cache based on
+        # it. However, it's important that it's never None, since two
+        # current_state's with a state_group of None are likely to be different.
+ #
+ # The `state_group` must match the `state_entry.state_group` (if not None).
assert state_group is not None
-
+ assert state_entry.state_group is None or state_entry.state_group == state_group
+
+ # We use a secondary cache of previous work to allow us to build up the
+ # joined hosts for the given state group based on previous state groups.
+ #
+ # We cache one object per room containing the results of the last state
+ # group we got joined hosts for. The idea is that generally
+ # `get_joined_hosts` is called with the "current" state group for the
+ # room, and so consecutive calls will be for consecutive state groups
+ # which point to the previous state group.
cache = await self._get_joined_hosts_cache(room_id)
- return await cache.get_destinations(state_entry)
+
+ # If the state group in the cache matches, we already have the data we need.
+ if state_entry.state_group == cache.state_group:
+ return frozenset(cache.hosts_to_joined_users)
+
+ # Since we'll mutate the cache we need to lock.
+ with (await self._joined_host_linearizer.queue(room_id)):
+ if state_entry.state_group == cache.state_group:
+ # Same state group, so nothing to do. We've already checked for
+ # this above, but the cache may have changed while waiting on
+ # the lock.
+ pass
+ elif state_entry.prev_group == cache.state_group:
+ # The cached work is for the previous state group, so we work out
+ # the delta.
+ for (typ, state_key), event_id in state_entry.delta_ids.items():
+ if typ != EventTypes.Member:
+ continue
+
+ host = intern_string(get_domain_from_id(state_key))
+ user_id = state_key
+ known_joins = cache.hosts_to_joined_users.setdefault(host, set())
+
+ event = await self.get_event(event_id)
+ if event.membership == Membership.JOIN:
+ known_joins.add(user_id)
+ else:
+ known_joins.discard(user_id)
+
+ if not known_joins:
+ cache.hosts_to_joined_users.pop(host, None)
+ else:
+ # The cache doesn't match the state group or prev state group,
+ # so we calculate the result from first principles.
+ joined_users = await self.get_joined_users_from_state(
+ room_id, state_entry
+ )
+
+ cache.hosts_to_joined_users = {}
+ for user_id in joined_users:
+ host = intern_string(get_domain_from_id(user_id))
+ cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
+
+ if state_entry.state_group:
+ cache.state_group = state_entry.state_group
+ else:
+ cache.state_group = object()
+
+ return frozenset(cache.hosts_to_joined_users)
@cached(max_entries=10000)
def _get_joined_hosts_cache(self, room_id: str) -> "_JoinedHostsCache":
- return _JoinedHostsCache(self, room_id)
+ return _JoinedHostsCache()
@cached(num_args=2)
async def did_forget(self, user_id: str, room_id: str) -> bool:
@@ -1026,71 +1132,18 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
await self.db_pool.runInteraction("forget_membership", f)
+@attr.s(slots=True)
class _JoinedHostsCache:
- """Cache for joined hosts in a room that is optimised to handle updates
- via state deltas.
- """
-
- def __init__(self, store, room_id):
- self.store = store
- self.room_id = room_id
-
- self.hosts_to_joined_users = {}
-
- self.state_group = object()
+ """The cached data used by the `_get_joined_hosts_cache`."""
- self.linearizer = Linearizer("_JoinedHostsCache")
+    # Map from host to the set of its users who are joined to the room at the state group.
+ hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict)
- self._len = 0
-
- async def get_destinations(self, state_entry: "_StateCacheEntry") -> Set[str]:
- """Get set of destinations for a state entry
-
- Args:
- state_entry
-
- Returns:
- The destinations as a set.
- """
- if state_entry.state_group == self.state_group:
- return frozenset(self.hosts_to_joined_users)
-
- with (await self.linearizer.queue(())):
- if state_entry.state_group == self.state_group:
- pass
- elif state_entry.prev_group == self.state_group:
- for (typ, state_key), event_id in state_entry.delta_ids.items():
- if typ != EventTypes.Member:
- continue
-
- host = intern_string(get_domain_from_id(state_key))
- user_id = state_key
- known_joins = self.hosts_to_joined_users.setdefault(host, set())
-
- event = await self.store.get_event(event_id)
- if event.membership == Membership.JOIN:
- known_joins.add(user_id)
- else:
- known_joins.discard(user_id)
-
- if not known_joins:
- self.hosts_to_joined_users.pop(host, None)
- else:
- joined_users = await self.store.get_joined_users_from_state(
- self.room_id, state_entry
- )
-
- self.hosts_to_joined_users = {}
- for user_id in joined_users:
- host = intern_string(get_domain_from_id(user_id))
- self.hosts_to_joined_users.setdefault(host, set()).add(user_id)
-
- if state_entry.state_group:
- self.state_group = state_entry.state_group
- else:
- self.state_group = object()
- self._len = sum(len(v) for v in self.hosts_to_joined_users.values())
- return frozenset(self.hosts_to_joined_users)
+    # The state group `hosts_to_joined_users` is derived from. Will be an object
+    # if the instance is newly created or if the state is not based on a state
+    # group. (An object is used as a sentinel value to ensure that it is never
+    # equal to anything else.)
+ state_group = attr.ib(type=Union[object, int], factory=object)
def __len__(self):
- return self._len
+ return sum(len(v) for v in self.hosts_to_joined_users.values())
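The rewritten `_get_joined_hosts` above keeps one `_JoinedHostsCache` per room; when the incoming state entry's `prev_group` matches the cached `state_group`, only the membership delta is applied instead of recomputing every host from the full joined-user set. A minimal sketch of that delta-update idea, detached from Synapse's stores (class and function names are illustrative):

    from typing import Dict, Iterable, Set

    class JoinedHostsSketch:
        def __init__(self) -> None:
            self.hosts_to_joined_users: Dict[str, Set[str]] = {}
            # A fresh object() compares equal to nothing else, so a brand-new
            # cache can never spuriously match a real state group id.
            self.state_group: object = object()

    def apply_membership_delta(
        cache: JoinedHostsSketch,
        new_state_group: int,
        joins: Iterable[str],
        leaves: Iterable[str],
    ) -> None:
        """Advance the cache by one state-group step, given Matrix user IDs
        (e.g. '@alice:example.com') that joined or left since the previous
        group."""
        for user_id in joins:
            host = user_id.split(":", 1)[1]
            cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
        for user_id in leaves:
            host = user_id.split(":", 1)[1]
            users = cache.hosts_to_joined_users.get(host)
            if users is not None:
                users.discard(user_id)
                if not users:
                    # Drop hosts that no longer have any joined users.
                    del cache.hosts_to_joined_users[host]
        cache.state_group = new_state_group

The Linearizer in the real code exists because two coroutines applying deltas to the same cache concurrently would interleave their mutations; the sketch assumes a single caller.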
diff --git a/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py b/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py
index b1684a84..acd6ad1e 100644
--- a/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py
+++ b/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py
index 44917f0a..66989222 100644
--- a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py
+++ b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2020 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/schema/delta/59/11drop_thumbnail_constraint.sql.postgres b/synapse/storage/databases/main/schema/delta/59/11drop_thumbnail_constraint.sql.postgres
new file mode 100644
index 00000000..54c1bca3
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/11drop_thumbnail_constraint.sql.postgres
@@ -0,0 +1,22 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- drop old constraints on remote_media_cache_thumbnails
+--
+-- This was originally part of 57.07, but it was done wrong, per
+-- https://github.com/matrix-org/synapse/issues/8649, so we do it again.
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+ (5911, 'media_repository_drop_index_wo_method_2', '{}', 'remote_media_repository_thumbnails_method_idx');
+
diff --git a/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql
new file mode 100644
index 00000000..4836dac1
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Track when users renew their account using the value of the 'renewal_token' column.
+-- This field should be set to NULL after a fresh token is generated.
+ALTER TABLE account_validity ADD token_used_ts_ms BIGINT;
diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql
new file mode 100644
index 00000000..b6ba0bda
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a column to specify which instance wrote the row. Historic rows have
+-- `NULL`, which indicates that the master instance wrote them.
+ALTER TABLE presence_stream ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres
new file mode 100644
index 00000000..02b182ad
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS presence_stream_sequence;
+
+SELECT setval('presence_stream_sequence', (
+ SELECT COALESCE(MAX(stream_id), 1) FROM presence_stream
+));
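The `setval(...)` above seeds the new sequence with the highest existing `stream_id`, so ids allocated after the migration continue where the single-writer generator left off. Presumably the sequence is then consumed through `MultiWriterIdGenerator`; a hedged sketch of drawing an id from it directly with psycopg2 (the connection string is an assumption):

    import psycopg2

    conn = psycopg2.connect("dbname=synapse")
    with conn.cursor() as cur:
        # nextval() atomically increments and returns the sequence value,
        # which is what makes it safe for several presence writers at once.
        cur.execute("SELECT nextval('presence_stream_sequence')")
        (stream_id,) = cur.fetchone()
    print(stream_id)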
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f5e7d9ef..6480d5a9 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,7 +15,7 @@
import logging
import re
from collections import namedtuple
-from typing import List, Optional, Set
+from typing import Collection, List, Optional, Set
from synapse.api.errors import SynapseError
from synapse.events import EventBase
@@ -24,7 +23,6 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import Collection
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py
index c8c67953..ab2159c2 100644
--- a/synapse/storage/databases/main/signatures.py
+++ b/synapse/storage/databases/main/signatures.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index a7f37173..1757064a 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
@@ -190,7 +189,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# FIXME: how should this be cached?
async def get_filtered_current_state_ids(
- self, room_id: str, state_filter: StateFilter = StateFilter.all()
+ self, room_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[str]:
"""Get the current state event of a given type for a room based on the
current_state_events table. This may not be as up-to-date as the result
@@ -205,7 +204,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
Map from type/state_key to event ID.
"""
- where_clause, where_args = state_filter.make_sql_filter_clause()
+ where_clause, where_args = (
+ state_filter or StateFilter.all()
+ ).make_sql_filter_clause()
if not where_clause:
# We delegate to the cached version
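The switch from `state_filter: StateFilter = StateFilter.all()` to `Optional[StateFilter] = None`, repeated throughout this diff, follows the standard Python idiom for defaults: a default expression is evaluated once, at function definition time, so every call shares the same object and any mutation of it leaks across calls. A toy illustration of the hazard and the conventional None-default fix:

    def append_bad(item, acc=[]):
        # The default list is created once and then reused by every call.
        acc.append(item)
        return acc

    def append_good(item, acc=None):
        # Create the default freshly on each call instead.
        acc = [] if acc is None else acc
        acc.append(item)
        return acc

    assert append_bad(1) == [1]
    assert append_bad(2) == [1, 2]   # state leaked from the first call
    assert append_good(1) == [1]
    assert append_good(2) == [2]     # each call gets a fresh list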
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 0dbb501f..bff7d040 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2018 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 1c99393c..ae9f8809 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
@@ -66,18 +65,37 @@ TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
class UserSortOrder(Enum):
"""
Enum to define the sorting method used when returning users
- with get_users_media_usage_paginate
+ with get_users_paginate in __init__.py
+    and get_users_media_usage_paginate in stats.py.
- MEDIA_LENGTH = ordered by size of uploaded media. Smallest to largest.
- MEDIA_COUNT = ordered by number of uploaded media. Smallest to largest.
+    Moving this to __init__.py raises `builtins.ImportError` with
+    `most likely due to a circular import`, so it is defined here instead.
+
+ MEDIA_LENGTH = ordered by size of uploaded media.
+ MEDIA_COUNT = ordered by number of uploaded media.
USER_ID = ordered alphabetically by `user_id`.
+    NAME = ordered alphabetically by `user_id`. This exists for compatibility,
+    as the user ID is returned in the `name` field of the list users admin API response.
DISPLAYNAME = ordered alphabetically by `displayname`
+ GUEST = ordered by `is_guest`
+ ADMIN = ordered by `admin`
+ DEACTIVATED = ordered by `deactivated`
+ USER_TYPE = ordered alphabetically by `user_type`
+ AVATAR_URL = ordered alphabetically by `avatar_url`
+ SHADOW_BANNED = ordered by `shadow_banned`
"""
MEDIA_LENGTH = "media_length"
MEDIA_COUNT = "media_count"
USER_ID = "user_id"
+ NAME = "name"
DISPLAYNAME = "displayname"
+ GUEST = "is_guest"
+ ADMIN = "admin"
+ DEACTIVATED = "deactivated"
+ USER_TYPE = "user_type"
+ AVATAR_URL = "avatar_url"
+ SHADOW_BANNED = "shadow_banned"
class StatsStore(StateDeltasStore):
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 91f8abb6..7581c7d3 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018-2019 New Vector Ltd
@@ -38,7 +37,7 @@ what sort order was used:
import abc
import logging
from collections import namedtuple
-from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
from twisted.internet import defer
@@ -54,7 +53,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
+from synapse.types import PersistedEventPosition, RoomStreamToken
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index 50067eab..1d62c614 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index b7072f1f..82335e7a 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py
index 5473ec14..22c05cdd 100644
--- a/synapse/storage/databases/main/ui_auth.py
+++ b/synapse/storage/databases/main/ui_auth.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2020 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 1026f321..7a082fdd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py
index f9575b1f..acf6b2fb 100644
--- a/synapse/storage/databases/main/user_erasure_store.py
+++ b/synapse/storage/databases/main/user_erasure_store.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/state/__init__.py b/synapse/storage/databases/state/__init__.py
index c90d0228..e5100d61 100644
--- a/synapse/storage/databases/state/__init__.py
+++ b/synapse/storage/databases/state/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 1fd333b7..c2891cb0 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,6 +13,7 @@
# limitations under the License.
import logging
+from typing import Optional
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
@@ -73,8 +73,10 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
return count
def _get_state_groups_from_groups_txn(
- self, txn, groups, state_filter=StateFilter.all()
+ self, txn, groups, state_filter: Optional[StateFilter] = None
):
+ state_filter = state_filter or StateFilter.all()
+
results = {group: {} for group in groups}
where_clause, where_args = state_filter.make_sql_filter_clause()
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 97ec65f7..e38461ad 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,7 +14,7 @@
import logging
from collections import namedtuple
-from typing import Dict, Iterable, List, Set, Tuple
+from typing import Dict, Iterable, List, Optional, Set, Tuple
from synapse.api.constants import EventTypes
from synapse.storage._base import SQLBaseStore
@@ -210,7 +209,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return state_filter.filter_state(state_dict_ids), not missing_types
async def _get_state_for_groups(
- self, groups: Iterable[int], state_filter: StateFilter = StateFilter.all()
+ self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Dict[int, MutableStateMap[str]]:
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
@@ -223,6 +222,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
Returns:
Dict of state group to state map.
"""
+ state_filter = state_filter or StateFilter.all()
member_filter, non_member_filter = state_filter.get_member_split()
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index d15ccfac..9abc0204 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index cca839c7..1882bfd9 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -44,14 +43,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
- def supports_tuple_comparison(self) -> bool:
- """
- Do we support comparing tuples, i.e. `(a, b) > (c, d)`?
- """
- ...
-
- @property
- @abc.abstractmethod
def supports_using_any_list(self) -> bool:
"""
Do we support using `a = ANY(?)` and passing a list
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 80a3558a..21411c5f 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -47,8 +46,8 @@ class PostgresEngine(BaseDatabaseEngine):
self._version = db_conn.server_version
# Are we on a supported PostgreSQL version?
- if not allow_outdated_version and self._version < 90500:
- raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.")
+ if not allow_outdated_version and self._version < 90600:
+ raise RuntimeError("Synapse requires PostgreSQL 9.6 or above.")
with db_conn.cursor() as txn:
txn.execute("SHOW SERVER_ENCODING")
@@ -130,13 +129,6 @@ class PostgresEngine(BaseDatabaseEngine):
return True
@property
- def supports_tuple_comparison(self):
- """
- Do we support comparing tuples, i.e. `(a, b) > (c, d)`?
- """
- return True
-
- @property
def supports_using_any_list(self):
"""Do we support using `a = ANY(?)` and passing a list"""
return True
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index b87e7798..5fe1b205 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -57,14 +56,6 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
return self.module.sqlite_version_info >= (3, 24, 0)
@property
- def supports_tuple_comparison(self):
- """
- Do we support comparing tuples, i.e. `(a, b) > (c, d)`? This requires
- SQLite 3.15+.
- """
- return self.module.sqlite_version_info >= (3, 15, 0)
-
- @property
def supports_using_any_list(self):
"""Do we support using `a = ANY(?)` and passing a list"""
return False
@@ -72,8 +63,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
def check_database(self, db_conn, allow_outdated_version: bool = False):
if not allow_outdated_version:
version = self.module.sqlite_version_info
- if version < (3, 11, 0):
- raise RuntimeError("Synapse requires sqlite 3.11 or above.")
+ # Synapse is untested against older SQLite versions, and we don't want
+ # to let users upgrade to a version of Synapse with broken support for their
+ # sqlite version, because it risks leaving them with a half-upgraded db.
+ if version < (3, 22, 0):
+ raise RuntimeError("Synapse requires sqlite 3.22 or above.")
def check_new_database(self, txn):
"""Gets called when setting up a brand new database. This allows us to
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index c03871f3..540adb87 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd.
#
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 3a0d6fb3..33dc752d 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
@@ -18,7 +17,7 @@
import itertools
import logging
from collections import deque, namedtuple
-from typing import Dict, Iterable, List, Optional, Set, Tuple
+from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from prometheus_client import Counter, Histogram
@@ -33,7 +32,6 @@ from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
- Collection,
PersistedEventPosition,
RoomStreamToken,
StateMap,
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6c3c2da5..7a2cbee4 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
@@ -13,12 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import imp
+import importlib.util
import logging
import os
import re
from collections import Counter
-from typing import Generator, Iterable, List, Optional, TextIO, Tuple
+from typing import Collection, Generator, Iterable, List, Optional, TextIO, Tuple
import attr
from typing_extensions import Counter as CounterType
@@ -28,7 +27,6 @@ from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.engines.postgres import PostgresEngine
from synapse.storage.types import Cursor
-from synapse.types import Collection
logger = logging.getLogger(__name__)
@@ -454,8 +452,13 @@ def _upgrade_existing_database(
)
module_name = "synapse.storage.v%d_%s" % (v, root_name)
- with open(absolute_path) as python_file:
- module = imp.load_source(module_name, absolute_path, python_file) # type: ignore
+
+ spec = importlib.util.spec_from_file_location(
+ module_name, absolute_path
+ )
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module) # type: ignore
+
logger.info("Running script %s", relative_path)
module.run_create(cur, database_engine) # type: ignore
if not is_empty:
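The long-deprecated `imp.load_source` call is replaced with the `importlib.util` machinery. A self-contained sketch of the replacement pattern (the module name and path below are placeholders):

    import importlib.util

    def load_module_from_path(module_name: str, path: str):
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load {module_name} from {path}")
        module = importlib.util.module_from_spec(spec)
        # Execute the module body, populating its namespace, just as
        # imp.load_source used to do.
        spec.loader.exec_module(module)
        return module

    # e.g.: mod = load_module_from_path("synapse.storage.v58_example", "/path/to/delta.py")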
diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py
index ad954990..30669beb 100644
--- a/synapse/storage/purge_events.py
+++ b/synapse/storage/purge_events.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index f47cec0d..2d5c21ef 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 2564f34b..c552dbf0 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d2ff4da6..c34fbf21 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 2e277a21..cfafba22 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -449,7 +448,7 @@ class StateGroupStorage:
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
async def get_state_for_events(
- self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all()
+ self, event_ids: Iterable[str], state_filter: Optional[StateFilter] = None
) -> Dict[str, StateMap[EventBase]]:
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event.
@@ -465,7 +464,7 @@ class StateGroupStorage:
groups = set(event_to_groups.values())
group_to_state = await self.stores.state._get_state_for_groups(
- groups, state_filter
+ groups, state_filter or StateFilter.all()
)
state_event_map = await self.stores.main.get_events(
@@ -485,7 +484,7 @@ class StateGroupStorage:
return {event: event_to_state[event] for event in event_ids}
async def get_state_ids_for_events(
- self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all()
+ self, event_ids: Iterable[str], state_filter: Optional[StateFilter] = None
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids
@@ -502,7 +501,7 @@ class StateGroupStorage:
groups = set(event_to_groups.values())
group_to_state = await self.stores.state._get_state_for_groups(
- groups, state_filter
+ groups, state_filter or StateFilter.all()
)
event_to_state = {
@@ -513,7 +512,7 @@ class StateGroupStorage:
return {event: event_to_state[event] for event in event_ids}
async def get_state_for_event(
- self, event_id: str, state_filter: StateFilter = StateFilter.all()
+ self, event_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[EventBase]:
"""
Get the state dict corresponding to a particular event
@@ -525,11 +524,13 @@ class StateGroupStorage:
Returns:
A dict from (type, state_key) -> state_event
"""
- state_map = await self.get_state_for_events([event_id], state_filter)
+ state_map = await self.get_state_for_events(
+ [event_id], state_filter or StateFilter.all()
+ )
return state_map[event_id]
async def get_state_ids_for_event(
- self, event_id: str, state_filter: StateFilter = StateFilter.all()
+ self, event_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[str]:
"""
Get the state dict corresponding to a particular event
@@ -541,11 +542,13 @@ class StateGroupStorage:
Returns:
A dict from (type, state_key) -> state_event
"""
- state_map = await self.get_state_ids_for_events([event_id], state_filter)
+ state_map = await self.get_state_ids_for_events(
+ [event_id], state_filter or StateFilter.all()
+ )
return state_map[event_id]
def _get_state_for_groups(
- self, groups: Iterable[int], state_filter: StateFilter = StateFilter.all()
+ self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Awaitable[Dict[int, MutableStateMap[str]]]:
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
@@ -558,7 +561,9 @@ class StateGroupStorage:
Returns:
Dict of state group to state map.
"""
- return self.stores.state._get_state_for_groups(groups, state_filter)
+ return self.stores.state._get_state_for_groups(
+ groups, state_filter or StateFilter.all()
+ )
async def store_state_group(
self,
diff --git a/synapse/storage/types.py b/synapse/storage/types.py
index 17291c9d..57f4883b 100644
--- a/synapse/storage/types.py
+++ b/synapse/storage/types.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py
index bfebb0f6..5e83dba2 100644
--- a/synapse/storage/util/__init__.py
+++ b/synapse/storage/util/__init__.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index d4643c4f..b1bd3a52 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,7 +16,7 @@ import logging
import threading
from collections import OrderedDict
from contextlib import contextmanager
-from typing import Dict, List, Optional, Set, Tuple, Union
+from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
import attr
@@ -91,7 +90,14 @@ class StreamIdGenerator:
# ... persist event ...
"""
- def __init__(self, db_conn, table, column, extra_tables=[], step=1):
+ def __init__(
+ self,
+ db_conn,
+ table,
+ column,
+ extra_tables: Iterable[Tuple[str, str]] = (),
+ step=1,
+ ):
assert step != 0
self._lock = threading.Lock()
self._step = step
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 36a67e70..30b6b8e0 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");