summaryrefslogtreecommitdiff
path: root/synapse/storage
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2020-05-25 11:54:03 +0200
committerAndrej Shadura <andrewsh@debian.org>2020-05-25 11:54:03 +0200
commitda7f96aa2a3b1485dafa016f38aac1d4376b64e7 (patch)
tree2c4cdc1096370cff409af72b9e97d3edf5f84c59 /synapse/storage
parentbef77b10b264d8c082a3604656e186ab162e5d64 (diff)
New upstream version 1.13.0
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/background_updates.py114
-rw-r--r--synapse/storage/data_stores/main/__init__.py75
-rw-r--r--synapse/storage/data_stores/main/cache.py44
-rw-r--r--synapse/storage/data_stores/main/client_ips.py16
-rw-r--r--synapse/storage/data_stores/main/deviceinbox.py88
-rw-r--r--synapse/storage/data_stores/main/devices.py300
-rw-r--r--synapse/storage/data_stores/main/directory.py26
-rw-r--r--synapse/storage/data_stores/main/e2e_room_keys.py3
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py14
-rw-r--r--synapse/storage/data_stores/main/event_federation.py23
-rw-r--r--synapse/storage/data_stores/main/events.py114
-rw-r--r--synapse/storage/data_stores/main/events_worker.py200
-rw-r--r--synapse/storage/data_stores/main/media_repository.py8
-rw-r--r--synapse/storage/data_stores/main/presence.py23
-rw-r--r--synapse/storage/data_stores/main/push_rule.py41
-rw-r--r--synapse/storage/data_stores/main/registration.py5
-rw-r--r--synapse/storage/data_stores/main/room.py118
-rw-r--r--synapse/storage/data_stores/main/roommember.py3
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql6
-rw-r--r--synapse/storage/data_stores/main/schema/delta/57/remove_sent_outbound_pokes.sql21
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/02remove_dup_outbound_pokes.sql22
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql36
-rw-r--r--synapse/storage/data_stores/main/stream.py2
-rw-r--r--synapse/storage/data_stores/main/ui_auth.py300
-rw-r--r--synapse/storage/database.py98
-rw-r--r--synapse/storage/engines/sqlite.py1
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/schema/delta/58/00background_update_ordering.sql19
28 files changed, 1266 insertions, 456 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index eb1a7e50..59f3394b 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -90,8 +90,10 @@ class BackgroundUpdater(object):
self._clock = hs.get_clock()
self.db = database
+ # if a background update is currently running, its name.
+ self._current_background_update = None # type: Optional[str]
+
self._background_update_performance = {}
- self._background_update_queue = []
self._background_update_handlers = {}
self._all_done = False
@@ -111,7 +113,7 @@ class BackgroundUpdater(object):
except Exception:
logger.exception("Error doing update")
else:
- if result is None:
+ if result:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
@@ -119,26 +121,25 @@ class BackgroundUpdater(object):
self._all_done = True
return None
- @defer.inlineCallbacks
- def has_completed_background_updates(self):
+ async def has_completed_background_updates(self) -> bool:
"""Check if all the background updates have completed
Returns:
- Deferred[bool]: True if all background updates have completed
+ True if all background updates have completed
"""
# if we've previously determined that there is nothing left to do, that
# is easy
if self._all_done:
return True
- # obviously, if we have things in our queue, we're not done.
- if self._background_update_queue:
+ # obviously, if we are currently processing an update, we're not done.
+ if self._current_background_update:
return False
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
- updates = yield self.db.simple_select_onecol(
+ updates = await self.db.simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
@@ -153,11 +154,10 @@ class BackgroundUpdater(object):
async def has_completed_background_update(self, update_name) -> bool:
"""Check if the given background update has finished running.
"""
-
if self._all_done:
return True
- if update_name in self._background_update_queue:
+ if update_name == self._current_background_update:
return False
update_exists = await self.db.simple_select_one_onecol(
@@ -170,9 +170,7 @@ class BackgroundUpdater(object):
return not update_exists
- async def do_next_background_update(
- self, desired_duration_ms: float
- ) -> Optional[int]:
+ async def do_next_background_update(self, desired_duration_ms: float) -> bool:
"""Does some amount of work on the next queued background update
Returns once some amount of work is done.
@@ -181,33 +179,51 @@ class BackgroundUpdater(object):
desired_duration_ms(float): How long we want to spend
updating.
Returns:
- None if there is no more work to do, otherwise an int
+ True if we have finished running all the background updates, otherwise False
"""
- if not self._background_update_queue:
- updates = await self.db.simple_select_list(
- "background_updates",
- keyvalues=None,
- retcols=("update_name", "depends_on"),
+
+ def get_background_updates_txn(txn):
+ txn.execute(
+ """
+ SELECT update_name, depends_on FROM background_updates
+ ORDER BY ordering, update_name
+ """
)
- in_flight = {update["update_name"] for update in updates}
- for update in updates:
- if update["depends_on"] not in in_flight:
- self._background_update_queue.append(update["update_name"])
+ return self.db.cursor_to_dict(txn)
- if not self._background_update_queue:
- # no work left to do
- return None
+ if not self._current_background_update:
+ all_pending_updates = await self.db.runInteraction(
+ "background_updates", get_background_updates_txn,
+ )
+ if not all_pending_updates:
+ # no work left to do
+ return True
+
+ # find the first update which isn't dependent on another one in the queue.
+ pending = {update["update_name"] for update in all_pending_updates}
+ for upd in all_pending_updates:
+ depends_on = upd["depends_on"]
+ if not depends_on or depends_on not in pending:
+ break
+ logger.info(
+ "Not starting on bg update %s until %s is done",
+ upd["update_name"],
+ depends_on,
+ )
+ else:
+ # if we get to the end of that for loop, there is a problem
+ raise Exception(
+ "Unable to find a background update which doesn't depend on "
+ "another: dependency cycle?"
+ )
- # pop from the front, and add back to the back
- update_name = self._background_update_queue.pop(0)
- self._background_update_queue.append(update_name)
+ self._current_background_update = upd["update_name"]
- res = await self._do_background_update(update_name, desired_duration_ms)
- return res
+ await self._do_background_update(desired_duration_ms)
+ return False
- async def _do_background_update(
- self, update_name: str, desired_duration_ms: float
- ) -> int:
+ async def _do_background_update(self, desired_duration_ms: float) -> int:
+ update_name = self._current_background_update
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
@@ -400,27 +416,6 @@ class BackgroundUpdater(object):
self.register_background_update_handler(update_name, updater)
- def start_background_update(self, update_name, progress):
- """Starts a background update running.
-
- Args:
- update_name: The update to set running.
- progress: The initial state of the progress of the update.
-
- Returns:
- A deferred that completes once the task has been added to the
- queue.
- """
- # Clear the background update queue so that we will pick up the new
- # task on the next iteration of do_background_update.
- self._background_update_queue = []
- progress_json = json.dumps(progress)
-
- return self.db.simple_insert(
- "background_updates",
- {"update_name": update_name, "progress_json": progress_json},
- )
-
def _end_background_update(self, update_name):
"""Removes a completed background update task from the queue.
@@ -429,9 +424,12 @@ class BackgroundUpdater(object):
Returns:
A deferred that completes once the task is removed.
"""
- self._background_update_queue = [
- name for name in self._background_update_queue if name != update_name
- ]
+ if update_name != self._current_background_update:
+ raise Exception(
+ "Cannot end background update %s which isn't currently running"
+ % update_name
+ )
+ self._current_background_update = None
return self.db.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py
index acca079f..ceba1088 100644
--- a/synapse/storage/data_stores/main/__init__.py
+++ b/synapse/storage/data_stores/main/__init__.py
@@ -66,6 +66,7 @@ from .stats import StatsStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionStore
+from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
from .user_erasure_store import UserErasureStore
@@ -112,6 +113,7 @@ class DataStore(
StatsStore,
RelationsStore,
CacheInvalidationStore,
+ UIAuthStore,
):
def __init__(self, database: Database, db_conn, hs):
self.hs = hs
@@ -144,7 +146,10 @@ class DataStore(
db_conn,
"device_lists_stream",
"stream_id",
- extra_tables=[("user_signature_stream", "stream_id")],
+ extra_tables=[
+ ("user_signature_stream", "stream_id"),
+ ("device_lists_outbound_pokes", "stream_id"),
+ ],
)
self._cross_signing_id_gen = StreamIdGenerator(
db_conn, "e2e_cross_signing_keys", "stream_id"
@@ -500,7 +505,8 @@ class DataStore(
self, start, limit, name=None, guests=True, deactivated=False
):
"""Function to retrieve a paginated list of users from
- users list. This will return a json list of users.
+ users list. This will return a json list of users and the
+ total number of users matching the filter criteria.
Args:
start (int): start number to begin the query from
@@ -509,35 +515,44 @@ class DataStore(
guests (bool): whether to in include guest users
deactivated (bool): whether to include deactivated users
Returns:
- defer.Deferred: resolves to list[dict[str, Any]]
+ defer.Deferred: resolves to list[dict[str, Any]], int
"""
- name_filter = {}
- if name:
- name_filter["name"] = "%" + name + "%"
-
- attr_filter = {}
- if not guests:
- attr_filter["is_guest"] = 0
- if not deactivated:
- attr_filter["deactivated"] = 0
-
- return self.db.simple_select_list_paginate(
- desc="get_users_paginate",
- table="users",
- orderby="name",
- start=start,
- limit=limit,
- filters=name_filter,
- keyvalues=attr_filter,
- retcols=[
- "name",
- "password_hash",
- "is_guest",
- "admin",
- "user_type",
- "deactivated",
- ],
- )
+
+ def get_users_paginate_txn(txn):
+ filters = []
+ args = []
+
+ if name:
+ filters.append("name LIKE ?")
+ args.append("%" + name + "%")
+
+ if not guests:
+ filters.append("is_guest = 0")
+
+ if not deactivated:
+ filters.append("deactivated = 0")
+
+ where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""
+
+ sql = "SELECT COUNT(*) as total_users FROM users %s" % (where_clause)
+ txn.execute(sql, args)
+ count = txn.fetchone()[0]
+
+ args = [self.hs.config.server_name] + args + [limit, start]
+ sql = """
+ SELECT name, user_type, is_guest, admin, deactivated, displayname, avatar_url
+ FROM users as u
+ LEFT JOIN profiles AS p ON u.name = '@' || p.user_id || ':' || ?
+ {}
+ ORDER BY u.name LIMIT ? OFFSET ?
+ """.format(
+ where_clause
+ )
+ txn.execute(sql, args)
+ users = self.db.cursor_to_dict(txn)
+ return users, count
+
+ return self.db.runInteraction("get_users_paginate_txn", get_users_paginate_txn)
def search_users(self, term):
"""Function to search users list for one or more users with
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py
index d4c44dcc..4dc5da3f 100644
--- a/synapse/storage/data_stores/main/cache.py
+++ b/synapse/storage/data_stores/main/cache.py
@@ -32,7 +32,29 @@ logger = logging.getLogger(__name__)
CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
-class CacheInvalidationStore(SQLBaseStore):
+class CacheInvalidationWorkerStore(SQLBaseStore):
+ def get_all_updated_caches(self, last_id, current_id, limit):
+ if last_id == current_id:
+ return defer.succeed([])
+
+ def get_all_updated_caches_txn(txn):
+ # We purposefully don't bound by the current token, as we want to
+ # send across cache invalidations as quickly as possible. Cache
+ # invalidations are idempotent, so duplicates are fine.
+ sql = (
+ "SELECT stream_id, cache_func, keys, invalidation_ts"
+ " FROM cache_invalidation_stream"
+ " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
+ )
+ txn.execute(sql, (last_id, limit))
+ return txn.fetchall()
+
+ return self.db.runInteraction(
+ "get_all_updated_caches", get_all_updated_caches_txn
+ )
+
+
+class CacheInvalidationStore(CacheInvalidationWorkerStore):
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
@@ -145,26 +167,6 @@ class CacheInvalidationStore(SQLBaseStore):
},
)
- def get_all_updated_caches(self, last_id, current_id, limit):
- if last_id == current_id:
- return defer.succeed([])
-
- def get_all_updated_caches_txn(txn):
- # We purposefully don't bound by the current token, as we want to
- # send across cache invalidations as quickly as possible. Cache
- # invalidations are idempotent, so duplicates are fine.
- sql = (
- "SELECT stream_id, cache_func, keys, invalidation_ts"
- " FROM cache_invalidation_stream"
- " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
- )
- txn.execute(sql, (last_id, limit))
- return txn.fetchall()
-
- return self.db.runInteraction(
- "get_all_updated_caches", get_all_updated_caches_txn
- )
-
def get_cache_stream_token(self):
if self._cache_id_gen:
return self._cache_id_gen.get_current_token()
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index e1ccb271..92bc0691 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -21,7 +21,7 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import Database
+from synapse.storage.database import Database, make_tuple_comparison_clause
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache
@@ -303,16 +303,10 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
# we'll just end up updating the same device row multiple
# times, which is fine.
- if self.database_engine.supports_tuple_comparison:
- where_clause = "(user_id, device_id) > (?, ?)"
- where_args = [last_user_id, last_device_id]
- else:
- # We explicitly do a `user_id >= ? AND (...)` here to ensure
- # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
- # makes it hard for query optimiser to tell that it can use the
- # index on user_id
- where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
- where_args = [last_user_id, last_user_id, last_device_id]
+ where_clause, where_args = make_tuple_comparison_clause(
+ self.database_engine,
+ [("user_id", last_user_id), ("device_id", last_device_id)],
+ )
sql = """
SELECT
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index 0613b49f..9a1178fb 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -207,6 +207,50 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
)
+ def get_all_new_device_messages(self, last_pos, current_pos, limit):
+ """
+ Args:
+ last_pos(int):
+ current_pos(int):
+ limit(int):
+ Returns:
+ A deferred list of rows from the device inbox
+ """
+ if last_pos == current_pos:
+ return defer.succeed([])
+
+ def get_all_new_device_messages_txn(txn):
+ # We limit like this as we might have multiple rows per stream_id, and
+ # we want to make sure we always get all entries for any stream_id
+ # we return.
+ upper_pos = min(current_pos, last_pos + limit)
+ sql = (
+ "SELECT max(stream_id), user_id"
+ " FROM device_inbox"
+ " WHERE ? < stream_id AND stream_id <= ?"
+ " GROUP BY user_id"
+ )
+ txn.execute(sql, (last_pos, upper_pos))
+ rows = txn.fetchall()
+
+ sql = (
+ "SELECT max(stream_id), destination"
+ " FROM device_federation_outbox"
+ " WHERE ? < stream_id AND stream_id <= ?"
+ " GROUP BY destination"
+ )
+ txn.execute(sql, (last_pos, upper_pos))
+ rows.extend(txn)
+
+ # Order by ascending stream ordering
+ rows.sort()
+
+ return rows
+
+ return self.db.runInteraction(
+ "get_all_new_device_messages", get_all_new_device_messages_txn
+ )
+
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
@@ -411,47 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
rows.append((user_id, device_id, stream_id, message_json))
txn.executemany(sql, rows)
-
- def get_all_new_device_messages(self, last_pos, current_pos, limit):
- """
- Args:
- last_pos(int):
- current_pos(int):
- limit(int):
- Returns:
- A deferred list of rows from the device inbox
- """
- if last_pos == current_pos:
- return defer.succeed([])
-
- def get_all_new_device_messages_txn(txn):
- # We limit like this as we might have multiple rows per stream_id, and
- # we want to make sure we always get all entries for any stream_id
- # we return.
- upper_pos = min(current_pos, last_pos + limit)
- sql = (
- "SELECT max(stream_id), user_id"
- " FROM device_inbox"
- " WHERE ? < stream_id AND stream_id <= ?"
- " GROUP BY user_id"
- )
- txn.execute(sql, (last_pos, upper_pos))
- rows = txn.fetchall()
-
- sql = (
- "SELECT max(stream_id), destination"
- " FROM device_federation_outbox"
- " WHERE ? < stream_id AND stream_id <= ?"
- " GROUP BY destination"
- )
- txn.execute(sql, (last_pos, upper_pos))
- rows.extend(txn)
-
- # Order by ascending stream ordering
- rows.sort()
-
- return rows
-
- return self.db.runInteraction(
- "get_all_new_device_messages", get_all_new_device_messages_txn
- )
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 8af5f7de..03f5141e 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import List, Tuple
from six import iteritems
@@ -31,7 +32,11 @@ from synapse.logging.opentracing import (
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
-from synapse.storage.database import Database
+from synapse.storage.database import (
+ Database,
+ LoggingTransaction,
+ make_tuple_comparison_clause,
+)
from synapse.types import Collection, get_verify_key_from_cross_signing_key
from synapse.util.caches.descriptors import (
Cache,
@@ -40,6 +45,7 @@ from synapse.util.caches.descriptors import (
cachedList,
)
from synapse.util.iterutils import batch_iter
+from synapse.util.stringutils import shortstr
logger = logging.getLogger(__name__)
@@ -47,6 +53,8 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
"drop_device_list_streams_non_unique_indexes"
)
+BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
+
class DeviceWorkerStore(SQLBaseStore):
def get_device(self, user_id, device_id):
@@ -112,23 +120,13 @@ class DeviceWorkerStore(SQLBaseStore):
if not has_changed:
return now_stream_id, []
- # We retrieve n+1 devices from the list of outbound pokes where n is
- # our outbound device update limit. We then check if the very last
- # device has the same stream_id as the second-to-last device. If so,
- # then we ignore all devices with that stream_id and only send the
- # devices with a lower stream_id.
- #
- # If when culling the list we end up with no devices afterwards, we
- # consider the device update to be too large, and simply skip the
- # stream_id; the rationale being that such a large device list update
- # is likely an error.
updates = yield self.db.runInteraction(
"get_device_updates_by_remote",
self._get_device_updates_by_remote_txn,
destination,
from_stream_id,
now_stream_id,
- limit + 1,
+ limit,
)
# Return an empty list if there are no updates
@@ -166,14 +164,6 @@ class DeviceWorkerStore(SQLBaseStore):
"device_id": verify_key.version,
}
- # if we have exceeded the limit, we need to exclude any results with the
- # same stream_id as the last row.
- if len(updates) > limit:
- stream_id_cutoff = updates[-1][2]
- now_stream_id = stream_id_cutoff - 1
- else:
- stream_id_cutoff = None
-
# Perform the equivalent of a GROUP BY
#
# Iterate through the updates list and copy non-duplicate
@@ -181,7 +171,6 @@ class DeviceWorkerStore(SQLBaseStore):
# the max stream_id across each set of duplicate entries
#
# maps (user_id, device_id) -> (stream_id, opentracing_context)
- # as long as their stream_id does not match that of the last row
#
# opentracing_context contains the opentracing metadata for the request
# that created the poke
@@ -192,10 +181,6 @@ class DeviceWorkerStore(SQLBaseStore):
query_map = {}
cross_signing_keys_by_user = {}
for user_id, device_id, update_stream_id, update_context in updates:
- if stream_id_cutoff is not None and update_stream_id >= stream_id_cutoff:
- # Stop processing updates
- break
-
if (
user_id in master_key_by_user
and device_id == master_key_by_user[user_id]["device_id"]
@@ -218,17 +203,6 @@ class DeviceWorkerStore(SQLBaseStore):
if update_stream_id > previous_update_stream_id:
query_map[key] = (update_stream_id, update_context)
- # If we didn't find any updates with a stream_id lower than the cutoff, it
- # means that there are more than limit updates all of which have the same
- # steam_id.
-
- # That should only happen if a client is spamming the server with new
- # devices, in which case E2E isn't going to work well anyway. We'll just
- # skip that stream_id and return an empty list, and continue with the next
- # stream_id next time.
- if not query_map and not cross_signing_keys_by_user:
- return stream_id_cutoff, []
-
results = yield self._get_device_update_edus_by_remote(
destination, from_stream_id, query_map
)
@@ -259,11 +233,11 @@ class DeviceWorkerStore(SQLBaseStore):
# get the list of device updates that need to be sent
sql = """
SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
- WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
+ WHERE destination = ? AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id
LIMIT ?
"""
- txn.execute(sql, (destination, from_stream_id, now_stream_id, False, limit))
+ txn.execute(sql, (destination, from_stream_id, now_stream_id, limit))
return list(txn)
@@ -301,7 +275,14 @@ class DeviceWorkerStore(SQLBaseStore):
prev_id = yield self._get_last_device_update_for_remote_user(
destination, user_id, from_stream_id
)
- for device_id, device in iteritems(user_devices):
+
+ # make sure we go through the devices in stream order
+ device_ids = sorted(
+ user_devices.keys(), key=lambda i: query_map[(user_id, i)][0],
+ )
+
+ for device_id in device_ids:
+ device = user_devices[device_id]
stream_id, opentracing_context = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
@@ -560,8 +541,8 @@ class DeviceWorkerStore(SQLBaseStore):
# Get set of users who *may* have changed. Users not in the returned
# list have definitely not changed.
- to_check = list(
- self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
+ to_check = self._device_list_stream_cache.get_entities_changed(
+ user_ids, from_key
)
if not to_check:
@@ -611,22 +592,33 @@ class DeviceWorkerStore(SQLBaseStore):
else:
return set()
- def get_all_device_list_changes_for_remotes(self, from_key, to_key):
- """Return a list of `(stream_id, user_id, destination)` which is the
- combined list of changes to devices, and which destinations need to be
- poked. `destination` may be None if no destinations need to be poked.
+ async def get_all_device_list_changes_for_remotes(
+ self, from_key: int, to_key: int, limit: int,
+ ) -> List[Tuple[int, str]]:
+ """Return a list of `(stream_id, entity)` which is the combined list of
+ changes to devices and which destinations need to be poked. Entity is
+ either a user ID (starting with '@') or a remote destination.
"""
- # We do a group by here as there can be a large number of duplicate
- # entries, since we throw away device IDs.
+
+ # This query Does The Right Thing where it'll correctly apply the
+ # bounds to the inner queries.
sql = """
- SELECT MAX(stream_id) AS stream_id, user_id, destination
- FROM device_lists_stream
- LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
+ SELECT stream_id, entity FROM (
+ SELECT stream_id, user_id AS entity FROM device_lists_stream
+ UNION ALL
+ SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
+ ) AS e
WHERE ? < stream_id AND stream_id <= ?
- GROUP BY user_id, destination
+ LIMIT ?
"""
- return self.db.execute(
- "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key
+
+ return await self.db.execute(
+ "get_all_device_list_changes_for_remotes",
+ None,
+ sql,
+ from_key,
+ to_key,
+ limit,
)
@cached(max_entries=10000)
@@ -728,6 +720,11 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
self._drop_device_list_streams_non_unique_indexes,
)
+ # clear out duplicate device list outbound pokes
+ self.db.updates.register_background_update_handler(
+ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
+ )
+
@defer.inlineCallbacks
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
def f(conn):
@@ -742,6 +739,66 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
)
return 1
+ async def _remove_duplicate_outbound_pokes(self, progress, batch_size):
+ # for some reason, we have accumulated duplicate entries in
+ # device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
+ # efficient.
+ #
+ # For each duplicate, we delete all the existing rows and put one back.
+
+ KEY_COLS = ["stream_id", "destination", "user_id", "device_id"]
+ last_row = progress.get(
+ "last_row",
+ {"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
+ )
+
+ def _txn(txn):
+ clause, args = make_tuple_comparison_clause(
+ self.db.engine, [(x, last_row[x]) for x in KEY_COLS]
+ )
+ sql = """
+ SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
+ FROM device_lists_outbound_pokes
+ WHERE %s
+ GROUP BY %s
+ HAVING count(*) > 1
+ ORDER BY %s
+ LIMIT ?
+ """ % (
+ clause, # WHERE
+ ",".join(KEY_COLS), # GROUP BY
+ ",".join(KEY_COLS), # ORDER BY
+ )
+ txn.execute(sql, args + [batch_size])
+ rows = self.db.cursor_to_dict(txn)
+
+ row = None
+ for row in rows:
+ self.db.simple_delete_txn(
+ txn, "device_lists_outbound_pokes", {x: row[x] for x in KEY_COLS},
+ )
+
+ row["sent"] = False
+ self.db.simple_insert_txn(
+ txn, "device_lists_outbound_pokes", row,
+ )
+
+ if row:
+ self.db.updates._background_update_progress_txn(
+ txn, BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, {"last_row": row},
+ )
+
+ return len(rows)
+
+ rows = await self.db.runInteraction(BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, _txn)
+
+ if not rows:
+ await self.db.updates._end_background_update(
+ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES
+ )
+
+ return rows
+
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs):
@@ -1021,29 +1078,49 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"""Persist that a user's devices have been updated, and which hosts
(if any) should be poked.
"""
- with self._device_list_id_gen.get_next() as stream_id:
+ if not device_ids:
+ return
+
+ with self._device_list_id_gen.get_next_mult(len(device_ids)) as stream_ids:
+ yield self.db.runInteraction(
+ "add_device_change_to_stream",
+ self._add_device_change_to_stream_txn,
+ user_id,
+ device_ids,
+ stream_ids,
+ )
+
+ if not hosts:
+ return stream_ids[-1]
+
+ context = get_active_span_text_map()
+ with self._device_list_id_gen.get_next_mult(
+ len(hosts) * len(device_ids)
+ ) as stream_ids:
yield self.db.runInteraction(
- "add_device_change_to_streams",
- self._add_device_change_txn,
+ "add_device_outbound_poke_to_stream",
+ self._add_device_outbound_poke_to_stream_txn,
user_id,
device_ids,
hosts,
- stream_id,
+ stream_ids,
+ context,
)
- return stream_id
- def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
- now = self._clock.time_msec()
+ return stream_ids[-1]
+ def _add_device_change_to_stream_txn(
+ self,
+ txn: LoggingTransaction,
+ user_id: str,
+ device_ids: Collection[str],
+ stream_ids: List[str],
+ ):
txn.call_after(
- self._device_list_stream_cache.entity_has_changed, user_id, stream_id
+ self._device_list_stream_cache.entity_has_changed, user_id, stream_ids[-1],
)
- for host in hosts:
- txn.call_after(
- self._device_list_federation_stream_cache.entity_has_changed,
- host,
- stream_id,
- )
+
+ min_stream_id = stream_ids[0]
# Delete older entries in the table, as we really only care about
# when the latest change happened.
@@ -1052,7 +1129,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
DELETE FROM device_lists_stream
WHERE user_id = ? AND device_id = ? AND stream_id < ?
""",
- [(user_id, device_id, stream_id) for device_id in device_ids],
+ [(user_id, device_id, min_stream_id) for device_id in device_ids],
)
self.db.simple_insert_many_txn(
@@ -1060,11 +1137,22 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_stream",
values=[
{"stream_id": stream_id, "user_id": user_id, "device_id": device_id}
- for device_id in device_ids
+ for stream_id, device_id in zip(stream_ids, device_ids)
],
)
- context = get_active_span_text_map()
+ def _add_device_outbound_poke_to_stream_txn(
+ self, txn, user_id, device_ids, hosts, stream_ids, context,
+ ):
+ for host in hosts:
+ txn.call_after(
+ self._device_list_federation_stream_cache.entity_has_changed,
+ host,
+ stream_ids[-1],
+ )
+
+ now = self._clock.time_msec()
+ next_stream_id = iter(stream_ids)
self.db.simple_insert_many_txn(
txn,
@@ -1072,7 +1160,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
values=[
{
"destination": destination,
- "stream_id": stream_id,
+ "stream_id": next(next_stream_id),
"user_id": user_id,
"device_id": device_id,
"sent": False,
@@ -1086,18 +1174,47 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
],
)
- def _prune_old_outbound_device_pokes(self):
+ def _prune_old_outbound_device_pokes(self, prune_age=24 * 60 * 60 * 1000):
"""Delete old entries out of the device_lists_outbound_pokes to ensure
- that we don't fill up due to dead servers. We keep one entry per
- (destination, user_id) tuple to ensure that the prev_ids remain correct
- if the server does come back.
+ that we don't fill up due to dead servers.
+
+ Normally, we try to send device updates as a delta since a previous known point:
+ this is done by setting the prev_id in the m.device_list_update EDU. However,
+ for that to work, we have to have a complete record of each change to
+ each device, which can add up to quite a lot of data.
+
+ An alternative mechanism is that, if the remote server sees that it has missed
+ an entry in the stream_id sequence for a given user, it will request a full
+ list of that user's devices. Hence, we can reduce the amount of data we have to
+ store (and transmit in some future transaction), by clearing almost everything
+ for a given destination out of the database, and having the remote server
+ resync.
+
+ All we need to do is make sure we keep at least one row for each
+ (user, destination) pair, to remind us to send a m.device_list_update EDU for
+ that user when the destination comes back. It doesn't matter which device
+ we keep.
"""
- yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000
+ yesterday = self._clock.time_msec() - prune_age
def _prune_txn(txn):
+ # look for (user, destination) pairs which have an update older than
+ # the cutoff.
+ #
+ # For each pair, we also need to know the most recent stream_id, and
+ # an arbitrary device_id at that stream_id.
select_sql = """
- SELECT destination, user_id, max(stream_id) as stream_id
- FROM device_lists_outbound_pokes
+ SELECT
+ dlop1.destination,
+ dlop1.user_id,
+ MAX(dlop1.stream_id) AS stream_id,
+ (SELECT MIN(dlop2.device_id) AS device_id FROM
+ device_lists_outbound_pokes dlop2
+ WHERE dlop2.destination = dlop1.destination AND
+ dlop2.user_id=dlop1.user_id AND
+ dlop2.stream_id=MAX(dlop1.stream_id)
+ )
+ FROM device_lists_outbound_pokes dlop1
GROUP BY destination, user_id
HAVING min(ts) < ? AND count(*) > 1
"""
@@ -1108,14 +1225,29 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
if not rows:
return
+ logger.info(
+ "Pruning old outbound device list updates for %i users/destinations: %s",
+ len(rows),
+ shortstr((row[0], row[1]) for row in rows),
+ )
+
+ # we want to keep the update with the highest stream_id for each user.
+ #
+ # there might be more than one update (with different device_ids) with the
+ # same stream_id, so we also delete all but one rows with the max stream id.
delete_sql = """
DELETE FROM device_lists_outbound_pokes
- WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
+ WHERE destination = ? AND user_id = ? AND (
+ stream_id < ? OR
+ (stream_id = ? AND device_id != ?)
+ )
"""
-
- txn.executemany(
- delete_sql, ((yesterday, row[0], row[1], row[2]) for row in rows)
- )
+ count = 0
+ for (destination, user_id, stream_id, device_id) in rows:
+ txn.execute(
+ delete_sql, (destination, user_id, stream_id, stream_id, device_id)
+ )
+ count += txn.rowcount
# Since we've deleted unsent deltas, we need to remove the entry
# of last successful sent so that the prev_ids are correctly set.
@@ -1125,7 +1257,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
"""
txn.executemany(sql, ((row[0], row[1]) for row in rows))
- logger.info("Pruned %d device list outbound pokes", txn.rowcount)
+ logger.info("Pruned %d device list outbound pokes", count)
return run_as_background_process(
"prune_old_outbound_device_pokes",
diff --git a/synapse/storage/data_stores/main/directory.py b/synapse/storage/data_stores/main/directory.py
index c9e7de7d..e1d1bc3e 100644
--- a/synapse/storage/data_stores/main/directory.py
+++ b/synapse/storage/data_stores/main/directory.py
@@ -14,6 +14,7 @@
# limitations under the License.
from collections import namedtuple
+from typing import Optional
from twisted.internet import defer
@@ -159,10 +160,29 @@ class DirectoryStore(DirectoryWorkerStore):
return room_id
- def update_aliases_for_room(self, old_room_id, new_room_id, creator):
+ def update_aliases_for_room(
+ self, old_room_id: str, new_room_id: str, creator: Optional[str] = None,
+ ):
+ """Repoint all of the aliases for a given room, to a different room.
+
+ Args:
+ old_room_id:
+ new_room_id:
+ creator: The user to record as the creator of the new mapping.
+ If None, the creator will be left unchanged.
+ """
+
def _update_aliases_for_room_txn(txn):
- sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
- txn.execute(sql, (new_room_id, creator, old_room_id))
+ update_creator_sql = ""
+ sql_params = (new_room_id, old_room_id)
+ if creator:
+ update_creator_sql = ", creator = ?"
+ sql_params = (new_room_id, creator, old_room_id)
+
+ sql = "UPDATE room_aliases SET room_id = ? %s WHERE room_id = ?" % (
+ update_creator_sql,
+ )
+ txn.execute(sql, sql_params)
self._invalidate_cache_and_stream(
txn, self.get_aliases_for_room, (old_room_id,)
)
diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py
index 84594cf0..23f4570c 100644
--- a/synapse/storage/data_stores/main/e2e_room_keys.py
+++ b/synapse/storage/data_stores/main/e2e_room_keys.py
@@ -146,7 +146,8 @@ class EndToEndRoomKeyStore(SQLBaseStore):
room_entry["sessions"][row["session_id"]] = {
"first_message_index": row["first_message_index"],
"forwarded_count": row["forwarded_count"],
- "is_verified": row["is_verified"],
+ # is_verified must be returned to the client as a boolean
+ "is_verified": bool(row["is_verified"]),
"session_data": json.loads(row["session_data"]),
}
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 001a53f9..bcf746b7 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -537,7 +537,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
return result
- def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
+ def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit):
"""Return a list of changes from the user signature stream to notify remotes.
Note that the user signature stream represents when a user signs their
device with their user-signing key, which is not published to other
@@ -552,13 +552,19 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Deferred[list[(int,str)]] a list of `(stream_id, user_id)`
"""
sql = """
- SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id
+ SELECT stream_id, from_user_id AS user_id
FROM user_signature_stream
WHERE ? < stream_id AND stream_id <= ?
- GROUP BY user_id
+ ORDER BY stream_id ASC
+ LIMIT ?
"""
return self.db.execute(
- "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key
+ "get_all_user_signature_changes_for_remotes",
+ None,
+ sql,
+ from_key,
+ to_key,
+ limit,
)
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 62d4e9f5..b99439cc 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -173,19 +173,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
for event_id in initial_events
}
+ # The sorted list of events whose auth chains we should walk.
+ search = [] # type: List[Tuple[int, str]]
+
# We need to get the depth of the initial events for sorting purposes.
sql = """
SELECT depth, event_id FROM events
WHERE %s
- ORDER BY depth ASC
"""
- clause, args = make_in_list_sql_clause(
- txn.database_engine, "event_id", initial_events
- )
- txn.execute(sql % (clause,), args)
+ # the list can be huge, so let's avoid looking them all up in one massive
+ # query.
+ for batch in batch_iter(initial_events, 1000):
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "event_id", batch
+ )
+ txn.execute(sql % (clause,), args)
- # The sorted list of events whose auth chains we should walk.
- search = txn.fetchall() # type: List[Tuple[int, str]]
+ # I think building a temporary list with fetchall is more efficient than
+ # just `search.extend(txn)`, but this is unconfirmed
+ search.extend(txn.fetchall())
+
+ # sort by depth
+ search.sort()
# Map from event to its auth events
event_to_auth_events = {} # type: Dict[str, Set[str]]
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index d593ef47..e71c2354 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -1267,104 +1267,6 @@ class EventsStore(
ret = yield self.db.runInteraction("count_daily_active_rooms", _count)
return ret
- def get_current_backfill_token(self):
- """The current minimum token that backfilled events have reached"""
- return -self._backfill_id_gen.get_current_token()
-
- def get_current_events_token(self):
- """The current maximum token that events have reached"""
- return self._stream_id_gen.get_current_token()
-
- def get_all_new_forward_event_rows(self, last_id, current_id, limit):
- if last_id == current_id:
- return defer.succeed([])
-
- def get_all_new_forward_event_rows(txn):
- sql = (
- "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts, relates_to_id"
- " FROM events AS e"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events USING (event_id)"
- " LEFT JOIN event_relations USING (event_id)"
- " WHERE ? < stream_ordering AND stream_ordering <= ?"
- " ORDER BY stream_ordering ASC"
- " LIMIT ?"
- )
- txn.execute(sql, (last_id, current_id, limit))
- new_event_updates = txn.fetchall()
-
- if len(new_event_updates) == limit:
- upper_bound = new_event_updates[-1][0]
- else:
- upper_bound = current_id
-
- sql = (
- "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts, relates_to_id"
- " FROM events AS e"
- " INNER JOIN ex_outlier_stream USING (event_id)"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events USING (event_id)"
- " LEFT JOIN event_relations USING (event_id)"
- " WHERE ? < event_stream_ordering"
- " AND event_stream_ordering <= ?"
- " ORDER BY event_stream_ordering DESC"
- )
- txn.execute(sql, (last_id, upper_bound))
- new_event_updates.extend(txn)
-
- return new_event_updates
-
- return self.db.runInteraction(
- "get_all_new_forward_event_rows", get_all_new_forward_event_rows
- )
-
- def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
- if last_id == current_id:
- return defer.succeed([])
-
- def get_all_new_backfill_event_rows(txn):
- sql = (
- "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts, relates_to_id"
- " FROM events AS e"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events USING (event_id)"
- " LEFT JOIN event_relations USING (event_id)"
- " WHERE ? > stream_ordering AND stream_ordering >= ?"
- " ORDER BY stream_ordering ASC"
- " LIMIT ?"
- )
- txn.execute(sql, (-last_id, -current_id, limit))
- new_event_updates = txn.fetchall()
-
- if len(new_event_updates) == limit:
- upper_bound = new_event_updates[-1][0]
- else:
- upper_bound = current_id
-
- sql = (
- "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
- " state_key, redacts, relates_to_id"
- " FROM events AS e"
- " INNER JOIN ex_outlier_stream USING (event_id)"
- " LEFT JOIN redactions USING (event_id)"
- " LEFT JOIN state_events USING (event_id)"
- " LEFT JOIN event_relations USING (event_id)"
- " WHERE ? > event_stream_ordering"
- " AND event_stream_ordering >= ?"
- " ORDER BY event_stream_ordering DESC"
- )
- txn.execute(sql, (-last_id, -upper_bound))
- new_event_updates.extend(txn.fetchall())
-
- return new_event_updates
-
- return self.db.runInteraction(
- "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
- )
-
@cached(num_args=5, max_entries=10)
def get_all_new_events(
self,
@@ -1850,22 +1752,6 @@ class EventsStore(
return (int(res["topological_ordering"]), int(res["stream_ordering"]))
- def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
- def get_all_updated_current_state_deltas_txn(txn):
- sql = """
- SELECT stream_id, room_id, type, state_key, event_id
- FROM current_state_delta_stream
- WHERE ? < stream_id AND stream_id <= ?
- ORDER BY stream_id ASC LIMIT ?
- """
- txn.execute(sql, (from_token, to_token, limit))
- return txn.fetchall()
-
- return self.db.runInteraction(
- "get_all_updated_current_state_deltas",
- get_all_updated_current_state_deltas_txn,
- )
-
def insert_labels_for_event_txn(
self, txn, event_id, labels, room_id, topological_ordering
):
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index ca237c6f..73df6b33 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -19,7 +19,7 @@ import itertools
import logging
import threading
from collections import namedtuple
-from typing import List, Optional
+from typing import List, Optional, Tuple
from canonicaljson import json
from constantly import NamedConstant, Names
@@ -35,7 +35,7 @@ from synapse.api.room_versions import (
)
from synapse.events import make_event_from_dict
from synapse.events.utils import prune_event
-from synapse.logging.context import LoggingContext, PreserveLoggingContext
+from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import Database
@@ -409,7 +409,7 @@ class EventsWorkerStore(SQLBaseStore):
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
if missing_events_ids:
- log_ctx = LoggingContext.current_context()
+ log_ctx = current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
# Note that _get_events_from_db is also responsible for turning db rows
@@ -632,7 +632,7 @@ class EventsWorkerStore(SQLBaseStore):
event_map[event_id] = original_ev
- # finally, we can decide whether each one nededs redacting, and build
+ # finally, we can decide whether each one needs redacting, and build
# the cache entries.
result_map = {}
for event_id, original_ev in event_map.items():
@@ -963,3 +963,195 @@ class EventsWorkerStore(SQLBaseStore):
complexity_v1 = round(state_events / 500, 2)
return {"v1": complexity_v1}
+
+ def get_current_backfill_token(self):
+ """The current minimum token that backfilled events have reached"""
+ return -self._backfill_id_gen.get_current_token()
+
+ def get_current_events_token(self):
+ """The current maximum token that events have reached"""
+ return self._stream_id_gen.get_current_token()
+
+ def get_all_new_forward_event_rows(self, last_id, current_id, limit):
+ """Returns new events, for the Events replication stream
+
+ Args:
+ last_id: the last stream_id from the previous batch.
+ current_id: the maximum stream_id to return up to
+ limit: the maximum number of rows to return
+
+ Returns: Deferred[List[Tuple]]
+ a list of events stream rows. Each tuple consists of a stream id as
+ the first element, followed by fields suitable for casting into an
+ EventsStreamRow.
+ """
+
+ def get_all_new_forward_event_rows(txn):
+ sql = (
+ "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+ " state_key, redacts, relates_to_id"
+ " FROM events AS e"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " WHERE ? < stream_ordering AND stream_ordering <= ?"
+ " ORDER BY stream_ordering ASC"
+ " LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+ return txn.fetchall()
+
+ return self.db.runInteraction(
+ "get_all_new_forward_event_rows", get_all_new_forward_event_rows
+ )
+
+ def get_ex_outlier_stream_rows(self, last_id, current_id):
+ """Returns de-outliered events, for the Events replication stream
+
+ Args:
+ last_id: the last stream_id from the previous batch.
+ current_id: the maximum stream_id to return up to
+
+ Returns: Deferred[List[Tuple]]
+ a list of events stream rows. Each tuple consists of a stream id as
+ the first element, followed by fields suitable for casting into an
+ EventsStreamRow.
+ """
+
+ def get_ex_outlier_stream_rows_txn(txn):
+ sql = (
+ "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
+ " state_key, redacts, relates_to_id"
+ " FROM events AS e"
+ " INNER JOIN ex_outlier_stream USING (event_id)"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " WHERE ? < event_stream_ordering"
+ " AND event_stream_ordering <= ?"
+ " ORDER BY event_stream_ordering ASC"
+ )
+
+ txn.execute(sql, (last_id, current_id))
+ return txn.fetchall()
+
+ return self.db.runInteraction(
+ "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
+ )
+
+ def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+ if last_id == current_id:
+ return defer.succeed([])
+
+ def get_all_new_backfill_event_rows(txn):
+ sql = (
+ "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+ " state_key, redacts, relates_to_id"
+ " FROM events AS e"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " WHERE ? > stream_ordering AND stream_ordering >= ?"
+ " ORDER BY stream_ordering ASC"
+ " LIMIT ?"
+ )
+ txn.execute(sql, (-last_id, -current_id, limit))
+ new_event_updates = txn.fetchall()
+
+ if len(new_event_updates) == limit:
+ upper_bound = new_event_updates[-1][0]
+ else:
+ upper_bound = current_id
+
+ sql = (
+ "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
+ " state_key, redacts, relates_to_id"
+ " FROM events AS e"
+ " INNER JOIN ex_outlier_stream USING (event_id)"
+ " LEFT JOIN redactions USING (event_id)"
+ " LEFT JOIN state_events USING (event_id)"
+ " LEFT JOIN event_relations USING (event_id)"
+ " WHERE ? > event_stream_ordering"
+ " AND event_stream_ordering >= ?"
+ " ORDER BY event_stream_ordering DESC"
+ )
+ txn.execute(sql, (-last_id, -upper_bound))
+ new_event_updates.extend(txn.fetchall())
+
+ return new_event_updates
+
+ return self.db.runInteraction(
+ "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
+ )
+
+ async def get_all_updated_current_state_deltas(
+ self, from_token: int, to_token: int, target_row_count: int
+ ) -> Tuple[List[Tuple], int, bool]:
+ """Fetch updates from current_state_delta_stream
+
+ Args:
+ from_token: The previous stream token. Updates from this stream id will
+ be excluded.
+
+ to_token: The current stream token (ie the upper limit). Updates up to this
+ stream id will be included (modulo the 'limit' param)
+
+ target_row_count: The number of rows to try to return. If more rows are
+ available, we will set 'limited' in the result. In the event of a large
+ batch, we may return more rows than this.
+ Returns:
+ A triplet `(updates, new_last_token, limited)`, where:
+ * `updates` is a list of database tuples.
+ * `new_last_token` is the new position in stream.
+ * `limited` is whether there are more updates to fetch.
+ """
+
+ def get_all_updated_current_state_deltas_txn(txn):
+ sql = """
+ SELECT stream_id, room_id, type, state_key, event_id
+ FROM current_state_delta_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC LIMIT ?
+ """
+ txn.execute(sql, (from_token, to_token, target_row_count))
+ return txn.fetchall()
+
+ def get_deltas_for_stream_id_txn(txn, stream_id):
+ sql = """
+ SELECT stream_id, room_id, type, state_key, event_id
+ FROM current_state_delta_stream
+ WHERE stream_id = ?
+ """
+ txn.execute(sql, [stream_id])
+ return txn.fetchall()
+
+ # we need to make sure that, for every stream id in the results, we get *all*
+ # the rows with that stream id.
+
+ rows = await self.db.runInteraction(
+ "get_all_updated_current_state_deltas",
+ get_all_updated_current_state_deltas_txn,
+ ) # type: List[Tuple]
+
+ # if we've got fewer rows than the limit, we're good
+ if len(rows) < target_row_count:
+ return rows, to_token, False
+
+ # we hit the limit, so reduce the upper limit so that we exclude the stream id
+ # of the last row in the result.
+ assert rows[-1][0] <= to_token
+ to_token = rows[-1][0] - 1
+
+ # search backwards through the list for the point to truncate
+ for idx in range(len(rows) - 1, 0, -1):
+ if rows[idx - 1][0] <= to_token:
+ return rows[:idx], to_token, True
+
+ # bother. We didn't get a full set of changes for even a single
+ # stream id. let's run the query again, without a row limit, but for
+ # just one stream id.
+ to_token += 1
+ rows = await self.db.runInteraction(
+ "get_deltas_for_stream_id", get_deltas_for_stream_id_txn, to_token
+ )
+ return rows, to_token, True
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 80ca36de..8aecd414 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -340,7 +340,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"get_expired_url_cache", _get_expired_url_cache_txn
)
- def delete_url_cache(self, media_ids):
+ async def delete_url_cache(self, media_ids):
if len(media_ids) == 0:
return
@@ -349,7 +349,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
def _delete_url_cache_txn(txn):
txn.executemany(sql, [(media_id,) for media_id in media_ids])
- return self.db.runInteraction("delete_url_cache", _delete_url_cache_txn)
+ return await self.db.runInteraction("delete_url_cache", _delete_url_cache_txn)
def get_url_cache_media_before(self, before_ts):
sql = (
@@ -367,7 +367,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"get_url_cache_media_before", _get_url_cache_media_before_txn
)
- def delete_url_cache_media(self, media_ids):
+ async def delete_url_cache_media(self, media_ids):
if len(media_ids) == 0:
return
@@ -380,6 +380,6 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
txn.executemany(sql, [(media_id,) for media_id in media_ids])
- return self.db.runInteraction(
+ return await self.db.runInteraction(
"delete_url_cache_media", _delete_url_cache_media_txn
)
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index 604c8b7d..dab31e0c 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -60,7 +60,7 @@ class PresenceStore(SQLBaseStore):
"status_msg": state.status_msg,
"currently_active": state.currently_active,
}
- for state in presence_states
+ for stream_id, state in zip(stream_orderings, presence_states)
],
)
@@ -73,19 +73,22 @@ class PresenceStore(SQLBaseStore):
)
txn.execute(sql + clause, [stream_id] + list(args))
- def get_all_presence_updates(self, last_id, current_id):
+ def get_all_presence_updates(self, last_id, current_id, limit):
if last_id == current_id:
return defer.succeed([])
def get_all_presence_updates_txn(txn):
- sql = (
- "SELECT stream_id, user_id, state, last_active_ts,"
- " last_federation_update_ts, last_user_sync_ts, status_msg,"
- " currently_active"
- " FROM presence_stream"
- " WHERE ? < stream_id AND stream_id <= ?"
- )
- txn.execute(sql, (last_id, current_id))
+ sql = """
+ SELECT stream_id, user_id, state, last_active_ts,
+ last_federation_update_ts, last_user_sync_ts,
+ status_msg,
+ currently_active
+ FROM presence_stream
+ WHERE ? < stream_id AND stream_id <= ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_id, current_id, limit))
return txn.fetchall()
return self.db.runInteraction(
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index 62ac88d9..b3faafa0 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -41,6 +41,7 @@ def _load_rules(rawrules, enabled_map):
rule = dict(rawrule)
rule["conditions"] = json.loads(rawrule["conditions"])
rule["actions"] = json.loads(rawrule["actions"])
+ rule["default"] = False
ruleslist.append(rule)
# We're going to be mutating this a lot, so do a deep copy
@@ -333,6 +334,26 @@ class PushRulesWorkerStore(
results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
return results
+ def get_all_push_rule_updates(self, last_id, current_id, limit):
+ """Get all the push rules changes that have happend on the server"""
+ if last_id == current_id:
+ return defer.succeed([])
+
+ def get_all_push_rule_updates_txn(txn):
+ sql = (
+ "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
+ " op, priority_class, priority, conditions, actions"
+ " FROM push_rules_stream"
+ " WHERE ? < stream_id AND stream_id <= ?"
+ " ORDER BY stream_id ASC LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+ return txn.fetchall()
+
+ return self.db.runInteraction(
+ "get_all_push_rule_updates", get_all_push_rule_updates_txn
+ )
+
class PushRuleStore(PushRulesWorkerStore):
@defer.inlineCallbacks
@@ -684,26 +705,6 @@ class PushRuleStore(PushRulesWorkerStore):
self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
)
- def get_all_push_rule_updates(self, last_id, current_id, limit):
- """Get all the push rules changes that have happend on the server"""
- if last_id == current_id:
- return defer.succeed([])
-
- def get_all_push_rule_updates_txn(txn):
- sql = (
- "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
- " op, priority_class, priority, conditions, actions"
- " FROM push_rules_stream"
- " WHERE ? < stream_id AND stream_id <= ?"
- " ORDER BY stream_id ASC LIMIT ?"
- )
- txn.execute(sql, (last_id, current_id, limit))
- return txn.fetchall()
-
- return self.db.runInteraction(
- "get_all_push_rule_updates", get_all_push_rule_updates_txn
- )
-
def get_push_rules_stream_token(self):
"""Get the position of the push rules stream.
Returns a pair of a stream id for the push_rules stream and the
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 3e53c856..efcdd210 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -273,8 +273,7 @@ class RegistrationWorkerStore(SQLBaseStore):
desc="delete_account_validity_for_user",
)
- @defer.inlineCallbacks
- def is_server_admin(self, user):
+ async def is_server_admin(self, user):
"""Determines if a user is an admin of this homeserver.
Args:
@@ -283,7 +282,7 @@ class RegistrationWorkerStore(SQLBaseStore):
Returns (bool):
true iff the user is a server admin, false otherwise.
"""
- res = yield self.db.simple_select_one_onecol(
+ res = await self.db.simple_select_one_onecol(
table="users",
keyvalues={"name": user.to_string()},
retcol="admin",
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index e6c10c63..147eba1d 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -52,12 +52,28 @@ class RoomSortOrder(Enum):
"""
Enum to define the sorting method used when returning rooms with get_rooms_paginate
- ALPHABETICAL = sort rooms alphabetically by name
- SIZE = sort rooms by membership size, highest to lowest
+ NAME = sort rooms alphabetically by name
+ JOINED_MEMBERS = sort rooms by membership size, highest to lowest
"""
+ # ALPHABETICAL and SIZE are deprecated.
+ # ALPHABETICAL is the same as NAME.
ALPHABETICAL = "alphabetical"
+ # SIZE is the same as JOINED_MEMBERS.
SIZE = "size"
+ NAME = "name"
+ CANONICAL_ALIAS = "canonical_alias"
+ JOINED_MEMBERS = "joined_members"
+ JOINED_LOCAL_MEMBERS = "joined_local_members"
+ VERSION = "version"
+ CREATOR = "creator"
+ ENCRYPTION = "encryption"
+ FEDERATABLE = "federatable"
+ PUBLIC = "public"
+ JOIN_RULES = "join_rules"
+ GUEST_ACCESS = "guest_access"
+ HISTORY_VISIBILITY = "history_visibility"
+ STATE_EVENTS = "state_events"
class RoomWorkerStore(SQLBaseStore):
@@ -329,12 +345,52 @@ class RoomWorkerStore(SQLBaseStore):
# Set ordering
if RoomSortOrder(order_by) == RoomSortOrder.SIZE:
+ # Deprecated in favour of RoomSortOrder.JOINED_MEMBERS
order_by_column = "curr.joined_members"
order_by_asc = False
elif RoomSortOrder(order_by) == RoomSortOrder.ALPHABETICAL:
- # Sort alphabetically
+ # Deprecated in favour of RoomSortOrder.NAME
order_by_column = "state.name"
order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.NAME:
+ order_by_column = "state.name"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.CANONICAL_ALIAS:
+ order_by_column = "state.canonical_alias"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.JOINED_MEMBERS:
+ order_by_column = "curr.joined_members"
+ order_by_asc = False
+ elif RoomSortOrder(order_by) == RoomSortOrder.JOINED_LOCAL_MEMBERS:
+ order_by_column = "curr.local_users_in_room"
+ order_by_asc = False
+ elif RoomSortOrder(order_by) == RoomSortOrder.VERSION:
+ order_by_column = "rooms.room_version"
+ order_by_asc = False
+ elif RoomSortOrder(order_by) == RoomSortOrder.CREATOR:
+ order_by_column = "rooms.creator"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.ENCRYPTION:
+ order_by_column = "state.encryption"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.FEDERATABLE:
+ order_by_column = "state.is_federatable"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.PUBLIC:
+ order_by_column = "rooms.is_public"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.JOIN_RULES:
+ order_by_column = "state.join_rules"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.GUEST_ACCESS:
+ order_by_column = "state.guest_access"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.HISTORY_VISIBILITY:
+ order_by_column = "state.history_visibility"
+ order_by_asc = True
+ elif RoomSortOrder(order_by) == RoomSortOrder.STATE_EVENTS:
+ order_by_column = "curr.current_state_events"
+ order_by_asc = False
else:
raise StoreError(
500, "Incorrect value for order_by provided: %s" % order_by
@@ -349,9 +405,13 @@ class RoomWorkerStore(SQLBaseStore):
# for, and another query for getting the total number of events that could be
# returned. Thus allowing us to see if there are more events to paginate through
info_sql = """
- SELECT state.room_id, state.name, state.canonical_alias, curr.joined_members
+ SELECT state.room_id, state.name, state.canonical_alias, curr.joined_members,
+ curr.local_users_in_room, rooms.room_version, rooms.creator,
+ state.encryption, state.is_federatable, rooms.is_public, state.join_rules,
+ state.guest_access, state.history_visibility, curr.current_state_events
FROM room_stats_state state
INNER JOIN room_stats_current curr USING (room_id)
+ INNER JOIN rooms USING (room_id)
%s
ORDER BY %s %s
LIMIT ?
@@ -389,6 +449,16 @@ class RoomWorkerStore(SQLBaseStore):
"name": room[1],
"canonical_alias": room[2],
"joined_members": room[3],
+ "joined_local_members": room[4],
+ "version": room[5],
+ "creator": room[6],
+ "encryption": room[7],
+ "federatable": room[8],
+ "public": room[9],
+ "join_rules": room[10],
+ "guest_access": room[11],
+ "history_visibility": room[12],
+ "state_events": room[13],
}
)
@@ -732,6 +802,26 @@ class RoomWorkerStore(SQLBaseStore):
return total_media_quarantined
+ def get_all_new_public_rooms(self, prev_id, current_id, limit):
+ def get_all_new_public_rooms(txn):
+ sql = """
+ SELECT stream_id, room_id, visibility, appservice_id, network_id
+ FROM public_room_list_stream
+ WHERE stream_id > ? AND stream_id <= ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """
+
+ txn.execute(sql, (prev_id, current_id, limit))
+ return txn.fetchall()
+
+ if prev_id == current_id:
+ return defer.succeed([])
+
+ return self.db.runInteraction(
+ "get_all_new_public_rooms", get_all_new_public_rooms
+ )
+
class RoomBackgroundUpdateStore(SQLBaseStore):
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -1249,26 +1339,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()
- def get_all_new_public_rooms(self, prev_id, current_id, limit):
- def get_all_new_public_rooms(txn):
- sql = """
- SELECT stream_id, room_id, visibility, appservice_id, network_id
- FROM public_room_list_stream
- WHERE stream_id > ? AND stream_id <= ?
- ORDER BY stream_id ASC
- LIMIT ?
- """
-
- txn.execute(sql, (prev_id, current_id, limit))
- return txn.fetchall()
-
- if prev_id == current_id:
- return defer.succeed([])
-
- return self.db.runInteraction(
- "get_all_new_public_rooms", get_all_new_public_rooms
- )
-
@defer.inlineCallbacks
def block_room(self, room_id, user_id):
"""Marks the room as blocked. Can be called multiple times.
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index d5bd0cb5..e626b7f6 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -576,7 +576,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
if key[0] == EventTypes.Member
]
for etype, state_key in context.delta_ids:
- users_in_room.pop(state_key, None)
+ if etype == EventTypes.Member:
+ users_in_room.pop(state_key, None)
# We check if we have any of the member event ids in the event cache
# before we ask the DB
diff --git a/synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql b/synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql
index 163529c0..bbdde121 100644
--- a/synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql
+++ b/synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql
@@ -35,9 +35,13 @@ DELETE FROM background_updates WHERE update_name IN (
'populate_stats_cleanup'
);
+-- this relies on current_state_events.membership having been populated, so add
+-- a dependency on current_state_events_membership.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
- ('populate_stats_process_rooms', '{}', '');
+ ('populate_stats_process_rooms', '{}', 'current_state_events_membership');
+-- this also relies on current_state_events.membership having been populated, but
+-- we get that as a side-effect of depending on populate_stats_process_rooms.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
diff --git a/synapse/storage/data_stores/main/schema/delta/57/remove_sent_outbound_pokes.sql b/synapse/storage/data_stores/main/schema/delta/57/remove_sent_outbound_pokes.sql
new file mode 100644
index 00000000..133d80af
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/remove_sent_outbound_pokes.sql
@@ -0,0 +1,21 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- we no longer keep sent outbound device pokes in the db; clear them out
+-- so that we don't have to worry about them.
+--
+-- This is a sequence scan, but it doesn't take too long.
+
+DELETE FROM device_lists_outbound_pokes WHERE sent;
diff --git a/synapse/storage/data_stores/main/schema/delta/58/02remove_dup_outbound_pokes.sql b/synapse/storage/data_stores/main/schema/delta/58/02remove_dup_outbound_pokes.sql
new file mode 100644
index 00000000..fdc39e9b
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/02remove_dup_outbound_pokes.sql
@@ -0,0 +1,22 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /* for some reason, we have accumulated duplicate entries in
+ * device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
+ * efficient.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json)
+ VALUES (5800, 'remove_dup_outbound_pokes', '{}');
diff --git a/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql b/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql
new file mode 100644
index 00000000..dcb593fc
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql
@@ -0,0 +1,36 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS ui_auth_sessions(
+ session_id TEXT NOT NULL, -- The session ID passed to the client.
+ creation_time BIGINT NOT NULL, -- The time this session was created (epoch time in milliseconds).
+ serverdict TEXT NOT NULL, -- A JSON dictionary of arbitrary data added by Synapse.
+ clientdict TEXT NOT NULL, -- A JSON dictionary of arbitrary data from the client.
+ uri TEXT NOT NULL, -- The URI the UI authentication session is using.
+ method TEXT NOT NULL, -- The HTTP method the UI authentication session is using.
+ -- The clientdict, uri, and method make up an tuple that must be immutable
+ -- throughout the lifetime of the UI Auth session.
+ description TEXT NOT NULL, -- A human readable description of the operation which caused the UI Auth flow to occur.
+ UNIQUE (session_id)
+);
+
+CREATE TABLE IF NOT EXISTS ui_auth_sessions_credentials(
+ session_id TEXT NOT NULL, -- The corresponding UI Auth session.
+ stage_type TEXT NOT NULL, -- The stage type.
+ result TEXT NOT NULL, -- The result of the stage verification, stored as JSON.
+ UNIQUE (session_id, stage_type),
+ FOREIGN KEY (session_id)
+ REFERENCES ui_auth_sessions (session_id)
+);
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index ada5cce6..e89f0bff 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -481,11 +481,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
room_id, limit, end_token
)
- logger.debug("stream before")
events = yield self.get_events_as_list(
[r.event_id for r in rows], get_prev_content=True
)
- logger.debug("stream after")
self._set_before_and_after(events, rows)
diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py
new file mode 100644
index 00000000..1d8ee22f
--- /dev/null
+++ b/synapse/storage/data_stores/main/ui_auth.py
@@ -0,0 +1,300 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from typing import Any, Dict, Optional, Union
+
+import attr
+
+import synapse.util.stringutils as stringutils
+from synapse.api.errors import StoreError
+from synapse.storage._base import SQLBaseStore
+from synapse.types import JsonDict
+
+
+@attr.s
+class UIAuthSessionData:
+ session_id = attr.ib(type=str)
+ # The dictionary from the client root level, not the 'auth' key.
+ clientdict = attr.ib(type=JsonDict)
+ # The URI and method the session was intiatied with. These are checked at
+ # each stage of the authentication to ensure that the asked for operation
+ # has not changed.
+ uri = attr.ib(type=str)
+ method = attr.ib(type=str)
+ # A string description of the operation that the current authentication is
+ # authorising.
+ description = attr.ib(type=str)
+
+
+class UIAuthWorkerStore(SQLBaseStore):
+ """
+ Manage user interactive authentication sessions.
+ """
+
+ async def create_ui_auth_session(
+ self, clientdict: JsonDict, uri: str, method: str, description: str,
+ ) -> UIAuthSessionData:
+ """
+ Creates a new user interactive authentication session.
+
+ The session can be used to track the stages necessary to authenticate a
+ user across multiple HTTP requests.
+
+ Args:
+ clientdict:
+ The dictionary from the client root level, not the 'auth' key.
+ uri:
+ The URI this session was initiated with, this is checked at each
+ stage of the authentication to ensure that the asked for
+ operation has not changed.
+ method:
+ The method this session was initiated with, this is checked at each
+ stage of the authentication to ensure that the asked for
+ operation has not changed.
+ description:
+ A string description of the operation that the current
+ authentication is authorising.
+ Returns:
+ The newly created session.
+ Raises:
+ StoreError if a unique session ID cannot be generated.
+ """
+ # The clientdict gets stored as JSON.
+ clientdict_json = json.dumps(clientdict)
+
+ # autogen a session ID and try to create it. We may clash, so just
+ # try a few times till one goes through, giving up eventually.
+ attempts = 0
+ while attempts < 5:
+ session_id = stringutils.random_string(24)
+
+ try:
+ await self.db.simple_insert(
+ table="ui_auth_sessions",
+ values={
+ "session_id": session_id,
+ "clientdict": clientdict_json,
+ "uri": uri,
+ "method": method,
+ "description": description,
+ "serverdict": "{}",
+ "creation_time": self.hs.get_clock().time_msec(),
+ },
+ desc="create_ui_auth_session",
+ )
+ return UIAuthSessionData(
+ session_id, clientdict, uri, method, description
+ )
+ except self.db.engine.module.IntegrityError:
+ attempts += 1
+ raise StoreError(500, "Couldn't generate a session ID.")
+
+ async def get_ui_auth_session(self, session_id: str) -> UIAuthSessionData:
+ """Retrieve a UI auth session.
+
+ Args:
+ session_id: The ID of the session.
+ Returns:
+ A dict containing the device information.
+ Raises:
+ StoreError if the session is not found.
+ """
+ result = await self.db.simple_select_one(
+ table="ui_auth_sessions",
+ keyvalues={"session_id": session_id},
+ retcols=("clientdict", "uri", "method", "description"),
+ desc="get_ui_auth_session",
+ )
+
+ result["clientdict"] = json.loads(result["clientdict"])
+
+ return UIAuthSessionData(session_id, **result)
+
+ async def mark_ui_auth_stage_complete(
+ self, session_id: str, stage_type: str, result: Union[str, bool, JsonDict],
+ ):
+ """
+ Mark a session stage as completed.
+
+ Args:
+ session_id: The ID of the corresponding session.
+ stage_type: The completed stage type.
+ result: The result of the stage verification.
+ Raises:
+ StoreError if the session cannot be found.
+ """
+ # Add (or update) the results of the current stage to the database.
+ #
+ # Note that we need to allow for the same stage to complete multiple
+ # times here so that registration is idempotent.
+ try:
+ await self.db.simple_upsert(
+ table="ui_auth_sessions_credentials",
+ keyvalues={"session_id": session_id, "stage_type": stage_type},
+ values={"result": json.dumps(result)},
+ desc="mark_ui_auth_stage_complete",
+ )
+ except self.db.engine.module.IntegrityError:
+ raise StoreError(400, "Unknown session ID: %s" % (session_id,))
+
+ async def get_completed_ui_auth_stages(
+ self, session_id: str
+ ) -> Dict[str, Union[str, bool, JsonDict]]:
+ """
+ Retrieve the completed stages of a UI authentication session.
+
+ Args:
+ session_id: The ID of the session.
+ Returns:
+ The completed stages mapped to the result of the verification of
+ that auth-type.
+ """
+ results = {}
+ for row in await self.db.simple_select_list(
+ table="ui_auth_sessions_credentials",
+ keyvalues={"session_id": session_id},
+ retcols=("stage_type", "result"),
+ desc="get_completed_ui_auth_stages",
+ ):
+ results[row["stage_type"]] = json.loads(row["result"])
+
+ return results
+
+ async def set_ui_auth_clientdict(
+ self, session_id: str, clientdict: JsonDict
+ ) -> None:
+ """
+ Store an updated clientdict for a given session ID.
+
+ Args:
+ session_id: The ID of this session as returned from check_auth
+ clientdict:
+ The dictionary from the client root level, not the 'auth' key.
+ """
+ # The clientdict gets stored as JSON.
+ clientdict_json = json.dumps(clientdict)
+
+ self.db.simple_update_one(
+ table="ui_auth_sessions",
+ keyvalues={"session_id": session_id},
+ updatevalues={"clientdict": clientdict_json},
+ desc="set_ui_auth_client_dict",
+ )
+
+ async def set_ui_auth_session_data(self, session_id: str, key: str, value: Any):
+ """
+ Store a key-value pair into the sessions data associated with this
+ request. This data is stored server-side and cannot be modified by
+ the client.
+
+ Args:
+ session_id: The ID of this session as returned from check_auth
+ key: The key to store the data under
+ value: The data to store
+ Raises:
+ StoreError if the session cannot be found.
+ """
+ await self.db.runInteraction(
+ "set_ui_auth_session_data",
+ self._set_ui_auth_session_data_txn,
+ session_id,
+ key,
+ value,
+ )
+
+ def _set_ui_auth_session_data_txn(self, txn, session_id: str, key: str, value: Any):
+ # Get the current value.
+ result = self.db.simple_select_one_txn(
+ txn,
+ table="ui_auth_sessions",
+ keyvalues={"session_id": session_id},
+ retcols=("serverdict",),
+ )
+
+ # Update it and add it back to the database.
+ serverdict = json.loads(result["serverdict"])
+ serverdict[key] = value
+
+ self.db.simple_update_one_txn(
+ txn,
+ table="ui_auth_sessions",
+ keyvalues={"session_id": session_id},
+ updatevalues={"serverdict": json.dumps(serverdict)},
+ )
+
+ async def get_ui_auth_session_data(
+ self, session_id: str, key: str, default: Optional[Any] = None
+ ) -> Any:
+ """
+ Retrieve data stored with set_session_data
+
+ Args:
+ session_id: The ID of this session as returned from check_auth
+ key: The key to store the data under
+ default: Value to return if the key has not been set
+ Raises:
+ StoreError if the session cannot be found.
+ """
+ result = await self.db.simple_select_one(
+ table="ui_auth_sessions",
+ keyvalues={"session_id": session_id},
+ retcols=("serverdict",),
+ desc="get_ui_auth_session_data",
+ )
+
+ serverdict = json.loads(result["serverdict"])
+
+ return serverdict.get(key, default)
+
+
+class UIAuthStore(UIAuthWorkerStore):
+ def delete_old_ui_auth_sessions(self, expiration_time: int):
+ """
+ Remove sessions which were last used earlier than the expiration time.
+
+ Args:
+ expiration_time: The latest time that is still considered valid.
+ This is an epoch time in milliseconds.
+
+ """
+ return self.db.runInteraction(
+ "delete_old_ui_auth_sessions",
+ self._delete_old_ui_auth_sessions_txn,
+ expiration_time,
+ )
+
+ def _delete_old_ui_auth_sessions_txn(self, txn, expiration_time: int):
+ # Get the expired sessions.
+ sql = "SELECT session_id FROM ui_auth_sessions WHERE creation_time <= ?"
+ txn.execute(sql, [expiration_time])
+ session_ids = [r[0] for r in txn.fetchall()]
+
+ # Delete the corresponding completed credentials.
+ self.db.simple_delete_many_txn(
+ txn,
+ table="ui_auth_sessions_credentials",
+ column="session_id",
+ iterable=session_ids,
+ keyvalues={},
+ )
+
+ # Finally, delete the sessions.
+ self.db.simple_delete_many_txn(
+ txn,
+ table="ui_auth_sessions",
+ column="session_id",
+ iterable=session_ids,
+ keyvalues={},
+ )
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index e6159533..50f475bf 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -17,7 +17,17 @@
import logging
import time
from time import monotonic as monotonic_time
-from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Tuple,
+ TypeVar,
+)
from six import iteritems, iterkeys, itervalues
from six.moves import intern, range
@@ -32,6 +42,7 @@ from synapse.config.database import DatabaseConnectionConfig
from synapse.logging.context import (
LoggingContext,
LoggingContextOrSentinel,
+ current_context,
make_deferred_yieldable,
)
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -201,9 +212,9 @@ class LoggingTransaction:
def executemany(self, sql: str, *args: Any):
self._do_execute(self.txn.executemany, sql, *args)
- def _make_sql_one_line(self, sql):
+ def _make_sql_one_line(self, sql: str) -> str:
"Strip newlines out of SQL so that the loggers in the DB are on one line"
- return " ".join(l.strip() for l in sql.splitlines() if l.strip())
+ return " ".join(line.strip() for line in sql.splitlines() if line.strip())
def _do_execute(self, func, sql, *args):
sql = self._make_sql_one_line(sql)
@@ -483,7 +494,7 @@ class Database(object):
end = monotonic_time()
duration = end - start
- LoggingContext.current_context().add_database_transaction(duration)
+ current_context().add_database_transaction(duration)
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
@@ -510,7 +521,7 @@ class Database(object):
after_callbacks = [] # type: List[_CallbackListEntry]
exception_callbacks = [] # type: List[_CallbackListEntry]
- if LoggingContext.current_context() == LoggingContext.sentinel:
+ if not current_context():
logger.warning("Starting db txn '%s' from sentinel context", desc)
try:
@@ -547,10 +558,8 @@ class Database(object):
Returns:
Deferred: The result of func
"""
- parent_context = (
- LoggingContext.current_context()
- ) # type: Optional[LoggingContextOrSentinel]
- if parent_context == LoggingContext.sentinel:
+ parent_context = current_context() # type: Optional[LoggingContextOrSentinel]
+ if not parent_context:
logger.warning(
"Starting db connection from sentinel context: metrics will be lost"
)
@@ -1558,3 +1567,74 @@ def make_in_list_sql_clause(
return "%s = ANY(?)" % (column,), [list(iterable)]
else:
return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
+
+
+KV = TypeVar("KV")
+
+
+def make_tuple_comparison_clause(
+ database_engine: BaseDatabaseEngine, keys: List[Tuple[str, KV]]
+) -> Tuple[str, List[KV]]:
+ """Returns a tuple comparison SQL clause
+
+ Depending what the SQL engine supports, builds a SQL clause that looks like either
+ "(a, b) > (?, ?)", or "(a > ?) OR (a == ? AND b > ?)".
+
+ Args:
+ database_engine
+ keys: A set of (column, value) pairs to be compared.
+
+ Returns:
+ A tuple of SQL query and the args
+ """
+ if database_engine.supports_tuple_comparison:
+ return (
+ "(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)),
+ [k[1] for k in keys],
+ )
+
+ # we want to build a clause
+ # (a > ?) OR
+ # (a == ? AND b > ?) OR
+ # (a == ? AND b == ? AND c > ?)
+ # ...
+ # (a == ? AND b == ? AND ... AND z > ?)
+ #
+ # or, equivalently:
+ #
+ # (a > ? OR (a == ? AND
+ # (b > ? OR (b == ? AND
+ # ...
+ # (y > ? OR (y == ? AND
+ # z > ?
+ # ))
+ # ...
+ # ))
+ # ))
+ #
+ # which itself is equivalent to (and apparently easier for the query optimiser):
+ #
+ # (a >= ? AND (a > ? OR
+ # (b >= ? AND (b > ? OR
+ # ...
+ # (y >= ? AND (y > ? OR
+ # z > ?
+ # ))
+ # ...
+ # ))
+ # ))
+ #
+ #
+
+ clause = ""
+ args = [] # type: List[KV]
+ for k, v in keys[:-1]:
+ clause = clause + "(%s >= ? AND (%s > ? OR " % (k, k)
+ args.extend([v, v])
+
+ (k, v) = keys[-1]
+ clause += "%s > ?" % (k,)
+ args.append(v)
+
+ clause += "))" * (len(keys) - 1)
+ return clause, args
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 3bc2e8b9..215a9494 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -85,6 +85,7 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
prepare_database(db_conn, self, config=None)
db_conn.create_function("rank", 1, _rank)
+ db_conn.execute("PRAGMA foreign_keys = ON;")
def is_deadlock(self, error):
return False
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6cb7d4b9..1712932f 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 57
+SCHEMA_VERSION = 58
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/schema/delta/58/00background_update_ordering.sql b/synapse/storage/schema/delta/58/00background_update_ordering.sql
new file mode 100644
index 00000000..02dae587
--- /dev/null
+++ b/synapse/storage/schema/delta/58/00background_update_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* add an "ordering" column to background_updates, which can be used to sort them
+ to achieve some level of consistency. */
+
+ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;