diff options
author | Andrej Shadura <andrewsh@debian.org> | 2021-05-11 16:41:38 +0200 |
---|---|---|
committer | Andrej Shadura <andrewsh@debian.org> | 2021-05-11 16:41:38 +0200 |
commit | 8b13141dd36d5784fe820afa06aa21a0c5735155 (patch) | |
tree | 12ac731328b325992ab80b1c8cecf4ed79f7935b /synapse/storage | |
parent | 55e3da6770137d9858028903525a99764b39e8b3 (diff) |
New upstream version 1.33.2
Diffstat (limited to 'synapse/storage')
75 files changed, 626 insertions, 409 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 0b9007e5..105e4e1f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018,2019 New Vector Ltd # diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 24090532..6b68d872 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017-2018 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. @@ -17,13 +16,13 @@ import logging import random from abc import ABCMeta -from typing import TYPE_CHECKING, Any, Iterable, Optional, Union +from typing import TYPE_CHECKING, Any, Collection, Iterable, Optional, Union from synapse.storage.database import LoggingTransaction # noqa: F401 from synapse.storage.database import make_in_list_sql_clause # noqa: F401 from synapse.storage.database import DatabasePool from synapse.storage.types import Connection -from synapse.types import Collection, StreamToken, get_domain_from_id +from synapse.types import StreamToken, get_domain_from_id from synapse.util import json_decoder if TYPE_CHECKING: @@ -115,7 +114,7 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any: db_content = db_content.tobytes() # Decode it to a Unicode string before feeding it to the JSON decoder, since - # Python 3.5 does not support deserializing bytes. + # it only supports handling strings if isinstance(db_content, (bytes, bytearray)): db_content = db_content.decode("utf8") diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index ccb06aab..142787fd 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 94590e7b..bd39c095 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017-2018 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. @@ -21,6 +20,7 @@ from time import monotonic as monotonic_time from typing import ( Any, Callable, + Collection, Dict, Iterable, Iterator, @@ -49,7 +49,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.types import Connection, Cursor -from synapse.types import Collection # python 3 does not have a maximum int value MAX_TXN_ID = 2 ** 63 - 1 @@ -172,10 +171,7 @@ class LoggingDatabaseConnection: # The type of entry which goes on our after_callbacks and exception_callbacks lists. -# -# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so -# that mypy sees the type but the runtime python doesn't. -_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]] +_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]] R = TypeVar("R") @@ -222,7 +218,7 @@ class LoggingTransaction: self.after_callbacks = after_callbacks self.exception_callbacks = exception_callbacks - def call_after(self, callback: "Callable[..., None]", *args: Any, **kwargs: Any): + def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. @@ -234,7 +230,7 @@ class LoggingTransaction: self.after_callbacks.append((callback, args, kwargs)) def call_on_exception( - self, callback: "Callable[..., None]", *args: Any, **kwargs: Any + self, callback: Callable[..., None], *args: Any, **kwargs: Any ): # if self.exception_callbacks is None, that means that whatever constructed the # LoggingTransaction isn't expecting there to be any callbacks; assert that @@ -486,9 +482,9 @@ class DatabasePool: desc: str, after_callbacks: List[_CallbackListEntry], exception_callbacks: List[_CallbackListEntry], - func: "Callable[..., R]", + func: Callable[..., R], *args: Any, - **kwargs: Any + **kwargs: Any, ) -> R: """Start a new database transaction with the given connection. @@ -619,10 +615,10 @@ class DatabasePool: async def runInteraction( self, desc: str, - func: "Callable[..., R]", + func: Callable[..., R], *args: Any, db_autocommit: bool = False, - **kwargs: Any + **kwargs: Any, ) -> R: """Starts a transaction on the database and runs a given function @@ -679,10 +675,10 @@ class DatabasePool: async def runWithConnection( self, - func: "Callable[..., R]", + func: Callable[..., R], *args: Any, db_autocommit: bool = False, - **kwargs: Any + **kwargs: Any, ) -> R: """Wraps the .runWithConnection() method on the underlying db_pool. @@ -775,7 +771,7 @@ class DatabasePool: desc: str, decoder: Optional[Callable[[Cursor], R]], query: str, - *args: Any + *args: Any, ) -> R: """Runs a single query for a result set. @@ -900,7 +896,7 @@ class DatabasePool: table: str, keyvalues: Dict[str, Any], values: Dict[str, Any], - insertion_values: Dict[str, Any] = {}, + insertion_values: Optional[Dict[str, Any]] = None, desc: str = "simple_upsert", lock: bool = True, ) -> Optional[bool]: @@ -927,6 +923,8 @@ class DatabasePool: Native upserts always return None. Emulated upserts return True if a new entry was created, False if an existing one was updated. """ + insertion_values = insertion_values or {} + attempts = 0 while True: try: @@ -964,7 +962,7 @@ class DatabasePool: table: str, keyvalues: Dict[str, Any], values: Dict[str, Any], - insertion_values: Dict[str, Any] = {}, + insertion_values: Optional[Dict[str, Any]] = None, lock: bool = True, ) -> Optional[bool]: """ @@ -982,6 +980,8 @@ class DatabasePool: Native upserts always return None. Emulated upserts return True if a new entry was created, False if an existing one was updated. """ + insertion_values = insertion_values or {} + if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: self.simple_upsert_txn_native_upsert( txn, table, keyvalues, values, insertion_values=insertion_values @@ -1003,7 +1003,7 @@ class DatabasePool: table: str, keyvalues: Dict[str, Any], values: Dict[str, Any], - insertion_values: Dict[str, Any] = {}, + insertion_values: Optional[Dict[str, Any]] = None, lock: bool = True, ) -> bool: """ @@ -1017,6 +1017,8 @@ class DatabasePool: Returns True if a new entry was created, False if an existing one was updated. """ + insertion_values = insertion_values or {} + # We need to lock the table :(, unless we're *really* careful if lock: self.engine.lock_table(txn, table) @@ -1077,7 +1079,7 @@ class DatabasePool: table: str, keyvalues: Dict[str, Any], values: Dict[str, Any], - insertion_values: Dict[str, Any] = {}, + insertion_values: Optional[Dict[str, Any]] = None, ) -> None: """ Use the native UPSERT functionality in recent PostgreSQL versions. @@ -1090,7 +1092,7 @@ class DatabasePool: """ allvalues = {} # type: Dict[str, Any] allvalues.update(keyvalues) - allvalues.update(insertion_values) + allvalues.update(insertion_values or {}) if not values: latter = "NOTHING" @@ -1513,7 +1515,7 @@ class DatabasePool: column: str, iterable: Iterable[Any], retcols: Iterable[str], - keyvalues: Dict[str, Any] = {}, + keyvalues: Optional[Dict[str, Any]] = None, desc: str = "simple_select_many_batch", batch_size: int = 100, ) -> List[Any]: @@ -1531,6 +1533,8 @@ class DatabasePool: desc: description of the transaction, for logging and metrics batch_size: the number of rows for each select query """ + keyvalues = keyvalues or {} + results = [] # type: List[Dict[str, Any]] if not iterable: @@ -2059,69 +2063,18 @@ def make_in_list_sql_clause( KV = TypeVar("KV") -def make_tuple_comparison_clause( - database_engine: BaseDatabaseEngine, keys: List[Tuple[str, KV]] -) -> Tuple[str, List[KV]]: +def make_tuple_comparison_clause(keys: List[Tuple[str, KV]]) -> Tuple[str, List[KV]]: """Returns a tuple comparison SQL clause - Depending what the SQL engine supports, builds a SQL clause that looks like either - "(a, b) > (?, ?)", or "(a > ?) OR (a == ? AND b > ?)". + Builds a SQL clause that looks like "(a, b) > (?, ?)" Args: - database_engine keys: A set of (column, value) pairs to be compared. Returns: A tuple of SQL query and the args """ - if database_engine.supports_tuple_comparison: - return ( - "(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)), - [k[1] for k in keys], - ) - - # we want to build a clause - # (a > ?) OR - # (a == ? AND b > ?) OR - # (a == ? AND b == ? AND c > ?) - # ... - # (a == ? AND b == ? AND ... AND z > ?) - # - # or, equivalently: - # - # (a > ? OR (a == ? AND - # (b > ? OR (b == ? AND - # ... - # (y > ? OR (y == ? AND - # z > ? - # )) - # ... - # )) - # )) - # - # which itself is equivalent to (and apparently easier for the query optimiser): - # - # (a >= ? AND (a > ? OR - # (b >= ? AND (b > ? OR - # ... - # (y >= ? AND (y > ? OR - # z > ? - # )) - # ... - # )) - # )) - # - # - - clause = "" - args = [] # type: List[KV] - for k, v in keys[:-1]: - clause = clause + "(%s >= ? AND (%s > ? OR " % (k, k) - args.extend([v, v]) - - (k, v) = keys[-1] - clause += "%s > ?" % (k,) - args.append(v) - - clause += "))" * (len(keys) - 1) - return clause, args + return ( + "(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)), + [k[1] for k in keys], + ) diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 379c78bb..20b75505 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 1d44c3aa..49c7606d 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # Copyright 2019-2021 The Matrix.org Foundation C.I.C. @@ -18,9 +17,9 @@ import logging from typing import List, Optional, Tuple -from synapse.api.constants import PresenceState from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import DatabasePool +from synapse.storage.databases.main.stats import UserSortOrder from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import ( IdGenerator, @@ -51,7 +50,7 @@ from .media_repository import MediaRepositoryStore from .metrics import ServerMetricsStore from .monthly_active_users import MonthlyActiveUsersStore from .openid import OpenIdStore -from .presence import PresenceStore, UserPresenceState +from .presence import PresenceStore from .profile import ProfileStore from .purge_events import PurgeEventsStore from .push_rule import PushRuleStore @@ -126,9 +125,6 @@ class DataStore( self._clock = hs.get_clock() self.database_engine = database.engine - self._presence_id_gen = StreamIdGenerator( - db_conn, "presence_stream", "stream_id" - ) self._public_room_id_gen = StreamIdGenerator( db_conn, "public_room_list_stream", "stream_id" ) @@ -177,21 +173,6 @@ class DataStore( super().__init__(database, db_conn, hs) - self._presence_on_startup = self._get_active_presence(db_conn) - - presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( - db_conn, - "presence_stream", - entity_column="user_id", - stream_column="stream_id", - max_value=self._presence_id_gen.get_current_token(), - ) - self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", - min_presence_val, - prefilled_cache=presence_cache_prefill, - ) - device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", device_list_max @@ -238,32 +219,6 @@ class DataStore( def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() - def take_presence_startup_info(self): - active_on_startup = self._presence_on_startup - self._presence_on_startup = None - return active_on_startup - - def _get_active_presence(self, db_conn): - """Fetch non-offline presence from the database so that we can register - the appropriate time outs. - """ - - sql = ( - "SELECT user_id, state, last_active_ts, last_federation_update_ts," - " last_user_sync_ts, status_msg, currently_active FROM presence_stream" - " WHERE state != ?" - ) - - txn = db_conn.cursor() - txn.execute(sql, (PresenceState.OFFLINE,)) - rows = self.db_pool.cursor_to_dict(txn) - txn.close() - - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return [UserPresenceState(**row) for row in rows] - async def get_users(self) -> List[JsonDict]: """Function to retrieve a list of users in users table. @@ -292,6 +247,8 @@ class DataStore( name: Optional[str] = None, guests: bool = True, deactivated: bool = False, + order_by: UserSortOrder = UserSortOrder.USER_ID.value, + direction: str = "f", ) -> Tuple[List[JsonDict], int]: """Function to retrieve a paginated list of users from users list. This will return a json list of users and the @@ -304,6 +261,8 @@ class DataStore( name: search for local part of user_id or display name guests: whether to in include guest users deactivated: whether to include deactivated users + order_by: the sort order of the returned list + direction: sort ascending or descending Returns: A tuple of a list of mappings from user to information and a count of total users. """ @@ -312,6 +271,14 @@ class DataStore( filters = [] args = [self.hs.config.server_name] + # Set ordering + order_by_column = UserSortOrder(order_by).value + + if direction == "b": + order = "DESC" + else: + order = "ASC" + # `name` is in database already in lower case if name: filters.append("(name LIKE ? OR LOWER(displayname) LIKE ?)") @@ -339,10 +306,15 @@ class DataStore( txn.execute(sql, args) count = txn.fetchone()[0] - sql = ( - "SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url " - + sql_base - + " ORDER BY u.name LIMIT ? OFFSET ?" + sql = """ + SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url + {sql_base} + ORDER BY {order_by_column} {order}, u.name ASC + LIMIT ? OFFSET ? + """.format( + sql_base=sql_base, + order_by_column=order_by_column, + order=order, ) args += [limit, start] txn.execute(sql, args) diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index a277a1ef..1d02795f 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 85bb853d..9f182c2a 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 1e7637a6..ecc1f935 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 3e26d5ba..f22c1f24 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 6d18e692..d60010e9 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -298,7 +297,6 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): # times, which is fine. where_clause, where_args = make_tuple_comparison_clause( - self.database_engine, [("user_id", last_user_id), ("device_id", last_device_id)], ) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 691080ce..7c9d1f74 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index d327e9aa..c9346de3 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. @@ -16,7 +15,7 @@ # limitations under the License. import abc import logging -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import Any, Collection, Dict, Iterable, List, Optional, Set, Tuple from synapse.api.errors import Codes, StoreError from synapse.logging.opentracing import ( @@ -32,7 +31,7 @@ from synapse.storage.database import ( LoggingTransaction, make_tuple_comparison_clause, ) -from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key +from synapse.types import JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache @@ -718,7 +717,15 @@ class DeviceWorkerStore(SQLBaseStore): keyvalues={"user_id": user_id}, values={}, insertion_values={"added_ts": self._clock.time_msec()}, - desc="make_remote_user_device_cache_as_stale", + desc="mark_remote_user_device_cache_as_stale", + ) + + async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None: + # Remove the database entry that says we need to resync devices, after a resync + await self.db_pool.simple_delete( + table="device_lists_remote_resync", + keyvalues={"user_id": user_id}, + desc="mark_remote_user_device_cache_as_valid", ) async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None: @@ -985,7 +992,7 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): def _txn(txn): clause, args = make_tuple_comparison_clause( - self.db_pool.engine, [(x, last_row[x]) for x in KEY_COLS] + [(x, last_row[x]) for x in KEY_COLS] ) sql = """ SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts @@ -1290,15 +1297,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): lock=False, ) - # If we're replacing the remote user's device list cache presumably - # we've done a full resync, so we remove the entry that says we need - # to resync - self.db_pool.simple_delete_txn( - txn, - table="device_lists_remote_resync", - keyvalues={"user_id": user_id}, - ) - async def add_device_change_to_streams( self, user_id: str, device_ids: Collection[str], hosts: List[str] ): diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py index 267b9483..86075bc5 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index 12ceccee..b15fb71e 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 New Vector Ltd # Copyright 2019 Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index f1e7859d..88afe97c 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index a956be49..ff81d5cd 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,7 +14,7 @@ import itertools import logging from queue import Empty, PriorityQueue -from typing import Dict, Iterable, List, Set, Tuple +from typing import Collection, Dict, Iterable, List, Set, Tuple from synapse.api.errors import StoreError from synapse.events import EventBase @@ -26,7 +25,6 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor -from synapse.types import Collection from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 78245ad5..58453221 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 98dac19a..fd25c811 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. @@ -171,7 +170,7 @@ class PersistEventsStore: ) async with stream_ordering_manager as stream_orderings: - for (event, context), stream in zip(events_and_contexts, stream_orderings): + for (event, _), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream await self.db_pool.runInteraction( @@ -298,7 +297,7 @@ class PersistEventsStore: txn.execute(sql + clause, args) to_recursively_check = [] - for event_id, prev_event_id, metadata, rejected in txn: + for _, prev_event_id, metadata, rejected in txn: if prev_event_id in existing_prevs: continue @@ -320,8 +319,8 @@ class PersistEventsStore: txn: LoggingTransaction, events_and_contexts: List[Tuple[EventBase, EventContext]], backfilled: bool, - state_delta_for_room: Dict[str, DeltaState] = {}, - new_forward_extremeties: Dict[str, List[str]] = {}, + state_delta_for_room: Optional[Dict[str, DeltaState]] = None, + new_forward_extremeties: Optional[Dict[str, List[str]]] = None, ): """Insert some number of room events into the necessary database tables. @@ -342,6 +341,9 @@ class PersistEventsStore: extremities. """ + state_delta_for_room = state_delta_for_room or {} + new_forward_extremeties = new_forward_extremeties or {} + all_events_and_contexts = events_and_contexts min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering @@ -1125,7 +1127,7 @@ class PersistEventsStore: def _update_forward_extremities_txn( self, txn, new_forward_extremities, max_stream_order ): - for room_id, new_extrem in new_forward_extremities.items(): + for room_id in new_forward_extremities.keys(): self.db_pool.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) @@ -1376,24 +1378,28 @@ class PersistEventsStore: ], ) - for event, _ in events_and_contexts: - if not event.internal_metadata.is_redacted(): - # If we're persisting an unredacted event we go and ensure - # that we mark any redactions that reference this event as - # requiring censoring. - self.db_pool.simple_update_txn( - txn, - table="redactions", - keyvalues={"redacts": event.event_id}, - updatevalues={"have_censored": False}, + # If we're persisting an unredacted event we go and ensure + # that we mark any redactions that reference this event as + # requiring censoring. + sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?" + txn.execute_batch( + sql, + ( + ( + False, + event.event_id, ) + for event, _ in events_and_contexts + if not event.internal_metadata.is_redacted() + ), + ) state_events_and_contexts = [ ec for ec in events_and_contexts if ec[0].is_state() ] state_values = [] - for event, context in state_events_and_contexts: + for event, _ in state_events_and_contexts: vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -1462,7 +1468,7 @@ class PersistEventsStore: # nothing to do here return - for event, context in events_and_contexts: + for event, _ in events_and_contexts: if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the # redacted event. @@ -1879,20 +1885,28 @@ class PersistEventsStore: ), ) - for event, _ in events_and_contexts: - user_ids = self.db_pool.simple_select_onecol_txn( - txn, - table="event_push_actions_staging", - keyvalues={"event_id": event.event_id}, - retcol="user_id", - ) + room_to_event_ids = {} # type: Dict[str, List[str]] + for e, _ in events_and_contexts: + room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) - for uid in user_ids: - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid), + for room_id, event_ids in room_to_event_ids.items(): + rows = self.db_pool.simple_select_many_txn( + txn, + table="event_push_actions_staging", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("user_id",), ) + user_ids = {row["user_id"] for row in rows} + + for user_id in user_ids: + txn.call_after( + self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id), + ) + # Now we delete the staging area for *all* events that were being # persisted. txn.execute_batch( diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 78367ea5..cbe4be14 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -838,7 +837,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): # We want to do a `(topological_ordering, stream_ordering) > (?,?)` # comparison, but that is not supported on older SQLite versions tuple_clause, tuple_args = make_tuple_comparison_clause( - self.database_engine, [ ("events.room_id", last_room_id), ("topological_ordering", last_depth), diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index b3703ae1..6d2688d7 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 952d4969..2c823e09 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,7 +15,16 @@ import logging import threading from collections import namedtuple -from typing import Dict, Iterable, List, Optional, Tuple, overload +from typing import ( + Collection, + Container, + Dict, + Iterable, + List, + Optional, + Tuple, + overload, +) from constantly import NamedConstant, Names from typing_extensions import Literal @@ -46,7 +54,7 @@ from synapse.storage.database import DatabasePool from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.storage.util.sequence import build_sequence_generator -from synapse.types import Collection, JsonDict, get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter @@ -544,7 +552,7 @@ class EventsWorkerStore(SQLBaseStore): async def get_stripped_room_state_from_event_context( self, context: EventContext, - state_types_to_include: List[EventTypes], + state_types_to_include: Container[str], membership_user_id: Optional[str] = None, ) -> List[JsonDict]: """ diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py index d2f5b9a5..bb244a03 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py index ac07e019..66ad363b 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2018 New Vector Ltd # @@ -1027,8 +1026,8 @@ class GroupServerStore(GroupServerWorkerStore): user_id: str, is_admin: bool = False, is_public: bool = True, - local_attestation: dict = None, - remote_attestation: dict = None, + local_attestation: Optional[dict] = None, + remote_attestation: Optional[dict] = None, ) -> None: """Add a user to the group server. @@ -1171,7 +1170,7 @@ class GroupServerStore(GroupServerWorkerStore): user_id: str, membership: str, is_admin: bool = False, - content: JsonDict = {}, + content: Optional[JsonDict] = None, local_attestation: Optional[dict] = None, remote_attestation: Optional[dict] = None, is_publicised: bool = False, @@ -1192,6 +1191,8 @@ class GroupServerStore(GroupServerWorkerStore): is_publicised: Whether this should be publicised. """ + content = content or {} + def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? self.db_pool.simple_delete_txn( diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index d504323b..0e868078 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd. # diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 4f3d1925..c5848681 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2020-2021 The Matrix.org Foundation C.I.C. # @@ -22,6 +21,9 @@ from synapse.storage.database import DatabasePool BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD = ( "media_repository_drop_index_wo_method" ) +BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = ( + "media_repository_drop_index_wo_method_2" +) class MediaSortOrder(Enum): @@ -85,23 +87,35 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore): unique=True, ) + # the original impl of _drop_media_index_without_method was broken (see + # https://github.com/matrix-org/synapse/issues/8649), so we replace the original + # impl with a no-op and run the fixed migration as + # media_repository_drop_index_wo_method_2. + self.db_pool.updates.register_noop_background_update( + BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD + ) self.db_pool.updates.register_background_update_handler( - BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD, + BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2, self._drop_media_index_without_method, ) async def _drop_media_index_without_method(self, progress, batch_size): + """background update handler which removes the old constraints. + + Note that this is only run on postgres. + """ + def f(txn): txn.execute( "ALTER TABLE local_media_repository_thumbnails DROP CONSTRAINT IF EXISTS local_media_repository_thumbn_media_id_thumbnail_width_thum_key" ) txn.execute( - "ALTER TABLE remote_media_cache_thumbnails DROP CONSTRAINT IF EXISTS remote_media_repository_thumbn_media_id_thumbnail_width_thum_key" + "ALTER TABLE remote_media_cache_thumbnails DROP CONSTRAINT IF EXISTS remote_media_cache_thumbnails_media_origin_media_id_thumbna_key" ) await self.db_pool.runInteraction("drop_media_indices_without_method", f) await self.db_pool.updates._end_background_update( - BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD + BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 ) return 1 diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 614a418a..c3f551d3 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index 757da3d5..fe256382 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 0ff693a3..db22fab2 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,16 +12,69 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, List, Tuple -from synapse.api.presence import UserPresenceState +from synapse.api.presence import PresenceState, UserPresenceState +from synapse.replication.tcp.streams import PresenceStream from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause +from synapse.storage.database import DatabasePool +from synapse.storage.engines import PostgresEngine +from synapse.storage.types import Connection +from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.iterutils import batch_iter +if TYPE_CHECKING: + from synapse.server import HomeServer + class PresenceStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: Connection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._can_persist_presence = ( + hs.get_instance_name() in hs.config.worker.writers.presence + ) + + if isinstance(database.engine, PostgresEngine): + self._presence_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="presence_stream", + instance_name=self._instance_name, + tables=[("presence_stream", "instance_name", "stream_id")], + sequence_name="presence_stream_sequence", + writers=hs.config.worker.writers.to_device, + ) + else: + self._presence_id_gen = StreamIdGenerator( + db_conn, "presence_stream", "stream_id" + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( + db_conn, + "presence_stream", + entity_column="user_id", + stream_column="stream_id", + max_value=self._presence_id_gen.get_current_token(), + ) + self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", + min_presence_val, + prefilled_cache=presence_cache_prefill, + ) + async def update_presence(self, presence_states): + assert self._can_persist_presence + stream_ordering_manager = self._presence_id_gen.get_next_mult( len(presence_states) ) @@ -58,6 +110,7 @@ class PresenceStore(SQLBaseStore): "last_user_sync_ts": state.last_user_sync_ts, "status_msg": state.status_msg, "currently_active": state.currently_active, + "instance_name": self._instance_name, } for stream_id, state in zip(stream_orderings, presence_states) ], @@ -217,3 +270,37 @@ class PresenceStore(SQLBaseStore): def get_current_presence_token(self): return self._presence_id_gen.get_current_token() + + def _get_active_presence(self, db_conn: Connection): + """Fetch non-offline presence from the database so that we can register + the appropriate time outs. + """ + + sql = ( + "SELECT user_id, state, last_active_ts, last_federation_update_ts," + " last_user_sync_ts, status_msg, currently_active FROM presence_stream" + " WHERE state != ?" + ) + + txn = db_conn.cursor() + txn.execute(sql, (PresenceState.OFFLINE,)) + rows = self.db_pool.cursor_to_dict(txn) + txn.close() + + for row in rows: + row["currently_active"] = bool(row["currently_active"]) + + return [UserPresenceState(**row) for row in rows] + + def take_presence_startup_info(self): + active_on_startup = self._presence_on_startup + self._presence_on_startup = None + return active_on_startup + + def process_replication_rows(self, stream_name, instance_name, token, rows): + if stream_name == PresenceStream.NAME: + self._presence_id_gen.advance(instance_name, token) + for row in rows: + self.presence_stream_cache.entity_has_changed(row.user_id, token) + self._get_presence_for_user.invalidate((row.user_id,)) + return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index ba01d310..9b4e95e1 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 41f4fe7f..8f83748b 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 9e58dc0e..db521763 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index c65558c2..b48fe086 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 43c852c9..3647276a 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 90a8f664..6e5ee557 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017-2018 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. @@ -92,13 +91,25 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): id_column=None, ) - self._account_validity = hs.config.account_validity - if hs.config.run_background_tasks and self._account_validity.enabled: - self._clock.call_later( - 0.0, - self._set_expiration_date_when_missing, + self._account_validity_enabled = ( + hs.config.account_validity.account_validity_enabled + ) + self._account_validity_period = None + self._account_validity_startup_job_max_delta = None + if self._account_validity_enabled: + self._account_validity_period = ( + hs.config.account_validity.account_validity_period + ) + self._account_validity_startup_job_max_delta = ( + hs.config.account_validity.account_validity_startup_job_max_delta ) + if hs.config.run_background_tasks: + self._clock.call_later( + 0.0, + self._set_expiration_date_when_missing, + ) + # Create a background job for culling expired 3PID validity tokens if hs.config.run_background_tasks: self._clock.looping_call( @@ -195,6 +206,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): expiration_ts: int, email_sent: bool, renewal_token: Optional[str] = None, + token_used_ts: Optional[int] = None, ) -> None: """Updates the account validity properties of the given account, with the given values. @@ -208,6 +220,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): period. renewal_token: Renewal token the user can use to extend the validity of their account. Defaults to no token. + token_used_ts: A timestamp of when the current token was used to renew + the account. """ def set_account_validity_for_user_txn(txn): @@ -219,6 +233,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): "expiration_ts_ms": expiration_ts, "email_sent": email_sent, "renewal_token": renewal_token, + "token_used_ts_ms": token_used_ts, }, ) self._invalidate_cache_and_stream( @@ -232,7 +247,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): async def set_renewal_token_for_user( self, user_id: str, renewal_token: str ) -> None: - """Defines a renewal token for a given user. + """Defines a renewal token for a given user, and clears the token_used timestamp. Args: user_id: ID of the user to set the renewal token for. @@ -245,26 +260,40 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): await self.db_pool.simple_update_one( table="account_validity", keyvalues={"user_id": user_id}, - updatevalues={"renewal_token": renewal_token}, + updatevalues={"renewal_token": renewal_token, "token_used_ts_ms": None}, desc="set_renewal_token_for_user", ) - async def get_user_from_renewal_token(self, renewal_token: str) -> str: - """Get a user ID from a renewal token. + async def get_user_from_renewal_token( + self, renewal_token: str + ) -> Tuple[str, int, Optional[int]]: + """Get a user ID and renewal status from a renewal token. Args: renewal_token: The renewal token to perform the lookup with. Returns: - The ID of the user to which the token belongs. + A tuple of containing the following values: + * The ID of a user to which the token belongs. + * An int representing the user's expiry timestamp as milliseconds since the + epoch, or 0 if the token was invalid. + * An optional int representing the timestamp of when the user renewed their + account timestamp as milliseconds since the epoch. None if the account + has not been renewed using the current token yet. """ - return await self.db_pool.simple_select_one_onecol( + ret_dict = await self.db_pool.simple_select_one( table="account_validity", keyvalues={"renewal_token": renewal_token}, - retcol="user_id", + retcols=["user_id", "expiration_ts_ms", "token_used_ts_ms"], desc="get_user_from_renewal_token", ) + return ( + ret_dict["user_id"], + ret_dict["expiration_ts_ms"], + ret_dict["token_used_ts_ms"], + ) + async def get_renewal_token_for_user(self, user_id: str) -> str: """Get the renewal token associated with a given user ID. @@ -303,7 +332,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): "get_users_expiring_soon", select_users_txn, self._clock.time_msec(), - self.config.account_validity.renew_at, + self.config.account_validity_renew_at, ) async def set_renewal_mail_status(self, user_id: str, email_sent: bool) -> None: @@ -965,11 +994,11 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): delta equal to 10% of the validity period. """ now_ms = self._clock.time_msec() - expiration_ts = now_ms + self._account_validity.period + expiration_ts = now_ms + self._account_validity_period if use_delta: expiration_ts = self.rand.randrange( - expiration_ts - self._account_validity.startup_job_max_delta, + expiration_ts - self._account_validity_startup_job_max_delta, expiration_ts, ) @@ -1413,7 +1442,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): except self.database_engine.module.IntegrityError: raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE) - if self._account_validity.enabled: + if self._account_validity_enabled: self.set_expiration_date_for_user_txn(txn, user_id) if create_profile_with_displayname: diff --git a/synapse/storage/databases/main/rejections.py b/synapse/storage/databases/main/rejections.py index 1e361aaa..167318b3 100644 --- a/synapse/storage/databases/main/rejections.py +++ b/synapse/storage/databases/main/rejections.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 5cd61547..2bbf6d6a 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 9cbcd530..5f38634f 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. # @@ -521,13 +520,11 @@ class RoomWorkerStore(SQLBaseStore): ) @cached(max_entries=10000) - async def get_ratelimit_for_user(self, user_id): - """Check if there are any overrides for ratelimiting for the given - user + async def get_ratelimit_for_user(self, user_id: str) -> Optional[RatelimitOverride]: + """Check if there are any overrides for ratelimiting for the given user Args: - user_id (str) - + user_id: user ID of the user Returns: RatelimitOverride if there is an override, else None. If the contents of RatelimitOverride are None or 0 then ratelimitng has been @@ -549,6 +546,62 @@ class RoomWorkerStore(SQLBaseStore): else: return None + async def set_ratelimit_for_user( + self, user_id: str, messages_per_second: int, burst_count: int + ) -> None: + """Sets whether a user is set an overridden ratelimit. + Args: + user_id: user ID of the user + messages_per_second: The number of actions that can be performed in a second. + burst_count: How many actions that can be performed before being limited. + """ + + def set_ratelimit_txn(txn): + self.db_pool.simple_upsert_txn( + txn, + table="ratelimit_override", + keyvalues={"user_id": user_id}, + values={ + "messages_per_second": messages_per_second, + "burst_count": burst_count, + }, + ) + + self._invalidate_cache_and_stream( + txn, self.get_ratelimit_for_user, (user_id,) + ) + + await self.db_pool.runInteraction("set_ratelimit", set_ratelimit_txn) + + async def delete_ratelimit_for_user(self, user_id: str) -> None: + """Delete an overridden ratelimit for a user. + Args: + user_id: user ID of the user + """ + + def delete_ratelimit_txn(txn): + row = self.db_pool.simple_select_one_txn( + txn, + table="ratelimit_override", + keyvalues={"user_id": user_id}, + retcols=["user_id"], + allow_none=True, + ) + + if not row: + return + + # They are there, delete them. + self.db_pool.simple_delete_one_txn( + txn, "ratelimit_override", keyvalues={"user_id": user_id} + ) + + self._invalidate_cache_and_stream( + txn, self.get_ratelimit_for_user, (user_id,) + ) + + await self.db_pool.runInteraction("delete_ratelimit", delete_ratelimit_txn) + @cached() async def get_retention_policy_for_room(self, room_id): """Get the retention policy for a given room. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index a9216ca9..2a8532f8 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # @@ -14,7 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) + +import attr from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase @@ -34,7 +46,7 @@ from synapse.storage.roommember import ( ProfileInfo, RoomsForUser, ) -from synapse.types import Collection, PersistedEventPosition, get_domain_from_id +from synapse.types import PersistedEventPosition, StateMap, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList @@ -54,6 +66,10 @@ class RoomMemberWorkerStore(EventsWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) + # Used by `_get_joined_hosts` to ensure only one thing mutates the cache + # at a time. Keyed by room_id. + self._joined_host_linearizer = Linearizer("_JoinedHostsCache") + # Is the current_state_events.membership up to date? Or is the # background update still running? self._current_state_events_membership_up_to_date = False @@ -174,6 +190,33 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql, (room_id, Membership.JOIN)) return [r[0] for r in txn] + @cached(max_entries=100000, iterable=True) + async def get_users_in_room_with_profiles( + self, room_id: str + ) -> Dict[str, ProfileInfo]: + """Get a mapping from user ID to profile information for all users in a given room. + + Args: + room_id: The ID of the room to retrieve the users of. + + Returns: + A mapping from user ID to ProfileInfo. + """ + + def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]: + sql = """ + SELECT user_id, display_name, avatar_url FROM room_memberships + WHERE room_id = ? AND membership = ? + """ + txn.execute(sql, (room_id, Membership.JOIN)) + + return {r[0]: ProfileInfo(display_name=r[1], avatar_url=r[2]) for r in txn} + + return await self.db_pool.runInteraction( + "get_users_in_room_with_profiles", + _get_users_in_room_with_profiles, + ) + @cached(max_entries=100000) async def get_room_summary(self, room_id: str) -> Dict[str, MemberSummary]: """Get the details of a room roughly suitable for use by the room @@ -704,19 +747,82 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(num_args=2, max_entries=10000, iterable=True) async def _get_joined_hosts( - self, room_id, state_group, current_state_ids, state_entry - ): - # We don't use `state_group`, its there so that we can cache based - # on it. However, its important that its never None, since two current_state's - # with a state_group of None are likely to be different. + self, + room_id: str, + state_group: int, + current_state_ids: StateMap[str], + state_entry: "_StateCacheEntry", + ) -> FrozenSet[str]: + # We don't use `state_group`, its there so that we can cache based on + # it. However, its important that its never None, since two + # current_state's with a state_group of None are likely to be different. + # + # The `state_group` must match the `state_entry.state_group` (if not None). assert state_group is not None - + assert state_entry.state_group is None or state_entry.state_group == state_group + + # We use a secondary cache of previous work to allow us to build up the + # joined hosts for the given state group based on previous state groups. + # + # We cache one object per room containing the results of the last state + # group we got joined hosts for. The idea is that generally + # `get_joined_hosts` is called with the "current" state group for the + # room, and so consecutive calls will be for consecutive state groups + # which point to the previous state group. cache = await self._get_joined_hosts_cache(room_id) - return await cache.get_destinations(state_entry) + + # If the state group in the cache matches, we already have the data we need. + if state_entry.state_group == cache.state_group: + return frozenset(cache.hosts_to_joined_users) + + # Since we'll mutate the cache we need to lock. + with (await self._joined_host_linearizer.queue(room_id)): + if state_entry.state_group == cache.state_group: + # Same state group, so nothing to do. We've already checked for + # this above, but the cache may have changed while waiting on + # the lock. + pass + elif state_entry.prev_group == cache.state_group: + # The cached work is for the previous state group, so we work out + # the delta. + for (typ, state_key), event_id in state_entry.delta_ids.items(): + if typ != EventTypes.Member: + continue + + host = intern_string(get_domain_from_id(state_key)) + user_id = state_key + known_joins = cache.hosts_to_joined_users.setdefault(host, set()) + + event = await self.get_event(event_id) + if event.membership == Membership.JOIN: + known_joins.add(user_id) + else: + known_joins.discard(user_id) + + if not known_joins: + cache.hosts_to_joined_users.pop(host, None) + else: + # The cache doesn't match the state group or prev state group, + # so we calculate the result from first principles. + joined_users = await self.get_joined_users_from_state( + room_id, state_entry + ) + + cache.hosts_to_joined_users = {} + for user_id in joined_users: + host = intern_string(get_domain_from_id(user_id)) + cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + if state_entry.state_group: + cache.state_group = state_entry.state_group + else: + cache.state_group = object() + + return frozenset(cache.hosts_to_joined_users) @cached(max_entries=10000) def _get_joined_hosts_cache(self, room_id: str) -> "_JoinedHostsCache": - return _JoinedHostsCache(self, room_id) + return _JoinedHostsCache() @cached(num_args=2) async def did_forget(self, user_id: str, room_id: str) -> bool: @@ -1026,71 +1132,18 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): await self.db_pool.runInteraction("forget_membership", f) +@attr.s(slots=True) class _JoinedHostsCache: - """Cache for joined hosts in a room that is optimised to handle updates - via state deltas. - """ - - def __init__(self, store, room_id): - self.store = store - self.room_id = room_id - - self.hosts_to_joined_users = {} - - self.state_group = object() + """The cached data used by the `_get_joined_hosts_cache`.""" - self.linearizer = Linearizer("_JoinedHostsCache") + # Dict of host to the set of their users in the room at the state group. + hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict) - self._len = 0 - - async def get_destinations(self, state_entry: "_StateCacheEntry") -> Set[str]: - """Get set of destinations for a state entry - - Args: - state_entry - - Returns: - The destinations as a set. - """ - if state_entry.state_group == self.state_group: - return frozenset(self.hosts_to_joined_users) - - with (await self.linearizer.queue(())): - if state_entry.state_group == self.state_group: - pass - elif state_entry.prev_group == self.state_group: - for (typ, state_key), event_id in state_entry.delta_ids.items(): - if typ != EventTypes.Member: - continue - - host = intern_string(get_domain_from_id(state_key)) - user_id = state_key - known_joins = self.hosts_to_joined_users.setdefault(host, set()) - - event = await self.store.get_event(event_id) - if event.membership == Membership.JOIN: - known_joins.add(user_id) - else: - known_joins.discard(user_id) - - if not known_joins: - self.hosts_to_joined_users.pop(host, None) - else: - joined_users = await self.store.get_joined_users_from_state( - self.room_id, state_entry - ) - - self.hosts_to_joined_users = {} - for user_id in joined_users: - host = intern_string(get_domain_from_id(user_id)) - self.hosts_to_joined_users.setdefault(host, set()).add(user_id) - - if state_entry.state_group: - self.state_group = state_entry.state_group - else: - self.state_group = object() - self._len = sum(len(v) for v in self.hosts_to_joined_users.values()) - return frozenset(self.hosts_to_joined_users) + # The state group `hosts_to_joined_users` is derived from. Will be an object + # if the instance is newly created or if the state is not based on a state + # group. (An object is used as a sentinel value to ensure that it never is + # equal to anything else). + state_group = attr.ib(type=Union[object, int], factory=object) def __len__(self): - return self._len + return sum(len(v) for v in self.hosts_to_joined_users.values()) diff --git a/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py b/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py index b1684a84..acd6ad1e 100644 --- a/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py +++ b/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py index 44917f0a..66989222 100644 --- a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py +++ b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/schema/delta/59/11drop_thumbnail_constraint.sql.postgres b/synapse/storage/databases/main/schema/delta/59/11drop_thumbnail_constraint.sql.postgres new file mode 100644 index 00000000..54c1bca3 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/11drop_thumbnail_constraint.sql.postgres @@ -0,0 +1,22 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- drop old constraints on remote_media_cache_thumbnails +-- +-- This was originally part of 57.07, but it was done wrong, per +-- https://github.com/matrix-org/synapse/issues/8649, so we do it again. +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + (5911, 'media_repository_drop_index_wo_method_2', '{}', 'remote_media_repository_thumbnails_method_idx'); + diff --git a/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql new file mode 100644 index 00000000..4836dac1 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Track when users renew their account using the value of the 'renewal_token' column. +-- This field should be set to NULL after a fresh token is generated. +ALTER TABLE account_validity ADD token_used_ts_ms BIGINT; diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql new file mode 100644 index 00000000..b6ba0bda --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a column to specify which instance wrote the row. Historic rows have +-- `NULL`, which indicates that the master instance wrote them. +ALTER TABLE presence_stream ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres new file mode 100644 index 00000000..02b182ad --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres @@ -0,0 +1,20 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS presence_stream_sequence; + +SELECT setval('presence_stream_sequence', ( + SELECT COALESCE(MAX(stream_id), 1) FROM presence_stream +)); diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index f5e7d9ef..6480d5a9 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,7 +15,7 @@ import logging import re from collections import namedtuple -from typing import List, Optional, Set +from typing import Collection, List, Optional, Set from synapse.api.errors import SynapseError from synapse.events import EventBase @@ -24,7 +23,6 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.engines import PostgresEngine, Sqlite3Engine -from synapse.types import Collection logger = logging.getLogger(__name__) diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py index c8c67953..ab2159c2 100644 --- a/synapse/storage/databases/main/signatures.py +++ b/synapse/storage/databases/main/signatures.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index a7f37173..1757064a 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2020 The Matrix.org Foundation C.I.C. # @@ -190,7 +189,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # FIXME: how should this be cached? async def get_filtered_current_state_ids( - self, room_id: str, state_filter: StateFilter = StateFilter.all() + self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: """Get the current state event of a given type for a room based on the current_state_events table. This may not be as up-to-date as the result @@ -205,7 +204,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): Map from type/state_key to event ID. """ - where_clause, where_args = state_filter.make_sql_filter_clause() + where_clause, where_args = ( + state_filter or StateFilter.all() + ).make_sql_filter_clause() if not where_clause: # We delegate to the cached version diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index 0dbb501f..bff7d040 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index 1c99393c..ae9f8809 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. # @@ -66,18 +65,37 @@ TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")} class UserSortOrder(Enum): """ Enum to define the sorting method used when returning users - with get_users_media_usage_paginate + with get_users_paginate in __init__.py + and get_users_media_usage_paginate in stats.py - MEDIA_LENGTH = ordered by size of uploaded media. Smallest to largest. - MEDIA_COUNT = ordered by number of uploaded media. Smallest to largest. + When moves this to __init__.py gets `builtins.ImportError` with + `most likely due to a circular import` + + MEDIA_LENGTH = ordered by size of uploaded media. + MEDIA_COUNT = ordered by number of uploaded media. USER_ID = ordered alphabetically by `user_id`. + NAME = ordered alphabetically by `user_id`. This is for compatibility reasons, + as the user_id is returned in the name field in the response in list users admin API. DISPLAYNAME = ordered alphabetically by `displayname` + GUEST = ordered by `is_guest` + ADMIN = ordered by `admin` + DEACTIVATED = ordered by `deactivated` + USER_TYPE = ordered alphabetically by `user_type` + AVATAR_URL = ordered alphabetically by `avatar_url` + SHADOW_BANNED = ordered by `shadow_banned` """ MEDIA_LENGTH = "media_length" MEDIA_COUNT = "media_count" USER_ID = "user_id" + NAME = "name" DISPLAYNAME = "displayname" + GUEST = "is_guest" + ADMIN = "admin" + DEACTIVATED = "deactivated" + USER_TYPE = "user_type" + AVATAR_URL = "avatar_url" + SHADOW_BANNED = "shadow_banned" class StatsStore(StateDeltasStore): diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 91f8abb6..7581c7d3 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd # Copyright 2018-2019 New Vector Ltd @@ -38,7 +37,7 @@ what sort order was used: import abc import logging from collections import namedtuple -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple from twisted.internet import defer @@ -54,7 +53,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import Collection, PersistedEventPosition, RoomStreamToken +from synapse.types import PersistedEventPosition, RoomStreamToken from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index 50067eab..1d62c614 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index b7072f1f..82335e7a 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py index 5473ec14..22c05cdd 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 1026f321..7a082fdd 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py index f9575b1f..acf6b2fb 100644 --- a/synapse/storage/databases/main/user_erasure_store.py +++ b/synapse/storage/databases/main/user_erasure_store.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/state/__init__.py b/synapse/storage/databases/state/__init__.py index c90d0228..e5100d61 100644 --- a/synapse/storage/databases/state/__init__.py +++ b/synapse/storage/databases/state/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 1fd333b7..c2891cb0 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,6 +13,7 @@ # limitations under the License. import logging +from typing import Optional from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool @@ -73,8 +73,10 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): return count def _get_state_groups_from_groups_txn( - self, txn, groups, state_filter=StateFilter.all() + self, txn, groups, state_filter: Optional[StateFilter] = None ): + state_filter = state_filter or StateFilter.all() + results = {group: {} for group in groups} where_clause, where_args = state_filter.make_sql_filter_clause() diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 97ec65f7..e38461ad 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,7 +14,7 @@ import logging from collections import namedtuple -from typing import Dict, Iterable, List, Set, Tuple +from typing import Dict, Iterable, List, Optional, Set, Tuple from synapse.api.constants import EventTypes from synapse.storage._base import SQLBaseStore @@ -210,7 +209,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state_filter.filter_state(state_dict_ids), not missing_types async def _get_state_for_groups( - self, groups: Iterable[int], state_filter: StateFilter = StateFilter.all() + self, groups: Iterable[int], state_filter: Optional[StateFilter] = None ) -> Dict[int, MutableStateMap[str]]: """Gets the state at each of a list of state groups, optionally filtering by type/state_key @@ -223,6 +222,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): Returns: Dict of state group to state map. """ + state_filter = state_filter or StateFilter.all() member_filter, non_member_filter = state_filter.get_member_split() diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index d15ccfac..9abc0204 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index cca839c7..1882bfd9 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -44,14 +43,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): @property @abc.abstractmethod - def supports_tuple_comparison(self) -> bool: - """ - Do we support comparing tuples, i.e. `(a, b) > (c, d)`? - """ - ... - - @property - @abc.abstractmethod def supports_using_any_list(self) -> bool: """ Do we support using `a = ANY(?)` and passing a list diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 80a3558a..21411c5f 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -47,8 +46,8 @@ class PostgresEngine(BaseDatabaseEngine): self._version = db_conn.server_version # Are we on a supported PostgreSQL version? - if not allow_outdated_version and self._version < 90500: - raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.") + if not allow_outdated_version and self._version < 90600: + raise RuntimeError("Synapse requires PostgreSQL 9.6 or above.") with db_conn.cursor() as txn: txn.execute("SHOW SERVER_ENCODING") @@ -130,13 +129,6 @@ class PostgresEngine(BaseDatabaseEngine): return True @property - def supports_tuple_comparison(self): - """ - Do we support comparing tuples, i.e. `(a, b) > (c, d)`? - """ - return True - - @property def supports_using_any_list(self): """Do we support using `a = ANY(?)` and passing a list""" return True diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index b87e7798..5fe1b205 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -57,14 +56,6 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): return self.module.sqlite_version_info >= (3, 24, 0) @property - def supports_tuple_comparison(self): - """ - Do we support comparing tuples, i.e. `(a, b) > (c, d)`? This requires - SQLite 3.15+. - """ - return self.module.sqlite_version_info >= (3, 15, 0) - - @property def supports_using_any_list(self): """Do we support using `a = ANY(?)` and passing a list""" return False @@ -72,8 +63,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): def check_database(self, db_conn, allow_outdated_version: bool = False): if not allow_outdated_version: version = self.module.sqlite_version_info - if version < (3, 11, 0): - raise RuntimeError("Synapse requires sqlite 3.11 or above.") + # Synapse is untested against older SQLite versions, and we don't want + # to let users upgrade to a version of Synapse with broken support for their + # sqlite version, because it risks leaving them with a half-upgraded db. + if version < (3, 22, 0): + raise RuntimeError("Synapse requires sqlite 3.22 or above.") def check_new_database(self, txn): """Gets called when setting up a brand new database. This allows us to diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index c03871f3..540adb87 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd. # diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 3a0d6fb3..33dc752d 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. @@ -18,7 +17,7 @@ import itertools import logging from collections import deque, namedtuple -from typing import Dict, Iterable, List, Optional, Set, Tuple +from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple from prometheus_client import Counter, Histogram @@ -33,7 +32,6 @@ from synapse.storage.databases import Databases from synapse.storage.databases.main.events import DeltaState from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( - Collection, PersistedEventPosition, RoomStreamToken, StateMap, diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6c3c2da5..7a2cbee4 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # @@ -13,12 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import imp +import importlib.util import logging import os import re from collections import Counter -from typing import Generator, Iterable, List, Optional, TextIO, Tuple +from typing import Collection, Generator, Iterable, List, Optional, TextIO, Tuple import attr from typing_extensions import Counter as CounterType @@ -28,7 +27,6 @@ from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.engines.postgres import PostgresEngine from synapse.storage.types import Cursor -from synapse.types import Collection logger = logging.getLogger(__name__) @@ -454,8 +452,13 @@ def _upgrade_existing_database( ) module_name = "synapse.storage.v%d_%s" % (v, root_name) - with open(absolute_path) as python_file: - module = imp.load_source(module_name, absolute_path, python_file) # type: ignore + + spec = importlib.util.spec_from_file_location( + module_name, absolute_path + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) # type: ignore + logger.info("Running script %s", relative_path) module.run_create(cur, database_engine) # type: ignore if not is_empty: diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py index ad954990..30669beb 100644 --- a/synapse/storage/purge_events.py +++ b/synapse/storage/purge_events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index f47cec0d..2d5c21ef 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py index 2564f34b..c552dbf0 100644 --- a/synapse/storage/relations.py +++ b/synapse/storage/relations.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index d2ff4da6..c34fbf21 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2e277a21..cfafba22 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -449,7 +448,7 @@ class StateGroupStorage: return self.stores.state._get_state_groups_from_groups(groups, state_filter) async def get_state_for_events( - self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all() + self, event_ids: Iterable[str], state_filter: Optional[StateFilter] = None ) -> Dict[str, StateMap[EventBase]]: """Given a list of event_ids and type tuples, return a list of state dicts for each event. @@ -465,7 +464,7 @@ class StateGroupStorage: groups = set(event_to_groups.values()) group_to_state = await self.stores.state._get_state_for_groups( - groups, state_filter + groups, state_filter or StateFilter.all() ) state_event_map = await self.stores.main.get_events( @@ -485,7 +484,7 @@ class StateGroupStorage: return {event: event_to_state[event] for event in event_ids} async def get_state_ids_for_events( - self, event_ids: Iterable[str], state_filter: StateFilter = StateFilter.all() + self, event_ids: Iterable[str], state_filter: Optional[StateFilter] = None ) -> Dict[str, StateMap[str]]: """ Get the state dicts corresponding to a list of events, containing the event_ids @@ -502,7 +501,7 @@ class StateGroupStorage: groups = set(event_to_groups.values()) group_to_state = await self.stores.state._get_state_for_groups( - groups, state_filter + groups, state_filter or StateFilter.all() ) event_to_state = { @@ -513,7 +512,7 @@ class StateGroupStorage: return {event: event_to_state[event] for event in event_ids} async def get_state_for_event( - self, event_id: str, state_filter: StateFilter = StateFilter.all() + self, event_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: """ Get the state dict corresponding to a particular event @@ -525,11 +524,13 @@ class StateGroupStorage: Returns: A dict from (type, state_key) -> state_event """ - state_map = await self.get_state_for_events([event_id], state_filter) + state_map = await self.get_state_for_events( + [event_id], state_filter or StateFilter.all() + ) return state_map[event_id] async def get_state_ids_for_event( - self, event_id: str, state_filter: StateFilter = StateFilter.all() + self, event_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: """ Get the state dict corresponding to a particular event @@ -541,11 +542,13 @@ class StateGroupStorage: Returns: A dict from (type, state_key) -> state_event """ - state_map = await self.get_state_ids_for_events([event_id], state_filter) + state_map = await self.get_state_ids_for_events( + [event_id], state_filter or StateFilter.all() + ) return state_map[event_id] def _get_state_for_groups( - self, groups: Iterable[int], state_filter: StateFilter = StateFilter.all() + self, groups: Iterable[int], state_filter: Optional[StateFilter] = None ) -> Awaitable[Dict[int, MutableStateMap[str]]]: """Gets the state at each of a list of state groups, optionally filtering by type/state_key @@ -558,7 +561,9 @@ class StateGroupStorage: Returns: Dict of state group to state map. """ - return self.stores.state._get_state_for_groups(groups, state_filter) + return self.stores.state._get_state_for_groups( + groups, state_filter or StateFilter.all() + ) async def store_state_group( self, diff --git a/synapse/storage/types.py b/synapse/storage/types.py index 17291c9d..57f4883b 100644 --- a/synapse/storage/types.py +++ b/synapse/storage/types.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py index bfebb0f6..5e83dba2 100644 --- a/synapse/storage/util/__init__.py +++ b/synapse/storage/util/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index d4643c4f..b1bd3a52 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,7 +16,7 @@ import logging import threading from collections import OrderedDict from contextlib import contextmanager -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Dict, Iterable, List, Optional, Set, Tuple, Union import attr @@ -91,7 +90,14 @@ class StreamIdGenerator: # ... persist event ... """ - def __init__(self, db_conn, table, column, extra_tables=[], step=1): + def __init__( + self, + db_conn, + table, + column, + extra_tables: Iterable[Tuple[str, str]] = (), + step=1, + ): assert step != 0 self._lock = threading.Lock() self._step = step diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 36a67e70..30b6b8e0 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); |