summaryrefslogtreecommitdiff
path: root/synapse/replication/tcp/redis.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/redis.py')
-rw-r--r--synapse/replication/tcp/redis.py39
1 files changed, 27 insertions, 12 deletions
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 989c5be0..fd1c0ec6 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -14,7 +14,7 @@
import logging
from inspect import isawaitable
-from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast
import attr
import txredisapi
@@ -85,14 +85,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
Attributes:
synapse_handler: The command handler to handle incoming commands.
- synapse_stream_name: The *redis* stream name to subscribe to and publish
+ synapse_stream_prefix: The *redis* stream name to subscribe to and publish
from (not anything to do with Synapse replication streams).
synapse_outbound_redis_connection: The connection to redis to use to send
commands.
"""
synapse_handler: "ReplicationCommandHandler"
- synapse_stream_name: str
+ synapse_stream_prefix: str
+ synapse_channel_names: List[str]
synapse_outbound_redis_connection: txredisapi.ConnectionHandler
def __init__(self, *args: Any, **kwargs: Any):
@@ -117,8 +118,13 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
# it's important to make sure that we only send the REPLICATE command once we
# have successfully subscribed to the stream - otherwise we might miss the
# POSITION response sent back by the other end.
- logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name)
- await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
+ fully_qualified_stream_names = [
+ f"{self.synapse_stream_prefix}/{stream_suffix}"
+ for stream_suffix in self.synapse_channel_names
+ ] + [self.synapse_stream_prefix]
+ logger.info("Sending redis SUBSCRIBE for %r", fully_qualified_stream_names)
+ await make_deferred_yieldable(self.subscribe(fully_qualified_stream_names))
+
logger.info(
"Successfully subscribed to redis stream, sending REPLICATE command"
)
@@ -215,10 +221,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
# remote instances.
tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
+ channel_name = cmd.redis_channel_name(self.synapse_stream_prefix)
+
await make_deferred_yieldable(
- self.synapse_outbound_redis_connection.publish(
- self.synapse_stream_name, encoded_string
- )
+ self.synapse_outbound_redis_connection.publish(channel_name, encoded_string)
)
@@ -300,20 +306,27 @@ def format_address(address: IAddress) -> str:
class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
"""This is a reconnecting factory that connects to redis and immediately
- subscribes to a stream.
+ subscribes to some streams.
Args:
hs
outbound_redis_connection: A connection to redis that will be used to
send outbound commands (this is separate to the redis connection
used to subscribe).
+ channel_names: A list of channel names to append to the base channel name
+ to additionally subscribe to.
+ e.g. if ['ABC', 'DEF'] is specified then we'll listen to:
+ example.com; example.com/ABC; and example.com/DEF.
"""
maxDelay = 5
protocol = RedisSubscriber
def __init__(
- self, hs: "HomeServer", outbound_redis_connection: txredisapi.ConnectionHandler
+ self,
+ hs: "HomeServer",
+ outbound_redis_connection: txredisapi.ConnectionHandler,
+ channel_names: List[str],
):
super().__init__(
@@ -326,7 +339,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
)
self.synapse_handler = hs.get_replication_command_handler()
- self.synapse_stream_name = hs.hostname
+ self.synapse_stream_prefix = hs.hostname
+ self.synapse_channel_names = channel_names
self.synapse_outbound_redis_connection = outbound_redis_connection
@@ -340,7 +354,8 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
# protocol.
p.synapse_handler = self.synapse_handler
p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection
- p.synapse_stream_name = self.synapse_stream_name
+ p.synapse_stream_prefix = self.synapse_stream_prefix
+ p.synapse_channel_names = self.synapse_channel_names
return p