diff options
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/commands.py | 6 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 17 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/federation.py | 14 |
4 files changed, 25 insertions, 14 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index bb447f75..8abed1f5 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -312,16 +312,16 @@ class FederationAckCommand(Command): NAME = "FEDERATION_ACK" - def __init__(self, instance_name, token): + def __init__(self, instance_name: str, token: int): self.instance_name = instance_name self.token = token @classmethod - def from_line(cls, line): + def from_line(cls, line: str) -> "FederationAckCommand": instance_name, token = line.split(" ") return cls(instance_name, int(token)) - def to_line(self): + def to_line(self) -> str: return "%s %s" % (self.instance_name, self.token) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 825900f6..e829add2 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -104,7 +104,7 @@ tcp_outbound_commands_counter = Counter( # A list of all connected protocols. This allows us to send metrics about the # connections. -connected_connections = [] +connected_connections = [] # type: List[BaseReplicationStreamProtocol] logger = logging.getLogger(__name__) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index f45e7a8c..3dfee767 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -33,7 +33,7 @@ import attr from synapse.replication.http.streams import ReplicationGetStreamUpdates if TYPE_CHECKING: - import synapse.server + from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -299,20 +299,23 @@ class TypingStream(Stream): NAME = "typing" ROW_TYPE = TypingStreamRow - def __init__(self, hs): - typing_handler = hs.get_typing_handler() - + def __init__(self, hs: "HomeServer"): writer_instance = hs.config.worker.writers.typing if writer_instance == hs.get_instance_name(): # On the writer, query the typing handler - update_function = typing_handler.get_all_typing_updates + typing_writer_handler = hs.get_typing_writer_handler() + update_function = ( + typing_writer_handler.get_all_typing_updates + ) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]] + current_token_function = typing_writer_handler.get_current_token else: # Query the typing writer process update_function = make_http_update_function(hs, self.NAME) + current_token_function = hs.get_typing_handler().get_current_token super().__init__( hs.get_instance_name(), - current_token_without_instance(typing_handler.get_current_token), + current_token_without_instance(current_token_function), update_function, ) @@ -509,7 +512,7 @@ class AccountDataStream(Stream): NAME = "account_data" ROW_TYPE = AccountDataStreamRow - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() super().__init__( hs.get_instance_name(), diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index 9bcd13b0..9bb8e9e1 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from collections import namedtuple +from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Tuple from synapse.replication.tcp.streams._base import ( Stream, @@ -21,6 +22,9 @@ from synapse.replication.tcp.streams._base import ( make_http_update_function, ) +if TYPE_CHECKING: + from synapse.server import HomeServer + class FederationStream(Stream): """Data to be sent over federation. Only available when master has federation @@ -38,7 +42,7 @@ class FederationStream(Stream): NAME = "federation" ROW_TYPE = FederationStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): if hs.config.worker_app is None: # master process: get updates from the FederationRemoteSendQueue. # (if the master is configured to send federation itself, federation_sender @@ -48,7 +52,9 @@ class FederationStream(Stream): current_token = current_token_without_instance( federation_sender.get_current_token ) - update_function = federation_sender.get_replication_rows + update_function = ( + federation_sender.get_replication_rows + ) # type: Callable[[str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]] elif hs.should_send_federation(): # federation sender: Query master process @@ -69,5 +75,7 @@ class FederationStream(Stream): return 0 @staticmethod - async def _stub_update_function(instance_name, from_token, upto_token, limit): + async def _stub_update_function( + instance_name: str, from_token: int, upto_token: int, limit: int + ) -> Tuple[list, int, bool]: return [], upto_token, False |