summaryrefslogtreecommitdiff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2022-01-18 15:27:03 +0100
committerAndrej Shadura <andrewsh@debian.org>2022-01-18 15:27:03 +0100
commit677db679d6e85151db3ffc494336450bc6e5cb70 (patch)
tree186be5a5a81961182d323509c8b66d80fa6de2e9 /synapse/replication/tcp
parent5e749433b9e317f6476f00da3ae4e2eaeb818f57 (diff)
New upstream version 1.50.0
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/streams/_base.py129
-rw-r--r--synapse/replication/tcp/streams/federation.py15
2 files changed, 74 insertions, 70 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 743a01da..5a2d90c5 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -15,7 +15,6 @@
import heapq
import logging
-from collections import namedtuple
from typing import (
TYPE_CHECKING,
Any,
@@ -30,6 +29,7 @@ from typing import (
import attr
from synapse.replication.http.streams import ReplicationGetStreamUpdates
+from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -226,17 +226,14 @@ class BackfillStream(Stream):
or it went from being an outlier to not.
"""
- BackfillStreamRow = namedtuple(
- "BackfillStreamRow",
- (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
- "relates_to", # str, optional
- ),
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class BackfillStreamRow:
+ event_id: str
+ room_id: str
+ type: str
+ state_key: Optional[str]
+ redacts: Optional[str]
+ relates_to: Optional[str]
NAME = "backfill"
ROW_TYPE = BackfillStreamRow
@@ -256,18 +253,15 @@ class BackfillStream(Stream):
class PresenceStream(Stream):
- PresenceStreamRow = namedtuple(
- "PresenceStreamRow",
- (
- "user_id", # str
- "state", # str
- "last_active_ts", # int
- "last_federation_update_ts", # int
- "last_user_sync_ts", # int
- "status_msg", # str
- "currently_active", # bool
- ),
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class PresenceStreamRow:
+ user_id: str
+ state: str
+ last_active_ts: int
+ last_federation_update_ts: int
+ last_user_sync_ts: int
+ status_msg: str
+ currently_active: bool
NAME = "presence"
ROW_TYPE = PresenceStreamRow
@@ -302,7 +296,7 @@ class PresenceFederationStream(Stream):
send.
"""
- @attr.s(slots=True, auto_attribs=True)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
class PresenceFederationStreamRow:
destination: str
user_id: str
@@ -320,9 +314,10 @@ class PresenceFederationStream(Stream):
class TypingStream(Stream):
- TypingStreamRow = namedtuple(
- "TypingStreamRow", ("room_id", "user_ids") # str # list(str)
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class TypingStreamRow:
+ room_id: str
+ user_ids: List[str]
NAME = "typing"
ROW_TYPE = TypingStreamRow
@@ -348,16 +343,13 @@ class TypingStream(Stream):
class ReceiptsStream(Stream):
- ReceiptsStreamRow = namedtuple(
- "ReceiptsStreamRow",
- (
- "room_id", # str
- "receipt_type", # str
- "user_id", # str
- "event_id", # str
- "data", # dict
- ),
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class ReceiptsStreamRow:
+ room_id: str
+ receipt_type: str
+ user_id: str
+ event_id: str
+ data: dict
NAME = "receipts"
ROW_TYPE = ReceiptsStreamRow
@@ -374,7 +366,9 @@ class ReceiptsStream(Stream):
class PushRulesStream(Stream):
"""A user has changed their push rules"""
- PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class PushRulesStreamRow:
+ user_id: str
NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow
@@ -396,10 +390,12 @@ class PushRulesStream(Stream):
class PushersStream(Stream):
"""A user has added/changed/removed a pusher"""
- PushersStreamRow = namedtuple(
- "PushersStreamRow",
- ("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class PushersStreamRow:
+ user_id: str
+ app_id: str
+ pushkey: str
+ deleted: bool
NAME = "pushers"
ROW_TYPE = PushersStreamRow
@@ -419,7 +415,7 @@ class CachesStream(Stream):
the cache on the workers
"""
- @attr.s(slots=True)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
@@ -430,9 +426,9 @@ class CachesStream(Stream):
invalidation_ts: Timestamp of when the invalidation took place.
"""
- cache_func = attr.ib(type=str)
- keys = attr.ib(type=Optional[List[Any]])
- invalidation_ts = attr.ib(type=int)
+ cache_func: str
+ keys: Optional[List[Any]]
+ invalidation_ts: int
NAME = "caches"
ROW_TYPE = CachesStreamRow
@@ -451,9 +447,9 @@ class DeviceListsStream(Stream):
told about a device update.
"""
- @attr.s(slots=True)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceListsStreamRow:
- entity = attr.ib(type=str)
+ entity: str
NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow
@@ -470,7 +466,9 @@ class DeviceListsStream(Stream):
class ToDeviceStream(Stream):
"""New to_device messages for a client"""
- ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class ToDeviceStreamRow:
+ entity: str
NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow
@@ -487,9 +485,11 @@ class ToDeviceStream(Stream):
class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room"""
- TagAccountDataStreamRow = namedtuple(
- "TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class TagAccountDataStreamRow:
+ user_id: str
+ room_id: str
+ data: JsonDict
NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow
@@ -506,10 +506,11 @@ class TagAccountDataStream(Stream):
class AccountDataStream(Stream):
"""Global or per room account data was changed"""
- AccountDataStreamRow = namedtuple(
- "AccountDataStreamRow",
- ("user_id", "room_id", "data_type"), # str # Optional[str] # str
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class AccountDataStreamRow:
+ user_id: str
+ room_id: Optional[str]
+ data_type: str
NAME = "account_data"
ROW_TYPE = AccountDataStreamRow
@@ -573,10 +574,12 @@ class AccountDataStream(Stream):
class GroupServerStream(Stream):
- GroupsStreamRow = namedtuple(
- "GroupsStreamRow",
- ("group_id", "user_id", "type", "content"), # str # str # str # dict
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class GroupsStreamRow:
+ group_id: str
+ user_id: str
+ type: str
+ content: JsonDict
NAME = "groups"
ROW_TYPE = GroupsStreamRow
@@ -593,7 +596,9 @@ class GroupServerStream(Stream):
class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key"""
- UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class UserSignatureStreamRow:
+ user_id: str
NAME = "user_signature"
ROW_TYPE = UserSignatureStreamRow
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 0600cdbf..4046bdec 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -12,14 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from collections import namedtuple
from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple
+import attr
+
from synapse.replication.tcp.streams._base import (
Stream,
current_token_without_instance,
make_http_update_function,
)
+from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -30,13 +32,10 @@ class FederationStream(Stream):
sending disabled.
"""
- FederationStreamRow = namedtuple(
- "FederationStreamRow",
- (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
- ),
- )
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class FederationStreamRow:
+ type: str # the type of data as defined in the BaseFederationRows
+ data: JsonDict # serialization of a federation.send_queue.BaseFederationRow
NAME = "federation"
ROW_TYPE = FederationStreamRow