summaryrefslogtreecommitdiff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2019-05-22 22:41:35 +0100
committerAndrej Shadura <andrewsh@debian.org>2019-05-22 22:41:35 +0100
commita5a75eb24a1f5c0ea62cd1ea507863ae03f89c16 (patch)
tree2adb4044eb388d7b7e6f01aec57dfe61229542c8 /synapse/replication/tcp
parent483b4888dc553a576baf41dc18682a8e931abded (diff)
New upstream version 0.99.5.1
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py15
-rw-r--r--synapse/replication/tcp/protocol.py6
-rw-r--r--synapse/replication/tcp/resource.py3
-rw-r--r--synapse/replication/tcp/streams/__init__.py49
-rw-r--r--synapse/replication/tcp/streams/_base.py (renamed from synapse/replication/tcp/streams.py)128
-rw-r--r--synapse/replication/tcp/streams/events.py147
-rw-r--r--synapse/replication/tcp/streams/federation.py39
7 files changed, 279 insertions, 108 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e558f90e..206dc3b3 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -103,10 +103,19 @@ class ReplicationClientHandler(object):
hs.get_reactor().connectTCP(host, port, self.factory)
def on_rdata(self, stream_name, token, rows):
- """Called when we get new replication data. By default this just pokes
- the slave store.
+ """Called to handle a batch of replication data with a given stream token.
- Can be overriden in subclasses to handle more.
+ By default this just pokes the slave store. Can be overridden in subclasses to
+ handle more.
+
+ Args:
+ stream_name (str): name of the replication stream for this batch of rows
+ token (int): stream token for this batch of rows
+ rows (list): a list of Stream.ROW_TYPE objects as returned by
+ Stream.parse_row.
+
+ Returns:
+ Deferred|None
"""
logger.debug("Received rdata %s -> %s", stream_name, token)
return self.store.process_replication_rows(stream_name, token, rows)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 02e5bf6c..b51590cf 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire::
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
- > RDATA events 14 ["$149019767112vOHxz:localhost:8823",
- "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
+ > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823",
+ "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]]
< PING 1490197675618
> ERROR server stopping
* connection closed by server *
@@ -605,7 +605,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
inbound_rdata_count.labels(stream_name).inc()
try:
- row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
+ row = STREAMS_MAP[stream_name].parse_row(cmd.row)
except Exception:
logger.exception(
"[%s] Failed to parse RDATA: %r %r",
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 7fc346c7..f6a38f51 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -30,7 +30,8 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol
-from .streams import STREAMS_MAP, FederationStream
+from .streams import STREAMS_MAP
+from .streams.federation import FederationStream
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
new file mode 100644
index 00000000..634f636d
--- /dev/null
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Defines all the valid streams that clients can subscribe to, and the format
+of the rows returned by each stream.
+
+Each stream is defined by the following information:
+
+ stream name: The name of the stream
+ row type: The type that is used to serialise/deserialise the row
+ current_token: The function that returns the current token for the stream
+ update_function: The function that returns a list of updates between two tokens
+"""
+
+from . import _base, events, federation
+
+STREAMS_MAP = {
+ stream.NAME: stream
+ for stream in (
+ events.EventsStream,
+ _base.BackfillStream,
+ _base.PresenceStream,
+ _base.TypingStream,
+ _base.ReceiptsStream,
+ _base.PushRulesStream,
+ _base.PushersStream,
+ _base.CachesStream,
+ _base.PublicRoomsStream,
+ _base.DeviceListsStream,
+ _base.ToDeviceStream,
+ federation.FederationStream,
+ _base.TagAccountDataStream,
+ _base.AccountDataStream,
+ _base.GroupServerStream,
+ )
+}
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams/_base.py
index e23084ba..b6ce7a7b 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,16 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Defines all the valid streams that clients can subscribe to, and the format
-of the rows returned by each stream.
-Each stream is defined by the following information:
-
- stream name: The name of the stream
- row type: The type that is used to serialise/deserialse the row
- current_token: The function that returns the current token for the stream
- update_function: The function that returns a list of updates between two tokens
-"""
import itertools
import logging
from collections import namedtuple
@@ -34,20 +26,13 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
-
-EventStreamRow = namedtuple("EventStreamRow", (
- "event_id", # str
- "room_id", # str
- "type", # str
- "state_key", # str, optional
- "redacts", # str, optional
-))
BackfillStreamRow = namedtuple("BackfillStreamRow", (
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
+ "relates_to", # str, optional
))
PresenceStreamRow = namedtuple("PresenceStreamRow", (
"user_id", # str
@@ -96,10 +81,6 @@ DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str
))
-FederationStreamRow = namedtuple("FederationStreamRow", (
- "type", # str, the type of data as defined in the BaseFederationRows
- "data", # dict, serialization of a federation.send_queue.BaseFederationRow
-))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str
"room_id", # str
@@ -111,12 +92,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
"data_type", # str
"data", # dict
))
-CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
- "room_id", # str
- "type", # str
- "state_key", # str
- "event_id", # str, optional
-))
GroupsStreamRow = namedtuple("GroupsStreamRow", (
"group_id", # str
"user_id", # str
@@ -132,9 +107,24 @@ class Stream(object):
time it was called up until the point `advance_current_token` was called.
"""
NAME = None # The name of the stream
- ROW_TYPE = None # The type of the row
+ ROW_TYPE = None # The type of the row. Used by the default impl of parse_row.
_LIMITED = True # Whether the update function takes a limit
+ @classmethod
+ def parse_row(cls, row):
+ """Parse a row received over replication
+
+ By default, assumes that the row data is an array object and passes its contents
+ to the constructor of the ROW_TYPE for this stream.
+
+ Args:
+ row: row data from the incoming RDATA command, after json decoding
+
+ Returns:
+ ROW_TYPE object for this stream
+ """
+ return cls.ROW_TYPE(*row)
+
def __init__(self, hs):
# The token from which we last asked for updates
self.last_token = self.current_token()
@@ -162,8 +152,10 @@ class Stream(object):
until the `upto_token`
Returns:
- (list(ROW_TYPE), int): list of updates plus the token used as an
- upper bound of the updates (i.e. the "current token")
+ Deferred[Tuple[List[Tuple[int, Any]], int]]:
+ Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
+ list of ``(token, row)`` entries. ``row`` will be json-serialised and
+ sent over the replication stream.
"""
updates, current_token = yield self.get_updates_since(self.last_token)
self.last_token = current_token
@@ -176,8 +168,10 @@ class Stream(object):
stream updates
Returns:
- (list(ROW_TYPE), int): list of updates plus the token used as an
- upper bound of the updates (i.e. the "current token")
+ Deferred[Tuple[List[Tuple[int, Any]], int]]:
+ Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
+ list of ``(token, row)`` entries. ``row`` will be json-serialised and
+ sent over the replication stream.
"""
if from_token in ("NOW", "now"):
defer.returnValue(([], self.upto_token))
@@ -202,7 +196,7 @@ class Stream(object):
from_token, current_token,
)
- updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
+ updates = [(row[0], row[1:]) for row in rows]
# check we didn't get more rows than the limit.
# doing it like this allows the update_function to be a generator.
@@ -232,20 +226,6 @@ class Stream(object):
raise NotImplementedError()
-class EventsStream(Stream):
- """We received a new event, or an event went from being an outlier to not
- """
- NAME = "events"
- ROW_TYPE = EventStreamRow
-
- def __init__(self, hs):
- store = hs.get_datastore()
- self.current_token = store.get_current_events_token
- self.update_function = store.get_all_new_forward_event_rows
-
- super(EventsStream, self).__init__(hs)
-
-
class BackfillStream(Stream):
"""We fetched some old events and either we had never seen that event before
or it went from being an outlier to not.
@@ -400,22 +380,6 @@ class ToDeviceStream(Stream):
super(ToDeviceStream, self).__init__(hs)
-class FederationStream(Stream):
- """Data to be sent over federation. Only available when master has federation
- sending disabled.
- """
- NAME = "federation"
- ROW_TYPE = FederationStreamRow
-
- def __init__(self, hs):
- federation_sender = hs.get_federation_sender()
-
- self.current_token = federation_sender.get_current_token
- self.update_function = federation_sender.get_replication_rows
-
- super(FederationStream, self).__init__(hs)
-
-
class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room
"""
@@ -459,21 +423,6 @@ class AccountDataStream(Stream):
defer.returnValue(results)
-class CurrentStateDeltaStream(Stream):
- """Current state for a room was changed
- """
- NAME = "current_state_deltas"
- ROW_TYPE = CurrentStateDeltaStreamRow
-
- def __init__(self, hs):
- store = hs.get_datastore()
-
- self.current_token = store.get_max_current_state_delta_stream_id
- self.update_function = store.get_all_updated_current_state_deltas
-
- super(CurrentStateDeltaStream, self).__init__(hs)
-
-
class GroupServerStream(Stream):
NAME = "groups"
ROW_TYPE = GroupsStreamRow
@@ -485,26 +434,3 @@ class GroupServerStream(Stream):
self.update_function = store.get_all_groups_changes
super(GroupServerStream, self).__init__(hs)
-
-
-STREAMS_MAP = {
- stream.NAME: stream
- for stream in (
- EventsStream,
- BackfillStream,
- PresenceStream,
- TypingStream,
- ReceiptsStream,
- PushRulesStream,
- PushersStream,
- CachesStream,
- PublicRoomsStream,
- DeviceListsStream,
- ToDeviceStream,
- FederationStream,
- TagAccountDataStream,
- AccountDataStream,
- CurrentStateDeltaStream,
- GroupServerStream,
- )
-}
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
new file mode 100644
index 00000000..f1290d02
--- /dev/null
+++ b/synapse/replication/tcp/streams/events.py
@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import heapq
+
+import attr
+
+from twisted.internet import defer
+
+from ._base import Stream
+
+
+"""Handling of the 'events' replication stream
+
+This stream contains rows of various types. Each row therefore contains a 'type'
+identifier before the real data. For example::
+
+ RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
+ RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null, null]]
+
+An "ev" row is sent for each new event. The fields in the data part are:
+
+ * The new event id
+ * The room id for the event
+ * The type of the new event
+ * The state key of the event, for state events
+ * The event id of an event which is redacted by this event.
+
+A "state" row is sent whenever the "current state" in a room changes. The fields in the
+data part are:
+
+ * The room id for the state change
+ * The event type of the state which has changed
+ * The state_key of the state which has changed
+ * The event id of the new state
+
+"""
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamRow(object):
+ """A parsed row from the events replication stream"""
+ type = attr.ib() # str: the TypeId of one of the *EventsStreamRows
+ data = attr.ib() # BaseEventsStreamRow
+
+
+class BaseEventsStreamRow(object):
+ """Base class for rows to be sent in the events stream.
+
+ Specifies how to identify, serialize and deserialize the different types.
+ """
+
+ TypeId = None # Unique string that ids the type. Must be overridden in subclasses.
+
+ @classmethod
+ def from_data(cls, data):
+ """Parse the data from the replication stream into a row.
+
+ By default we just call the constructor with the data list as arguments
+
+ Args:
+ data: The value of the data object from the replication stream
+ """
+ return cls(*data)
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamEventRow(BaseEventsStreamRow):
+ TypeId = "ev"
+
+ event_id = attr.ib() # str
+ room_id = attr.ib() # str
+ type = attr.ib() # str
+ state_key = attr.ib() # str, optional
+ redacts = attr.ib() # str, optional
+ relates_to = attr.ib() # str, optional
+
+
+@attr.s(slots=True, frozen=True)
+class EventsStreamCurrentStateRow(BaseEventsStreamRow):
+ TypeId = "state"
+
+ room_id = attr.ib() # str
+ type = attr.ib() # str
+ state_key = attr.ib() # str
+ event_id = attr.ib() # str, optional
+
+
+TypeToRow = {
+ Row.TypeId: Row
+ for Row in (
+ EventsStreamEventRow,
+ EventsStreamCurrentStateRow,
+ )
+}
+
+
+class EventsStream(Stream):
+ """We received a new event, or an event went from being an outlier to not
+ """
+ NAME = "events"
+
+ def __init__(self, hs):
+ self._store = hs.get_datastore()
+ self.current_token = self._store.get_current_events_token
+
+ super(EventsStream, self).__init__(hs)
+
+ @defer.inlineCallbacks
+ def update_function(self, from_token, current_token, limit=None):
+ event_rows = yield self._store.get_all_new_forward_event_rows(
+ from_token, current_token, limit,
+ )
+ event_updates = (
+ (row[0], EventsStreamEventRow.TypeId, row[1:])
+ for row in event_rows
+ )
+
+ state_rows = yield self._store.get_all_updated_current_state_deltas(
+ from_token, current_token, limit
+ )
+ state_updates = (
+ (row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
+ for row in state_rows
+ )
+
+ all_updates = heapq.merge(event_updates, state_updates)
+
+ defer.returnValue(all_updates)
+
+ @classmethod
+ def parse_row(cls, row):
+ (typ, data) = row
+ data = TypeToRow[typ].from_data(data)
+ return EventsStreamRow(typ, data)
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
new file mode 100644
index 00000000..9aa43aa8
--- /dev/null
+++ b/synapse/replication/tcp/streams/federation.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from collections import namedtuple
+
+from ._base import Stream
+
+FederationStreamRow = namedtuple("FederationStreamRow", (
+ "type", # str, the type of data as defined in the BaseFederationRows
+ "data", # dict, serialization of a federation.send_queue.BaseFederationRow
+))
+
+
+class FederationStream(Stream):
+ """Data to be sent over federation. Only available when master has federation
+ sending disabled.
+ """
+ NAME = "federation"
+ ROW_TYPE = FederationStreamRow
+
+ def __init__(self, hs):
+ federation_sender = hs.get_federation_sender()
+
+ self.current_token = federation_sender.get_current_token
+ self.update_function = federation_sender.get_replication_rows
+
+ super(FederationStream, self).__init__(hs)