From e08b937785731f74d536f82c5e015944d65d92fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Apr 2020 13:47:02 +0100 Subject: [PATCH 1/4] Fix replication metrics when using redis --- synapse/replication/tcp/protocol.py | 48 ++++++++--------------------- synapse/replication/tcp/redis.py | 14 ++++++++- 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 7240acb0a2d4..385ca73c6e1d 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -50,10 +50,7 @@ import fcntl import logging import struct -from collections import defaultdict -from typing import TYPE_CHECKING, DefaultDict, List - -from six import iteritems +from typing import TYPE_CHECKING, List from prometheus_client import Counter @@ -86,6 +83,14 @@ "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"] ) +tcp_inbound_commands = Counter( + "synapse_replication_tcp_protocol_inbound_commands", "", ["command", "name"], +) + +tcp_outbound_commands = Counter( + "synapse_replication_tcp_protocol_outbound_commands", "", ["command", "name"], +) + # A list of all connected protocols. This allows us to send metrics about the # connections. connected_connections = [] @@ -151,9 +156,6 @@ def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"): # The LoopingCall for sending pings. self._send_ping_loop = None - self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int] - self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int] - def connectionMade(self): logger.info("[%s] Connection established", self.id()) @@ -224,9 +226,7 @@ def lineReceived(self, line: bytes): self.last_received_command = self.clock.time_msec() - self.inbound_commands_counter[cmd.NAME] = ( - self.inbound_commands_counter[cmd.NAME] + 1 - ) + tcp_inbound_commands.labels(cmd.NAME, self.name).inc() # Now lets try and call on_ function run_as_background_process( @@ -292,9 +292,8 @@ def send_command(self, cmd, do_buffer=True): self._queue_command(cmd) return - self.outbound_commands_counter[cmd.NAME] = ( - self.outbound_commands_counter[cmd.NAME] + 1 - ) + tcp_outbound_commands.labels(cmd.NAME, self.name).inc() + string = "%s %s" % (cmd.NAME, cmd.to_line()) if "\n" in string: raise Exception("Unexpected newline in command: %r", string) @@ -546,26 +545,3 @@ def transport_kernel_read_buffer_size(protocol, read=True): for p in connected_connections }, ) - - -tcp_inbound_commands = LaterGauge( - "synapse_replication_tcp_protocol_inbound_commands", - "", - ["command", "name"], - lambda: { - (k, p.name): count - for p in connected_connections - for k, count in iteritems(p.inbound_commands_counter) - }, -) - -tcp_outbound_commands = LaterGauge( - "synapse_replication_tcp_protocol_outbound_commands", - "", - ["command", "name"], - lambda: { - (k, p.name): count - for p in connected_connections - for k, count in iteritems(p.outbound_commands_counter) - }, -) diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 4c0842573550..2f5c3bc2d02b 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -25,7 +25,11 @@ ReplicateCommand, parse_command_from_line, ) -from synapse.replication.tcp.protocol import AbstractConnection +from synapse.replication.tcp.protocol import ( + AbstractConnection, + tcp_inbound_commands, + tcp_outbound_commands, +) if TYPE_CHECKING: from synapse.replication.tcp.handler import ReplicationCommandHandler @@ -79,6 +83,10 @@ def messageReceived(self, pattern: str, channel: str, message: str): ) return + # We use "redis" as the name here as we don't have 1:1 connections to + # remote instances. + tcp_inbound_commands.labels(cmd.NAME, "redis").inc() + # Now lets try and call on_ function run_as_background_process( "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd @@ -126,6 +134,10 @@ def send_command(self, cmd: Command): encoded_string = string.encode("utf-8") + # We use "redis" as the name here as we don't have 1:1 connections to + # remote instances. + tcp_outbound_commands.labels(cmd.NAME, "redis").inc() + async def _send(): with PreserveLoggingContext(): # Note that we use the other connection as we can't send From e78e38eeddb80bdc50c5f065512b18c3517cec90 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Apr 2020 13:59:06 +0100 Subject: [PATCH 2/4] Newsfile --- changelog.d/7325.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7325.feature diff --git a/changelog.d/7325.feature b/changelog.d/7325.feature new file mode 100644 index 000000000000..ce6140fdd111 --- /dev/null +++ b/changelog.d/7325.feature @@ -0,0 +1 @@ +Add support for running replication over Redis when using workers. From 041e272e42faa6862504adeb81e0e33e06af6626 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Apr 2020 15:02:15 +0100 Subject: [PATCH 3/4] Change name --- synapse/replication/tcp/protocol.py | 16 ++++++++++------ synapse/replication/tcp/redis.py | 8 ++++---- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 385ca73c6e1d..a6ee02fb6f41 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -83,12 +83,16 @@ "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"] ) -tcp_inbound_commands = Counter( - "synapse_replication_tcp_protocol_inbound_commands", "", ["command", "name"], +tcp_inbound_commands_counter = Counter( + "synapse_replication_tcp_protocol_inbound_commands", + "Number of commands received from replication", + ["command", "name"], ) -tcp_outbound_commands = Counter( - "synapse_replication_tcp_protocol_outbound_commands", "", ["command", "name"], +tcp_outbound_commands_counter = Counter( + "synapse_replication_tcp_protocol_outbound_commands", + "Number of commands sent to replication", + ["command", "name"], ) # A list of all connected protocols. This allows us to send metrics about the @@ -226,7 +230,7 @@ def lineReceived(self, line: bytes): self.last_received_command = self.clock.time_msec() - tcp_inbound_commands.labels(cmd.NAME, self.name).inc() + tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc() # Now lets try and call on_ function run_as_background_process( @@ -292,7 +296,7 @@ def send_command(self, cmd, do_buffer=True): self._queue_command(cmd) return - tcp_outbound_commands.labels(cmd.NAME, self.name).inc() + tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc() string = "%s %s" % (cmd.NAME, cmd.to_line()) if "\n" in string: diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 2f5c3bc2d02b..49b3ed0c5e68 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -27,8 +27,8 @@ ) from synapse.replication.tcp.protocol import ( AbstractConnection, - tcp_inbound_commands, - tcp_outbound_commands, + tcp_inbound_commands_counter, + tcp_outbound_commands_counter, ) if TYPE_CHECKING: @@ -85,7 +85,7 @@ def messageReceived(self, pattern: str, channel: str, message: str): # We use "redis" as the name here as we don't have 1:1 connections to # remote instances. - tcp_inbound_commands.labels(cmd.NAME, "redis").inc() + tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc() # Now lets try and call on_ function run_as_background_process( @@ -136,7 +136,7 @@ def send_command(self, cmd: Command): # We use "redis" as the name here as we don't have 1:1 connections to # remote instances. - tcp_outbound_commands.labels(cmd.NAME, "redis").inc() + tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc() async def _send(): with PreserveLoggingContext(): From 9a2c2a6367d9fe79cf8c4263c6261fd4f2006bf1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Apr 2020 15:15:31 +0100 Subject: [PATCH 4/4] Update description --- synapse/replication/tcp/protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index a6ee02fb6f41..e3f64eba8f2b 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -85,13 +85,13 @@ tcp_inbound_commands_counter = Counter( "synapse_replication_tcp_protocol_inbound_commands", - "Number of commands received from replication", + "Number of commands received from replication, by command and name of process connected to", ["command", "name"], ) tcp_outbound_commands_counter = Counter( "synapse_replication_tcp_protocol_outbound_commands", - "Number of commands sent to replication", + "Number of commands sent to replication, by command and name of process connected to", ["command", "name"], )