summaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2021-03-12 08:45:19 +0100
committerAndrej Shadura <andrewsh@debian.org>2021-03-12 08:45:19 +0100
commita164b24227153a3ffe4d9adbb9bb1c1e981efe39 (patch)
tree4b9353cc412efe3dc135c7dda83e0fb7af64680b /scripts
parent7b07dc8dd1aa7eb4c55edb19822a30cfdc4adc0b (diff)
New upstream version 1.29.0
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/synapse_port_db67
1 files changed, 43 insertions, 24 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 69bf9110..58edf6af 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -22,7 +22,7 @@ import logging
import sys
import time
import traceback
-from typing import Dict, Optional, Set
+from typing import Dict, Iterable, Optional, Set
import yaml
@@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_bg_updates import (
from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
+from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
find_max_generated_user_id_localpart,
@@ -177,6 +178,7 @@ class Store(
UserDirectoryBackgroundUpdateStore,
EndToEndKeyBackgroundStore,
StatsStore,
+ PusherWorkerStore,
):
def execute(self, f, *args, **kwargs):
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
@@ -629,7 +631,13 @@ class Porter(object):
await self._setup_state_group_id_seq()
await self._setup_user_id_seq()
await self._setup_events_stream_seqs()
- await self._setup_device_inbox_seq()
+ await self._setup_sequence(
+ "device_inbox_sequence", ("device_inbox", "device_federation_outbox")
+ )
+ await self._setup_sequence(
+ "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
+ await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
+ await self._setup_auth_chain_sequence()
# Step 3. Get tables.
self.progress.set_state("Fetching tables")
@@ -854,7 +862,7 @@ class Porter(object):
return done, remaining + done
- async def _setup_state_group_id_seq(self):
+ async def _setup_state_group_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)
@@ -868,7 +876,7 @@ class Porter(object):
await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
- async def _setup_user_id_seq(self):
+ async def _setup_user_id_seq(self) -> None:
curr_id = await self.sqlite_store.db_pool.runInteraction(
"setup_user_id_seq", find_max_generated_user_id_localpart
)
@@ -877,9 +885,9 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
- return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
+ await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
- async def _setup_events_stream_seqs(self):
+ async def _setup_events_stream_seqs(self) -> None:
"""Set the event stream sequences to the correct values.
"""
@@ -908,35 +916,46 @@ class Porter(object):
(curr_backward_id + 1,),
)
- return await self.postgres_store.db_pool.runInteraction(
+ await self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
)
- async def _setup_device_inbox_seq(self):
- """Set the device inbox sequence to the correct value.
+ async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None:
+ """Set a sequence to the correct value.
"""
- curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
- table="device_inbox",
- keyvalues={},
- retcol="COALESCE(MAX(stream_id), 1)",
- allow_none=True,
- )
+ current_stream_ids = []
+ for stream_id_table in stream_id_tables:
+ max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+ table=stream_id_table,
+ keyvalues={},
+ retcol="COALESCE(MAX(stream_id), 1)",
+ allow_none=True,
+ )
+ current_stream_ids.append(max_stream_id)
- curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
- table="device_federation_outbox",
- keyvalues={},
- retcol="COALESCE(MAX(stream_id), 1)",
- allow_none=True,
- )
+ next_id = max(current_stream_ids) + 1
+
+ def r(txn):
+ sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, )
+ txn.execute(sql + " %s", (next_id, ))
- next_id = max(curr_local_id, curr_federation_id) + 1
+ await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r)
+
+ async def _setup_auth_chain_sequence(self) -> None:
+ curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+ table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True
+ )
def r(txn):
txn.execute(
- "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
+ "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
+ (curr_chain_id,),
)
- return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)
+ await self.postgres_store.db_pool.runInteraction(
+ "_setup_event_auth_chain_id", r,
+ )
+
##############################################