author     Andrej Shadura <andrewsh@debian.org>    2022-01-18 15:27:03 +0100
committer  Andrej Shadura <andrewsh@debian.org>    2022-01-18 15:27:03 +0100
commit     677db679d6e85151db3ffc494336450bc6e5cb70 (patch)
tree       186be5a5a81961182d323509c8b66d80fa6de2e9 /synapse/replication
parent     5e749433b9e317f6476f00da3ae4e2eaeb818f57 (diff)
New upstream version 1.50.0
Diffstat (limited to 'synapse/replication')
-rw-r--r--  synapse/replication/slave/storage/_base.py       |   9
-rw-r--r--  synapse/replication/slave/storage/client_ips.py  |   9
-rw-r--r--  synapse/replication/slave/storage/devices.py     |   9
-rw-r--r--  synapse/replication/slave/storage/events.py      |  18
-rw-r--r--  synapse/replication/slave/storage/filtering.py   |   9
-rw-r--r--  synapse/replication/slave/storage/groups.py      |   9
-rw-r--r--  synapse/replication/tcp/streams/_base.py         | 129
-rw-r--r--  synapse/replication/tcp/streams/federation.py    |  15
8 files changed, 116 insertions, 91 deletions
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 7ecb446e..7644146d 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Optional
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
@@ -27,7 +27,12 @@ logger = logging.getLogger(__name__)
class BaseSlavedStore(CacheInvalidationWorkerStore):
- def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen: Optional[
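
[Editor's aside] The hunk above, and the matching hunks in the slave storage files that follow, replace the previously untyped db_conn argument with an explicit LoggingDatabaseConnection annotation. As a rough illustration of what that buys, here is a minimal sketch using simplified stand-in classes rather than Synapse's real DatabasePool and LoggingDatabaseConnection: once the parameter is annotated, a type checker such as mypy can verify every subclass __init__ and every call site that constructs a store.

    # Sketch of the constructor-typing pattern this diff applies across the
    # slaved stores.  The classes below are simplified stand-ins, not the
    # real synapse.storage.database classes.
    class LoggingDatabaseConnection:
        """Stand-in for a connection wrapper that logs the queries it runs."""


    class DatabasePool:
        """Stand-in for the object owning the database connection pool."""


    class BaseWorkerStore:
        # Before the change db_conn was unannotated, so any object was accepted
        # silently; with the annotation, mypy checks subclasses and call sites.
        def __init__(self, database: DatabasePool, db_conn: LoggingDatabaseConnection):
            self._db_conn = db_conn


    class FilteringWorkerStore(BaseWorkerStore):
        def __init__(self, database: DatabasePool, db_conn: LoggingDatabaseConnection):
            super().__init__(database, db_conn)
            self._filter_cache: dict = {}


    # A type checker would reject this call, because a plain string is not a
    # LoggingDatabaseConnection:
    # FilteringWorkerStore(DatabasePool(), "sqlite:///homeserver.db")
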
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 61cd7e52..bc888ce1 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -14,7 +14,7 @@
from typing import TYPE_CHECKING
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
from synapse.util.caches.lrucache import LruCache
@@ -25,7 +25,12 @@ if TYPE_CHECKING:
class SlavedClientIpStore(BaseSlavedStore):
- def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
self.client_ip_last_seen: LruCache[tuple, int] = LruCache(
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 0a582960..a2aff75b 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.devices import DeviceWorkerStore
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -27,7 +27,12 @@ if TYPE_CHECKING:
class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore):
- def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
self.hs = hs
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 63ed50ca..0f083726 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
@@ -58,7 +58,12 @@ class SlavedEventStore(
RelationsWorkerStore,
BaseSlavedStore,
):
- def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
events_max = self._stream_id_gen.get_current_token()
@@ -75,12 +80,3 @@ class SlavedEventStore(
min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)
-
- # Cached functions can't be accessed through a class instance so we need
- # to reach inside the __dict__ to extract them.
-
- def get_room_max_stream_ordering(self):
- return self._stream_id_gen.get_current_token()
-
- def get_room_min_stream_ordering(self):
- return self._backfill_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py
index 90284c20..4d185e2b 100644
--- a/synapse/replication/slave/storage/filtering.py
+++ b/synapse/replication/slave/storage/filtering.py
@@ -14,7 +14,7 @@
from typing import TYPE_CHECKING
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.filtering import FilteringStore
from ._base import BaseSlavedStore
@@ -24,7 +24,12 @@ if TYPE_CHECKING:
class SlavedFilteringStore(BaseSlavedStore):
- def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
# Filters are immutable so this cache doesn't need to be expired
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 497e16c6..9d90e263 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import GroupServerStream
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.group_server import GroupServerWorkerStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -26,7 +26,12 @@ if TYPE_CHECKING:
class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
- def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
self.hs = hs
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 743a01da..5a2d90c5 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -15,7 +15,6 @@
import heapq
import logging
-from collections import namedtuple
from typing import (
TYPE_CHECKING,
Any,
@@ -30,6 +29,7 @@ from typing import (
import attr
from synapse.replication.http.streams import ReplicationGetStreamUpdates
+from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -226,17 +226,14 @@ class BackfillStream(Stream):
or it went from being an outlier to not.
"""
- BackfillStreamRow = namedtuple(
- "BackfillStreamRow",
- (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
- "relates_to", # str, optional
- ),
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class BackfillStreamRow:
+ event_id: str
+ room_id: str
+ type: str
+ state_key: Optional[str]
+ redacts: Optional[str]
+ relates_to: Optional[str]
NAME = "backfill"
ROW_TYPE = BackfillStreamRow
@@ -256,18 +253,15 @@ class BackfillStream(Stream):
class PresenceStream(Stream):
- PresenceStreamRow = namedtuple(
- "PresenceStreamRow",
- (
- "user_id", # str
- "state", # str
- "last_active_ts", # int
- "last_federation_update_ts", # int
- "last_user_sync_ts", # int
- "status_msg", # str
- "currently_active", # bool
- ),
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class PresenceStreamRow:
+ user_id: str
+ state: str
+ last_active_ts: int
+ last_federation_update_ts: int
+ last_user_sync_ts: int
+ status_msg: str
+ currently_active: bool
NAME = "presence"
ROW_TYPE = PresenceStreamRow
@@ -302,7 +296,7 @@ class PresenceFederationStream(Stream):
send.
"""
- @attr.s(slots=True, auto_attribs=True)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
class PresenceFederationStreamRow:
destination: str
user_id: str
@@ -320,9 +314,10 @@ class PresenceFederationStream(Stream):
class TypingStream(Stream):
- TypingStreamRow = namedtuple(
- "TypingStreamRow", ("room_id", "user_ids") # str # list(str)
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class TypingStreamRow:
+ room_id: str
+ user_ids: List[str]
NAME = "typing"
ROW_TYPE = TypingStreamRow
@@ -348,16 +343,13 @@ class TypingStream(Stream):
class ReceiptsStream(Stream):
- ReceiptsStreamRow = namedtuple(
- "ReceiptsStreamRow",
- (
- "room_id", # str
- "receipt_type", # str
- "user_id", # str
- "event_id", # str
- "data", # dict
- ),
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class ReceiptsStreamRow:
+ room_id: str
+ receipt_type: str
+ user_id: str
+ event_id: str
+ data: dict
NAME = "receipts"
ROW_TYPE = ReceiptsStreamRow
@@ -374,7 +366,9 @@ class ReceiptsStream(Stream):
class PushRulesStream(Stream):
"""A user has changed their push rules"""
- PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class PushRulesStreamRow:
+ user_id: str
NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow
@@ -396,10 +390,12 @@ class PushRulesStream(Stream):
class PushersStream(Stream):
"""A user has added/changed/removed a pusher"""
- PushersStreamRow = namedtuple(
- "PushersStreamRow",
- ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class PushersStreamRow:
+ user_id: str
+ app_id: str
+ pushkey: str
+ deleted: bool
NAME = "pushers"
ROW_TYPE = PushersStreamRow
@@ -419,7 +415,7 @@ class CachesStream(Stream):
the cache on the workers
"""
- @attr.s(slots=True)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
@@ -430,9 +426,9 @@ class CachesStream(Stream):
invalidation_ts: Timestamp of when the invalidation took place.
"""
- cache_func = attr.ib(type=str)
- keys = attr.ib(type=Optional[List[Any]])
- invalidation_ts = attr.ib(type=int)
+ cache_func: str
+ keys: Optional[List[Any]]
+ invalidation_ts: int
NAME = "caches"
ROW_TYPE = CachesStreamRow
@@ -451,9 +447,9 @@ class DeviceListsStream(Stream):
told about a device update.
"""
- @attr.s(slots=True)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceListsStreamRow:
- entity = attr.ib(type=str)
+ entity: str
NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow
@@ -470,7 +466,9 @@ class DeviceListsStream(Stream):
class ToDeviceStream(Stream):
"""New to_device messages for a client"""
- ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class ToDeviceStreamRow:
+ entity: str
NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow
@@ -487,9 +485,11 @@ class ToDeviceStream(Stream):
class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room"""
- TagAccountDataStreamRow = namedtuple(
- "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class TagAccountDataStreamRow:
+ user_id: str
+ room_id: str
+ data: JsonDict
NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow
@@ -506,10 +506,11 @@ class TagAccountDataStream(Stream):
class AccountDataStream(Stream):
"""Global or per room account data was changed"""
- AccountDataStreamRow = namedtuple(
- "AccountDataStreamRow",
- ("user_id", "room_id", "data_type"), # str # Optional[str] # str
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class AccountDataStreamRow:
+ user_id: str
+ room_id: Optional[str]
+ data_type: str
NAME = "account_data"
ROW_TYPE = AccountDataStreamRow
@@ -573,10 +574,12 @@ class AccountDataStream(Stream):
class GroupServerStream(Stream):
- GroupsStreamRow = namedtuple(
- "GroupsStreamRow",
- ("group_id", "user_id", "type", "content"), # str # str # str # dict
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class GroupsStreamRow:
+ group_id: str
+ user_id: str
+ type: str
+ content: JsonDict
NAME = "groups"
ROW_TYPE = GroupsStreamRow
@@ -593,7 +596,9 @@ class GroupServerStream(Stream):
class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key"""
- UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class UserSignatureStreamRow:
+ user_id: str
NAME = "user_signature"
ROW_TYPE = UserSignatureStreamRow
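
[Editor's aside] The hunks above all follow the same migration: namedtuple-based stream row definitions become frozen, slotted attrs classes with real type annotations. Both forms accept positional construction, so code that builds rows roughly like ROW_TYPE(*row), as the Stream base class appears to do when parsing replication updates, keeps working unchanged; the attrs version additionally rejects mutation after construction. A minimal, self-contained sketch of the pattern, with illustrative field names rather than Synapse's actual rows:

    # Sketch of the namedtuple -> attrs migration pattern used above.
    # Field names are illustrative only.
    from collections import namedtuple
    from typing import Optional

    import attr

    # Old style: untyped fields, types only hinted at in comments.
    OldRow = namedtuple("OldRow", ("event_id", "room_id", "state_key"))


    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class NewRow:
        event_id: str
        room_id: str
        state_key: Optional[str]


    # Both forms accept the same positional construction, so ROW_TYPE(*row)
    # style parsing is unaffected by the switch.
    old = OldRow("$abc", "!room:example.org", None)
    new = NewRow("$abc", "!room:example.org", None)
    assert (new.event_id, new.room_id) == (old.event_id, old.room_id)

    # Unlike the namedtuple, the attrs class is immutable and slotted:
    # assigning to new.event_id would raise attr.exceptions.FrozenInstanceError.
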
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 0600cdbf..4046bdec 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -12,14 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import namedtuple
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
+import attr
+
from synapse.replication.tcp.streams._base import (
Stream,
current_token_without_instance,
make_http_update_function,
)
+from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -30,13 +32,10 @@ class FederationStream(Stream):
sending disabled.
"""
- FederationStreamRow = namedtuple(
- "FederationStreamRow",
- (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
- ),
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class FederationStreamRow:
+ type: str # the type of data as defined in the BaseFederationRows
+ data: JsonDict # serialization of a federation.send_queue.BaseFederationRow
NAME = "federation"
ROW_TYPE = FederationStreamRow
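
[Editor's aside] Both this file and the streams module above start importing JsonDict for dict-valued row fields instead of annotating them loosely as dict. In Synapse, JsonDict appears to be a simple alias along the lines of Dict[str, Any]; the sketch below defines such an alias locally, rather than importing Synapse's, to show how a dict-carrying row reads with it:

    # Sketch only: a local JSON-dict alias standing in for synapse.types.JsonDict,
    # plus an illustrative row class (not the real FederationStreamRow).
    from typing import Any, Dict

    import attr

    JsonDict = Dict[str, Any]


    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class ExampleFederationRow:
        type: str       # kind of federation data being streamed
        data: JsonDict  # JSON-serialisable payload


    row = ExampleFederationRow(type="edu", data={"edu_type": "m.typing", "content": {}})
    assert isinstance(row.data, dict)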